乳腺癌预测系统 - 完整功能详解

这是一个基于 Django 框架开发的乳腺癌智能预测与管理系统,集成了机器学习、AI问答、安全防护等多项先进技术。

一、核心功能模块

1、乳腺癌预测引擎

需求背景
  • 辅助医疗诊断,提供早期乳腺癌风险筛查
  • 降低误诊率,提高检测效率
  • 为普通用户提供便捷的自我检查工具
技术实现
模型训练代码 (System/views.py):
def initialize_model()"""初始化随机森林模型(仅在应用启动时调用一次)"""
    global clf, sc, _model_initialized
    
    if _model_initialized:
        return clf, sc
    
    # 加载数据集
    data_path = os.path.join(settings.BASE_DIR, 'static', 'Breast Cancer Data.csv')
    dataset = pd.read_csv(data_path)
    X = dataset.iloc[, 232].values  # 提取30个特征值
    y = dataset.iloc[, 1].values     # 标签(良性/恶性)
    
    # 编码分类数据
    from sklearn.preprocessing import LabelEncoder
    labelencoder_X_1 = LabelEncoder()
    y = labelencoder_X_1.fit_transform(y)
    
    # 划分训练集和测试集(80%训练,20%测试)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
    
    # 特征缩放(标准化)
    sc = StandardScaler()
    X_train = sc.fit_transform(X_train)
    X_test = sc.transform(X_test)
    
    # 训练随机森林模型
    clf = RandomForestClassifier(n_estimators=100)
    clf.fit(X_train, y_train)
    
    _model_initialized = True
    print("✓ 模型已加载到内存")
    
    return clf, sc
预测接口 (System/views.py - predict_cancer):
def predict_cancer(request)"""乳腺癌预测视图"""
    if request.method == 'POST'try# 收集表单数据(30个特征值)
            data = []
            input_data = {}
            for i in range(1, 31):
                value = float(request.POST.get(f'value{i}', 0))
                data.append(value)
                input_data[f'value{i}'] = value
            
            # 转换为 numpy 数组并重塑
            data_np = np.asarray(data, dtype=float)
            data_np = data_np.reshape(1, -1)
            
            # 使用模型进行预测
            t = time.time()
            data_scaled = sc.transform(data_np)  # 特征缩放
            output = clf.predict(data_scaled)     # 预测结果
            acc = clf.predict_proba(data_scaled)  # 置信度
            t = time.time() - t
            
            # 解析结果
            if output[0] == 1:
                result = '恶性 (Malignant)'
            else:
                result = '良性 (Benign)'
            
            # 获取准确率
            acc_value = max(acc[0][0], acc[0][1])
            
            # 保存预测记录到数据库(仅登录用户)
            if request.user.is_authenticated:
                from System.models import PredictionRecord
                PredictionRecord.objects.create(
                    user=request.user,
                    input_data=input_data,
                    result=result,
                    confidence=acc_value
                )
                log_action(request, 'PREDICT', f'用户进行了乳腺癌预测,结果:{result}')
            
            context = {
                'output':result,
                'accuracy':acc_value * 100,
                'time'round(t, 4)
            }
            
            return render(request, 'result.html', context)
            
        except Exception as e:
            messages.error(request, f'预测失败:{str(e)}')
            return redirect('index')
    
    return redirect('index')
输入特征(30个医学指标):
  • 半径、纹理、周长、面积、平滑度等细胞核特征
  • 每个特征包含均值、标准差、最差值三个维度
输出结果:
  • 预测类别:良性 (Benign) / 恶性 (Malignant)
  • 置信度:预测概率百分比
  • 预测耗时:毫秒级响应

2、AI 智能问答系统

需求背景
  • 为用户提供专业的乳腺癌相关知识解答
  • 支持多轮对话,保持上下文关联
  • 流式输出,提升用户体验
技术实现
流式问答接口 (System/views.py - ask_ai_stream):
@login_required
def ask_ai_stream(request)"""AI 问答流式输出 API - 支持上下文关联"""
    if request.method == 'POST':
        question = request.POST.get('question', '').strip()
        
        if not question:
            return StreamingHttpResponse([json.dumps({'error''请输入问题'}, ensure_ascii=False)], content_type='application/json')
        
        def generate_response()try# 获取最近 10 条对话历史作为上下文
                from System.models import ChatHistory
                recent_history = list(ChatHistory.objects.filter(user=request.user).order_by('-created_at')[10])
                recent_history.reverse()  # 按时间正序排列
                
                # 构建对话历史消息列表
                messages = [
                    {'role''system', 'content''你是一位专业的医疗助手,专门解答乳腺癌相关问题。请用中文回答。请注意保持对话的连贯性,根据之前的对话内容来回答当前问题。'}
                ]
                
                # 添加历史对话
                for history in recent_history:
                    messages.append({'role''user', 'content':history.question})
                    messages.append({'role''assistant', 'content':history.answer})
                
                # 添加当前问题
                messages.append({'role''user', 'content':question})
                
                # 调用 DeepSeek API(流式模式)
                headers = {
                    'Content-Type''application/json',
                    'Authorization'f'Bearer {settings.DEEPSEEK_API_KEY}'
                }
                
                payload = {
                    'model''deepseek-chat',
                    'messages':messages,
                    'temperature'0.7,
                    'max_tokens'1500,
                    'stream'True  # 启用流式输出
                }
                
                response = requests.post(
                    settings.DEEPSEEK_API_URL,
                    headers=headers,
                    json=payload,
                    timeout=60,
                    stream=True
                )
                
                if response.status_code == 200:
                    full_answer = ''
                    chunk_count = 0
                    for line in response.iter_lines()if line:
                            try:
                                line_str = line.decode('utf-8').strip()
                                if not line_str or line_str == 'data:[DONE]'continue
                                if line_str.startswith('data:'):
                                    json_str = line_str[6]  # 去掉 'data:' 前缀
                                    data = json.loads(json_str)
                                    if 'choices' in data and len(data['choices']) > 0:
                                        delta = data['choices'][0].get('delta', {})
                                        content = delta.get('content', '')
                                        if content:
                                            full_answer += content
                                            chunk_count += 1
                                            # 实时返回每个片段
                                            json_response = json.dumps({'content':content, 'done'False}, ensure_ascii=False)
                                            yield json_response.encode('utf-8') + b'\n'
                            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                                continue
                    
                    # 保存完整的对话记录到数据库
                    from System.models import ChatHistory
                    chat = ChatHistory.objects.create(
                        user=request.user,
                        question=question,
                        answer=full_answer
                    )
                    
                    # 返回完成信号
                    done_json = json.dumps({'content''', 'done'True}, ensure_ascii=False)
                    yield done_json.encode('utf-8') + b'\n'
                    
            except Exception as e:
                yield json.dumps({'error'f'AI 问答失败:{str(e)}'}, ensure_ascii=False).encode('utf-8') + b'\n'
        
        return StreamingHttpResponse(generate_response(), content_type='text/event-stream')
    
    return StreamingHttpResponse([json.dumps({'error''仅支持 POST 请求'}, ensure_ascii=False)], content_type='application/json')
特性:
  • 上下文记忆:自动关联最近10条对话
  • 流式输出:逐字显示,减少等待焦虑
  • 历史记录:永久保存对话内容
  • 专业角色:设定为医疗助手

3、用户认证与安全系统

3.1 注册功能
需求:
  • 强密码策略
  • 图形验证码防机器人
  • 安全问题找回密码
实现代码 (System/views.py - register):
def register(request)"""用户注册视图(带验证码和安全问题)"""
    if request.method == 'POST':
        username = request.POST.get('username', '').strip()
        email = request.POST.get('email', '').strip()
        password = request.POST.get('password', '')
        confirm_password = request.POST.get('confirm_password', '')
        captcha_input = request.POST.get('captcha', '').strip()
        security_question = request.POST.get('security_question', '')
        security_answer = request.POST.get('security_answer', '').strip()
        
        # 验证验证码
        session_captcha = request.session.get('captcha_code')
        from System.captcha_utils import check_captcha
        if not check_captcha(session_captcha, captcha_input):
            messages.error(request, '验证码错误,请重新输入')
            log_action(request, 'CAPTCHA_FAILED', f'注册时验证码错误:{captcha_input}', log_type='WARNING')
            return render(request, 'register.html', {
                'username':username, 
                'email':email,
                'security_question':security_question
            })
        
        # 清除已使用的验证码
        del request.session['captcha_code']
        
        # 验证
        errors = []
        
        # 用户名验证
        if len(username) < 3:
            errors.append('用户名至少需要 3 个字符')
        elif User.objects.filter(username=username).exists():
            errors.append('用户名已存在')
        
        # 邮箱验证
        email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(email_pattern, email):
            errors.append('邮箱格式不正确')
        elif User.objects.filter(email=email).exists():
            errors.append('该邮箱已被注册')
        
        # 密码验证
        if password != confirm_password:
            errors.append('两次输入的密码不一致')
        else# 验证密码强度
            is_valid, pwd_errors = validate_password_strength(password)
            if not is_valid:
                errors.extend(pwd_errors)
            
            # 检查密码是否与用户名相似
            if len(username) >= 3 and username.lower() in password.lower():
                errors.append('密码不能包含用户名')
        
        # 安全问题验证
        if not security_question:
            errors.append('请选择一个安全问题')
        if not security_answer or len(security_answer) < 2:
            errors.append('安全问题答案至少需要 2 个字符')
        
        if errors:
            for error in errors:
                messages.error(request, error)
            return render(request, 'register.html', {
                'username':username, 
                'email':email,
                'security_question':security_question
            })
        
        # 创建用户
        user = User.objects.create_user(username=username, email=email, password=password)
        user.save()
        
        # 保存安全问题
        from System.models import SecurityQuestion
        SecurityQuestion.objects.create(
            user=user,
            question=security_question,
            answer=security_answer.lower().strip()  # 转为小写便于后续验证
        )
        
        # 记录日志
        log_action(request, 'REGISTER', f'新用户注册:{username},邮箱:{email}')
        
        messages.success(request, '注册成功!请登录')
        return redirect('login')
    
    return render(request, 'register.html')
密码强度验证函数:
def validate_password_strength(password)"""验证密码强度 - 增强版"""
    errors = []
    
    # 长度检查(至少 8 位,建议 12 位以上)
    if len(password) < 8:
        errors.append('密码长度至少为 8 个字符')
    elif len(password) < 12:
        errors.append('建议使用 12 位以上的密码以提高安全性')
    
    # 必须包含大写字母
    if not re.search(r'[A-Z]', password):
        errors.append('密码必须包含至少一个大写字母')
    
    # 必须包含小写字母
    if not re.search(r'[a-z]', password):
        errors.append('密码必须包含至少一个小写字母')
    
    # 必须包含数字
    if not re.search(r'\d', password):
        errors.append('密码必须包含至少一个数字')
    
    # 必须包含特殊字符
    if not re.search(r"[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>\/?`~]", password):
        errors.append('密码必须包含至少一个特殊字符(如 !@#$%^&* 等)')
    
    # 不能包含连续重复字符(如 aaa, 111)
    if re.search(r'(.)\1{2,}', password):
        errors.append('密码不能包含连续重复的字符(如 aaa, 111)')
    
    # 不能包含连续序列(如 abc, 123)
    if _contains_sequential_chars(password):
        errors.append('密码不能包含连续序列字符(如 abc, 123, xyz)')
    
    # 不能是常见弱密码
    weak_passwords = [
        'password', '12345678', 'abcdefg', 'qwerty', 'admin123',
        '123456789', '1234567890', 'abc123', 'password1', 'iloveyou',
        'sunshine', 'princess', 'football', 'charlie', 'access',
        'master', 'michael', 'shadow', 'jennifer', 'monkey'
    ]
    if password.lower() in weak_passwords:
        errors.append('密码过于简单,请使用更复杂的密码')
    
    return len(errors) == 0, errors
3.2 登录与防暴力破解
实现代码 (System/views.py - user_login):
def user_login(request)"""用户登录视图(带验证码验证 + 记住我功能 + 暴力破解防护)"""
    if request.method == 'POST':
        login_input = request.POST.get('login_input', '').strip()
        password = request.POST.get('password', '')
        captcha_input = request.POST.get('captcha', '').strip()
        remember_me = request.POST.get('remember_me') == '1'
        
        # 验证验证码
        session_captcha = request.session.get('captcha_code')
        from System.captcha_utils import check_captcha
        if not check_captcha(session_captcha, captcha_input):
            messages.error(request, '验证码错误,请重新输入')
            log_action(request, 'CAPTCHA_FAILED', f'登录时验证码错误:{captcha_input}', log_type='WARNING')
            return render(request, 'login.html')
        
        # 清除已使用的验证码
        del request.session['captcha_code']
        
        # 检查IP是否被临时锁定(防暴力破解)
        client_ip = get_client_ip(request)
        if _is_ip_locked_for_login(client_ip):
            messages.error(request, '由于多次登录失败,您的IP已被临时锁定,请稍后再试')
            log_action(request, 'BRUTE_FORCE_ATTEMPT', f'IP {client_ip} 在锁定期间尝试登录', log_type='CRITICAL')
            return render(request, 'login.html')
        
        # 首先尝试查找用户(通过用户名或邮箱)
        user_obj = None
        try:
            user_obj = User.objects.get(username=login_input)
        except User.DoesNotExist:
            try:
                user_obj = User.objects.get(email=login_input)
            except User.DoesNotExist:
                user_obj = None
        
        # 如果找到用户,检查是否被禁用
        if user_obj is not None and not user_obj.is_active:
            # 获取禁用原因
            try:
                profile = user_obj.profile
                disable_reason = profile.disable_reason if profile and profile.disable_reason else '未提供具体原因'
                disabled_at = profile.disabled_at.strftime('%Y-%m-%d %H:%M') if profile and profile.disabled_at else '未知时间'
                disabled_by = profile.disabled_by.username if profile and profile.disabled_by else '管理员'
                
                error_message = f'您的账号已被管理员禁用<br>'
                error_message += f'<strong>禁用时间:</strong>{disabled_at}<br>'
                error_message += f'<strong>操作人:</strong>{disabled_by}<br>'
                error_message += f'<strong>禁用原因:</strong>{disable_reason}'
            except:
                error_message = '您的账号已被管理员禁用,请联系管理员解封'
            
            from django.utils.safestring import mark_safe
            messages.error(request, mark_safe(error_message))
            log_action(request, 'LOGIN', f'被禁用用户尝试登录:{user_obj.username}', log_type='WARNING')
            return render(request, 'login.html')
        
        # 尝试使用用户名或邮箱登录(验证密码)
        user = authenticate(request, username=login_input, password=password)
        
        if user is None# 如果用户名登录失败,尝试用邮箱查找用户
            try:
                user_obj = User.objects.get(email=login_input)
                user = authenticate(request, username=user_obj.username, password=password)
            except User.DoesNotExist:
                user = None
        
        if user is not None# 登录成功,清除失败计数
            _clear_login_failures(client_ip)
            
            login(request, user)
            messages.success(request, f'欢迎回来,{user.username}!')
            
            # 处理"记住我"功能 - 设置 session 过期时间
            if remember_me:
                # 14 天 = 14 * 24 * 60 * 60 = 1209600 秒
                request.session.set_expiry(1209600)
                log_action(request, 'LOGIN', f'用户登录并启用记住我:{user.username}')
            else# 默认 session 过期时间(浏览器关闭时过期)
                request.session.set_expiry(0)
                log_action(request, 'LOGIN', f'用户登录:{user.username}')
            
            # 管理员直接跳转到管理员仪表盘
            if user.is_staff:
                return redirect('admin_dashboard')
            
            next_url = request.GET.get('next', '/')
            return redirect(next_url)
        else# 登录失败,记录失败次数
            _record_login_failure(client_ip, login_input)
            messages.error(request, '用户名/邮箱或密码错误')
            log_action(request, 'CAPTCHA_FAILED', f'登录失败:{login_input}', log_type='WARNING')
            return render(request, 'login.html')
    
    return render(request, 'login.html')
防暴力破解机制:
def _record_login_failure(ip_address, username)"""记录登录失败次数(防暴力破解)"""
    from django.core.cache import cache
    
    # 构建缓存键
    fail_key = f"login_failures:{ip_address}"
    
    # 获取当前失败次数
    failures = cache.get(fail_key, 0)
    failures += 1
    
    # 更新缓存,15分钟后过期
    cache.set(fail_key, failures, 900)
    
    # 如果失败次数过多,记录安全事件
    if failures >= 5try:
            SystemLog.objects.create(
                log_type='WARNING',
                action_type='BRUTE_FORCE_ATTEMPT',
                description=f'IP {ip_address} 登录失败 {failures} 次,尝试用户名:{username}',
                ip_address=ip_address
            )
        except Exception as e:
            print(f'日志记录失败:{e}')
    
    # 如果失败次数达到10次,锁定IP 30分钟
    if failures >= 10:
        lock_key = f"login_lock:{ip_address}"
        cache.set(lock_key, True, 1800)  # 30分钟
        try:
            SystemLog.objects.create(
                log_type='CRITICAL',
                action_type='BRUTE_FORCE_ATTEMPT',
                description=f'IP {ip_address} 因多次登录失败被锁定30分钟(失败{failures}次)',
                ip_address=ip_address
            )
        except Exception as e:
            print(f'日志记录失败:{e}')


def _clear_login_failures(ip_address)"""清除登录失败计数(登录成功后调用)"""
    from django.core.cache import cache
    
    fail_key = f"login_failures:{ip_address}"
    lock_key = f"login_lock:{ip_address}"
    
    cache.delete(fail_key)
    cache.delete(lock_key)


def _is_ip_locked_for_login(ip_address)"""检查IP是否被锁定"""
    from django.core.cache import cache
    
    lock_key = f"login_lock:{ip_address}"
    return cache.get(lock_key, False)
3.3 找回密码流程
三步验证机制:
  1. 第一步:输入用户名/邮箱 + 验证码
  2. 第二步:回答安全问题
  3. 第三步:设置新密码
实现代码 (System/views.py):
def reset_password_request(request)"""找回密码 - 第一步:输入用户名/邮箱"""
    if request.method == 'POST':
        login_input = request.POST.get('login_input', '').strip()
        captcha_input = request.POST.get('captcha', '').strip()
        
        # 验证验证码
        session_captcha = request.session.get('captcha_code')
        from System.captcha_utils import check_captcha
        if not check_captcha(session_captcha, captcha_input):
            messages.error(request, '验证码错误,请重新输入')
            log_action(request, 'CAPTCHA_FAILED', f'重置密码时验证码错误:{captcha_input}', log_type='WARNING')
            return render(request, 'reset_password_request.html')
        
        # 清除已使用的验证码
        del request.session['captcha_code']
        
        # 查找用户
        user_obj = None
        try:
            user_obj = User.objects.get(username=login_input)
        except User.DoesNotExist:
            try:
                user_obj = User.objects.get(email=login_input)
            except User.DoesNotExist:
                pass
        
        if user_obj is None:
            messages.error(request, '该用户不存在')
            return render(request, 'reset_password_request.html')
        
        # 检查是否有安全问题
        try:
            security_question = user_obj.security_question
            # 将用户 ID 存入 session,用于下一步
            request.session['reset_password_user_id'] = user_obj.id
            log_action(request, 'RESET_PASSWORD', f'用户 {user_obj.username} 开始重置密码')
            return redirect('reset_password_verify')
        except:
            messages.error(request, '该用户未设置安全问题,请联系管理员')
            return render(request, 'reset_password_request.html')
    
    return render(request, 'reset_password_request.html')


def reset_password_verify(request)"""找回密码 - 第二步:验证安全问题"""
    # 检查是否有用户 ID 在 session 中
    user_id = request.session.get('reset_password_user_id')
    if not user_id:
        messages.error(request, '请先输入用户名/邮箱')
        return redirect('reset_password_request')
    
    try:
        user_obj = User.objects.get(id=user_id)
        security_question = user_obj.security_question
    except:
        messages.error(request, '用户信息无效,请重新操作')
        return redirect('reset_password_request')
    
    if request.method == 'POST':
        answer = request.POST.get('answer', '').strip().lower()
        captcha_input = request.POST.get('captcha', '').strip()
        
        # 验证验证码
        session_captcha = request.session.get('captcha_code')
        from System.captcha_utils import check_captcha
        if not check_captcha(session_captcha, captcha_input):
            messages.error(request, '验证码错误,请重新输入')
            log_action(request, 'CAPTCHA_FAILED', f'重置密码验证答案时验证码错误', log_type='WARNING')
            return render(request, 'reset_password_verify.html', {'question':security_question.question})
        
        # 清除已使用的验证码
        del request.session['captcha_code']
        
        # 验证答案(不区分大小写)
        if answer == security_question.answer.lower()# 验证通过,进入重置密码页面
            request.session['reset_password_verified'] = True
            return redirect('reset_password_confirm')
        else:
            messages.error(request, '答案错误,请重新回答')
            log_action(request, 'CAPTCHA_FAILED', f'重置密码答案错误', log_type='WARNING')
            return render(request, 'reset_password_verify.html', {'question':security_question.question})
    
    return render(request, 'reset_password_verify.html', {'question':security_question.question})


def reset_password_confirm(request)"""找回密码 - 第三步:设置新密码"""
    # 检查是否已验证
    verified = request.session.get('reset_password_verified', False)
    user_id = request.session.get('reset_password_user_id')
    
    if not verified or not user_id:
        messages.error(request, '请先完成安全验证')
        return redirect('reset_password_request')
    
    try:
        user_obj = User.objects.get(id=user_id)
    except:
        messages.error(request, '用户信息无效')
        return redirect('reset_password_request')
    
    if request.method == 'POST':
        new_password = request.POST.get('new_password', '')
        confirm_password = request.POST.get('confirm_password', '')
        
        # 验证密码
        errors = []
        if new_password != confirm_password:
            errors.append('两次输入的密码不一致')
        else# 验证密码强度
            is_valid, pwd_errors = validate_password_strength(new_password)
            if not is_valid:
                errors.extend(pwd_errors)
        
        if errors:
            for error in errors:
                messages.error(request, error)
            return render(request, 'reset_password_confirm.html')
        
        # 设置新密码
        user_obj.set_password(new_password)
        user_obj.save()
        
        # 清除 session
        del request.session['reset_password_user_id']
        del request.session['reset_password_verified']
        
        log_action(request, 'RESET_PASSWORD', f'用户 {user_obj.username} 成功重置密码')
        
        # 创建系统消息通知用户
        beijing_time = timezone.localtime(timezone.now()).strftime("%Y-%m-%d %H:%M:%S")
        create_system_message(
            user=user_obj,
            title='账户安全提醒:密码已重置',
            content=f'您的账户密码已于 {beijing_time} 通过安全问题验证成功重置。\n\n如果您没有进行此操作,请立即联系管理员!\n\n为了保障账户安全,建议您:\n1. 登录后立即修改密码\n2. 检查账户是否有异常活动\n3. 更新您的安全问题答案',
            severity='serious',  # 严重级别,需要用户确认
            category='other'
        )
        
        messages.success(request, '密码重置成功,请登录')
        return redirect('login')
    
    return render(request, 'reset_password_confirm.html')

4、数据管理与导出系统

4.1 预测历史记录
软删除机制 (System/views.py):
@login_required
def soft_delete_prediction(request, record_id)"""软删除预测记录(AJAX)"""
    if request.method != 'POST'return JsonResponse({'success'False, 'error''无效的请求方法'}, status=405)
    
    tryfrom System.models import PredictionRecord
        from django.utils import timezone
        
        record = PredictionRecord.objects.get(id=record_id, user=request.user)
        
        # 标记为已删除(非物理删除)
        record.is_deleted = True
        record.deleted_at = timezone.now()
        record.save()
        
        log_action(request, 'SOFT_DELETE', f'用户软删除预测记录 #{record_id},结果:{record.result}', log_type='INFO')
        
        return JsonResponse({'success'True, 'message''记录已删除', 'record_id':record_id})
    except PredictionRecord.DoesNotExist:
        return JsonResponse({'success'False, 'error''记录不存在'}, status=404)
    except Exception as e:
        return JsonResponse({'success'False, 'error'f'删除失败:{str(e)}'}, status=500)


@login_required
def undo_delete_prediction(request, record_id)"""撤销删除预测记录(AJAX)"""
    if request.method != 'POST'return JsonResponse({'success'False, 'error''无效的请求方法'}, status=405)
    
    tryfrom System.models import PredictionRecord
        
        record = PredictionRecord.objects.get(id=record_id, user=request.user, is_deleted=True)
        
        # 恢复记录
        record.is_deleted = False
        record.deleted_at = None
        record.save()
        
        log_action(request, 'UNDO_DELETE', f'用户撤销删除预测记录 #{record_id},结果:{record.result}', log_type='INFO')
        
        return JsonResponse({'success'True, 'message''记录已恢复'})
    except PredictionRecord.DoesNotExist:
        return JsonResponse({'success'False, 'error''记录不存在或未被删除'}, status=404)
    except Exception as e:
        return JsonResponse({'success'False, 'error'f'恢复失败:{str(e)}'}, status=500)
4.2 高级数据导出
  • 支持格式:CSV、JSON、PDF
  • 支持内容:预测记录、AI对话历史、完整账户数据
  • 时间筛选:全部、最近30天、自定义日期范围
  • 密码保护:ZIP加密、PDF加密

实现代码 (System/views.py - export_data_api):

@login_required
def export_data_api(request)"""高级数据导出 API - 支持多格式、时间筛选、内容选择、密码保护"""
    if request.method != 'POST'return JsonResponse({'success'False, 'error''仅支持 POST 请求'}, status=405)
    
    tryimport csv
        import io
        from datetime import timedelta
        from System.models import PredictionRecord, ChatHistory, ExportTask
        
        # 获取导出参数
        format_type = request.POST.get('format', 'csv')  # csv, json, pdf
        content_type = request.POST.get('content_type', 'predictions_only')  # predictions_only, predictions_and_chats, full_account
        time_range = request.POST.get('time_range', 'all')  # all, last_30_days, custom
        start_date_str = request.POST.get('start_date', '')
        end_date_str = request.POST.get('end_date', '')
        password = request.POST.get('password', '').strip()
        password_protected = bool(password and request.POST.get('enable_password') == 'true')
        
        # 验证参数
        if format_type not in ['csv', 'json', 'pdf']return JsonResponse({'success'False, 'error''不支持的导出格式'}, status=400)
        
        if content_type not in ['predictions_only', 'predictions_and_chats', 'full_account']return JsonResponse({'success'False, 'error''不支持的内容类型'}, status=400)
        
        # 创建导出任务记录
        export_task = ExportTask.objects.create(
            user=request.user,
            format_type=format_type,
            content_type=content_type,
            time_range=time_range,
            password_protected=password_protected,
            status='processing'
        )
        
        # 确定时间范围
        now = timezone.now()
        if time_range == 'last_30_days':
            start_date = now - timedelta(days=30)
            end_date = now
        elif time_range == 'custom'if not start_date_str or not end_date_str:
                export_task.status = 'failed'
                export_task.error_message = '自定义日期范围需要开始和结束日期'
                export_task.save()
                return JsonResponse({'success'False, 'error''请提供开始和结束日期'}, status=400)
            try:
                start_date = timezone.make_aware(timezone.datetime.strptime(start_date_str, '%Y-%m-%d'))
                end_date = timezone.make_aware(timezone.datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S'))
            except ValueError:
                export_task.status = 'failed'
                export_task.error_message = '日期格式错误'
                export_task.save()
                return JsonResponse({'success'False, 'error''日期格式错误,请使用 YYYY-MM-DD 格式'}, status=400)
        else# all
            start_date = None
            end_date = None
        
        # 收集数据
        data_dict = {
            'export_info'{
                'username':request.user.username,
                'email':request.user.email,
                'export_time':now.strftime('%Y-%m-%d %H:%M:%S'),
                'format':format_type,
                'content_type':content_type,
                'time_range':time_range,
            },
            'statistics'{}
        }
        
        # 获取预测记录
        predictions_qs = PredictionRecord.objects.filter(user=request.user, is_deleted=False)
        if start_date and end_date:
            predictions_qs = predictions_qs.filter(created_at__range=[start_date, end_date])
        predictions_qs = predictions_qs.order_by('-created_at')
        
        predictions = list(predictions_qs.values('id', 'input_data', 'result', 'confidence', 'created_at'))
        
        # 转换datetime为字符串
        for pred in predictions:
            pred['created_at'] = pred['created_at'].strftime('%Y-%m-%d %H:%M:%S')
        
        data_dict['predictions'] = predictions
        data_dict['statistics']['prediction_count'] = len(predictions)
        
        # 如果需要AI对话历史
        if content_type in ['predictions_and_chats', 'full_account']:
            chats_qs = ChatHistory.objects.filter(user=request.user)
            if start_date and end_date:
                chats_qs = chats_qs.filter(created_at__range=[start_date, end_date])
            chats_qs = chats_qs.order_by('-created_at')
            
            chats = list(chats_qs.values('id', 'question', 'answer', 'created_at'))
            for chat in chats:
                chat['created_at'] = chat['created_at'].strftime('%Y-%m-%d %H:%M:%S')
            
            data_dict['chat_history'] = chats
            data_dict['statistics']['chat_count'] = len(chats)
        
        # 如果是完整账户数据
        if content_type == 'full_account':
            data_dict['user_info'] = {
                'username':request.user.username,
                'email':request.user.email,
                'first_name':request.user.first_name,
                'last_name':request.user.last_name,
                'date_joined':request.user.date_joined.strftime('%Y-%m-%d %H:%M:%S'),
                'is_active':request.user.is_active,
            }
        
        # 计算统计信息
        if predictions:
            dates = [p['created_at'] for p in predictions]
            data_dict['statistics']['time_span'] = f'{min(dates)}{max(dates)}'
            benign_count = sum(1 for p in predictions if '良性' in p['result'])
            malignant_count = sum(1 for p in predictions if '恶性' in p['result'])
            data_dict['statistics']['benign_count'] = benign_count
            data_dict['statistics']['malignant_count'] = malignant_count
            avg_confidence = sum(p['confidence'] for p in predictions) / len(predictions)
            data_dict['statistics']['avg_confidence'] = f'{avg_confidence:.2f}%'
        
        # 根据格式生成文件
        timestamp = now.strftime('%Y%m%d_%H%M%S')
        
        if format_type == 'csv':
            response = generate_csv_export(data_dict, timestamp, password_protected, password)
        elif format_type == 'json':
            response = generate_json_export(data_dict, timestamp, password_protected, password)
        elif format_type == 'pdf':
            response = generate_pdf_export_advanced(data_dict, timestamp, password_protected, password)
        else:
            export_task.status = 'failed'
            export_task.error_message = '不支持的格式'
            export_task.save()
            return JsonResponse({'success'False, 'error''不支持的导出格式'}, status=400)
        
        # 更新导出任务状态
        export_task.status = 'completed'
        export_task.completed_at = now
        export_task.save()
        
        # 记录日志
        log_action(request, 'EXPORT_DATA', 
                  f'用户导出数据:格式={format_type}, 内容={content_type}, 时间范围={time_range}, '
                  f'记录数={len(predictions)}, 密码保护={password_protected}')
        
        return response
        
    except Exception as e:
        # 更新任务状态为失败
        try:
            export_task.status = 'failed'
            export_task.error_message = str(e)
            export_task.save()
        exceptpass
        
        return JsonResponse({'success'False, 'error'f'导出失败:{str(e)}'}, status=500)
CSV加密导出示例:
def generate_csv_export(data_dict, timestamp, password_protected=False, password=None)"""生成 CSV 格式导出(支持传统ZIP密码保护)"""
    import csv
    import io
    
    output = io.StringIO()
    writer = csv.writer(output)
    
    # 写入统计摘要
    writer.writerow(['=== 数据导出统计摘要 ==='])
    writer.writerow(['用户名', data_dict['export_info']['username']])
    writer.writerow(['导出时间', data_dict['export_info']['export_time']])
    writer.writerow(['导出格式', 'CSV'])
    writer.writerow(['内容类型', data_dict['export_info']['content_type']])
    writer.writerow([])
    
    stats = data_dict['statistics']
    writer.writerow(['总预测记录数', stats.get('prediction_count', 0)])
    writer.writerow(['良性记录数', stats.get('benign_count', 0)])
    writer.writerow(['恶性记录数', stats.get('malignant_count', 0)])
    writer.writerow(['平均置信度', stats.get('avg_confidence', 'N/A')])
    writer.writerow(['时间跨度', stats.get('time_span', 'N/A')])
    if 'chat_count' in stats:
        writer.writerow(['AI对话记录数', stats['chat_count']])
    writer.writerow([])
    writer.writerow([])
    
    # 写入预测记录
    if data_dict.get('predictions'):
        writer.writerow(['=== 预测记录 ==='])
        writer.writerow(['序号', '预测时间', '结果', '置信度(%)'])
        for idx, pred in enumerate(data_dict['predictions'], 1):
            writer.writerow([
                idx,
                pred['created_at'],
                pred['result'],
                f"{pred['confidence']*100.2f}"
            ])
        writer.writerow([])
        writer.writerow([])
    
    # 写入AI对话历史
    if data_dict.get('chat_history'):
        writer.writerow(['=== AI对话历史 ==='])
        writer.writerow(['序号', '时间', '问题', '回答'])
        for idx, chat in enumerate(data_dict['chat_history'], 1)# 限制问题和回答的长度
            question = chat['question'][200] if len(chat['question']) > 200 else chat['question']
            answer = chat['answer'][500] if len(chat['answer']) > 500 else chat['answer']
            writer.writerow([idx, chat['created_at'], question, answer])
        writer.writerow([])
    
    csv_content = output.getvalue()
    
    # 如果需要密码保护,创建带密码的ZIP文件
    if password_protected and password:
        try# 使用pyzipper创建带密码的ZIP(纯Python实现,无需编译)
            tryimport pyzipper
                
                # 创建内存中的ZIP文件
                zip_buffer = io.BytesIO()
                
                # 使用AES加密创建ZIP
                with pyzipper.AESZipFile(
                    zip_buffer,
                    'w',
                    compression=pyzipper.ZIP_LZMA,
                    encryption=pyzipper.WZ_AES
                ) as zf:
                    zf.setpassword(password.encode('utf-8'))
                    zf.writestr(
                        f'data_export_{timestamp}.csv',
                        csv_content.encode('utf-8-sig')
                    )
                
                zip_data = zip_buffer.getvalue()
                
                response = HttpResponse(zip_data, content_type='application/zip')
                response['Content-Disposition'] = f'attachment; filename="data_export_{timestamp}.zip"'
                response['X-Password-Protected'] = 'true'
                
                return response
                
            except ImportError:
                # 如果pyzipper未安装,尝试使用传统方法
                print('警告:pyzipper未安装,尝试使用备用方案')
                print('安装命令:pip install pyzipper')
                raise
                
        except Exception as e:
            # 如果加密失败,返回未加密的CSV
            print(f'CSV ZIP加密失败:{e}')
            import traceback
            traceback.print_exc()
    
    # 未加密的CSV(或加密失败时的回退)
    response = HttpResponse(csv_content, content_type='text/csv; charset=utf-8-sig')
    response['Content-Disposition'] = f'attachment; filename="data_export_{timestamp}.csv"'
    
    return response

5、消息通知系统

5.1 消息分类与危急程度
消息类型:
  • 系统维护与更新
  • Bug修复
  • 普遍问题反馈
  • 其他
危急程度:
  • 无(绿色)
  • 轻微(黄色)
  • 严重(橙色)- 需要用户确认
  • 紧急(红色)- 需要用户确认

模型定义 (System/models.py):

class Announcement(models.Model)"""消息通知模型(原公告系统)"""
    STATUS_CHOICES = [
        ('DRAFT', '草稿'),
        ('PUBLISHED', '已发布'),
        ('ARCHIVED', '已归档'),
    ]
    
    # 消息分类
    CATEGORY_CHOICES = [
        ('system_maintenance', '系统维护与更新'),
        ('bug_fix', 'Bug 修复'),
        ('common_feedback', '普遍问题反馈'),
        ('other', '其他'),
    ]
    
    # 危急程度
    SEVERITY_CHOICES = [
        ('none', '无'),      # 绿色
        ('minor', '轻微'),   # 黄色
        ('serious', '严重'), # 橙色
        ('urgent', '紧急'),  # 红色
    ]
    
    title = models.CharField(max_length=200, verbose_name='标题')
    content = models.TextField(verbose_name='内容')
    author = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='发布者', related_name='announcements')
    target_user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True, 
                                    verbose_name='目标用户', related_name='target_announcements',
                                    help_text='留空表示对所有用户可见(公告),指定用户则仅该用户可见')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT', verbose_name='状态')
    category = models.CharField(max_length=30, choices=CATEGORY_CHOICES, default='other', verbose_name='消息分类')
    severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES, default='none', verbose_name='危急程度')
    is_pinned = models.BooleanField(default=False, verbose_name='是否置顶')
    requires_confirmation = models.BooleanField(default=False, verbose_name='是否需要用户确认')
    created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
    updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
    published_at = models.DateTimeField(null=True, blank=True, verbose_name='发布时间')
    
    class Meta:
        verbose_name = '消息'
        verbose_name_plural = '消息'
        ordering = ['-is_pinned', '-published_at']
    
    def __str__(self)return f'{self.title} - {self.get_status_display()}'
    
    def get_severity_color(self)"""获取危急程度对应的颜色"""
        color_map = {
            'none''#5cb85c',     # 绿色
            'minor''#f0ad4e',    # 黄色
            'serious''#ff9800',  # 橙色
            'urgent''#d9534f',   # 红色
        }
        return color_map.get(self.severity, '#5cb85c')
    
    def needs_user_confirmation(self)"""判断是否需要用户确认(严重及以上级别)"""
        return self.severity in ['serious', 'urgent']


class AnnouncementRead(models.Model)"""消息已读/已确认记录模型"""
    user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='用户', related_name='read_announcements')
    announcement = models.ForeignKey(Announcement, on_delete=models.CASCADE, verbose_name='消息', related_name='read_by_users')
    read_at = models.DateTimeField(auto_now_add=True, verbose_name='阅读时间')
    confirmed = models.BooleanField(default=False, verbose_name='是否已确认')
    confirmed_at = models.DateTimeField(null=True, blank=True, verbose_name='确认时间')
    
    class Meta:
        verbose_name = '消息已读记录'
        verbose_name_plural = '消息已读记录'
        unique_together = ('user', 'announcement')  # 确保每个用户对每条消息只有一条记录
        ordering = ['-read_at']
    
    def __str__(self):
        status = '已确认' if self.confirmed else '已读未确认'
        return f'{self.user.username} {status} {self.announcement.title}'
5.2 系统自动消息
当用户执行敏感操作时(修改密码、修改邮箱等),系统会自动发送安全提醒消息:
def create_system_message(user, title, content, severity='serious', category='other')"""
    创建系统自动消息
    :param user:接收消息的用户
    :param title:消息标题
    :param content:消息内容
    :param severity:危急程度(none/minor/serious/urgent)
    :param category:消息分类
    """
    from System.models import Announcement
    from django.contrib.auth.models import User
    
    # 获取或创建 System 用户作为发布者
    system_user, created = User.objects.get_or_create(
        username='System',
        defaults={
            'email''system@breast-cancer-prediction.com',
            'is_active'False,  # 系统账户不活跃,不能登录
            'is_staff'False,
        }
    )
    
    if created:
        print('已创建 System 用户账户')
    
    # 严重和紧急级别自动设置需要确认
    requires_confirmation = severity in ['serious', 'urgent']
    
    message = Announcement.objects.create(
        title=title,
        content=content,
        author=system_user,
        target_user=user,  # 关键修改:指定目标用户,只有该用户可见
        status='PUBLISHED',
        category=category,
        severity=severity,
        is_pinned=False,
        requires_confirmation=requires_confirmation,
        published_at=timezone.now()
    )
    
    print(f'[系统消息] 已为用户 {user.username} 创建消息:{title}')
    return message

6、管理员后台系统

6.1 管理员功能清单
  1. 用户管理:查看、搜索、禁用/启用用户
  2. 日志管理:查看所有系统日志,支持筛选
  3. 日志导出:导出加密PDF格式的日志报告
  4. 消息管理:创建、编辑、删除消息通知
  5. 每日一言管理:管理首页显示的名言
  6. 数据大屏:可视化展示系统运行数据
  7. 健康监控:监控系统资源使用情况
6.2 用户管理实现
@login_required
def admin_toggle_user_status(request, user_id)"""禁用/启用用户账号(需要密码二次确认)"""
    # 检查是否为管理员
    if not request.user.is_staff:
        messages.error(request, '您没有权限执行此操作')
        return redirect('index')
    
    if request.method == 'POST'# POST请求:处理禁用/启用操作
        tryimport json
            data = json.loads(request.body)
            admin_password = data.get('password', '')
            disable_reason = data.get('reason', '').strip()
            
            # 验证管理员密码
            authenticated_user = authenticate(request, username=request.user.username, password=admin_password)
            if authenticated_user is Nonereturn JsonResponse({'success'False, 'error''密码错误,请重新输入'}, status=403)
            
            # 获取目标用户
            user = User.objects.get(id=user_id)
            if user.is_staff:
                return JsonResponse({'success'False, 'error''不能修改其他管理员的状态'}, status=403)
            if user.username == 'System'return JsonResponse({'success'False, 'error''不能操作系统账户'}, status=403)
            
            # 如果是禁用操作,必须有原因
            if user.is_active and not disable_reason:
                return JsonResponse({'success'False, 'error''禁用账户时必须填写原因'}, status=400)
            
            # 切换用户状态
            user.is_active = not user.is_active
            user.save()
            
            # 更新UserProfile中的禁用信息
            profile = user.profile
            if not user.is_active:
                # 禁用账户
                profile.disable_reason = disable_reason
                profile.disabled_at = timezone.now()
                profile.disabled_by = request.user
            else# 启用账户,清空禁用信息
                profile.disable_reason = None
                profile.disabled_at = None
                profile.disabled_by = None
            profile.save()
            
            status = '启用' if user.is_active else '禁用'
            log_action(request, 'TOGGLE_USER_STATUS', f'管理员{status}用户账号:{user.username},原因:{disable_reason if disable_reason else "无"}', log_type='WARNING')
            
            return JsonResponse({
                'success'True, 
                'message'f'已{status}用户 {user.username} 的账号',
                'new_status':user.is_active
            })
            
        except json.JSONDecodeError:
            return JsonResponse({'success'False, 'error''请求数据格式错误'}, status=400)
        except User.DoesNotExist:
            return JsonResponse({'success'False, 'error''用户不存在'}, status=404)
        except Exception as e:
            return JsonResponse({'success'False, 'error'f'操作失败:{str(e)}'}, status=500)
    
    # GET请求:返回当前用户状态
    try:
        user = User.objects.get(id=user_id)
        return JsonResponse({
            'user_id':user.id,
            'username':user.username,
            'is_active':user.is_active,
            'is_staff':user.is_staff
        })
    except User.DoesNotExist:
        return JsonResponse({'error''用户不存在'}, status=404)
6.3 系统健康监控
@login_required
def health_monitor(request)"""系统健康监控面板(仅管理员)"""
    import psutil
    import os
    from System.models import PredictionRecord, ChatHistory, SystemLog, Announcement
    
    # 检查是否为管理员
    if not request.user.is_staff:
        messages.error(request, '您没有权限访问此页面')
        return redirect('index')
    
    # 获取系统信息
    try# CPU 使用率
        cpu_percent = psutil.cpu_percent(interval=1)
        
        # 内存使用率
        memory = psutil.virtual_memory()
        memory_percent = memory.percent
        memory_used = memory.used / (1024 ** 3)  # GB
        memory_total = memory.total / (1024 ** 3)  # GB
        
        # 磁盘使用率
        disk = psutil.disk_usage('/')
        disk_percent = disk.percent
        disk_used = disk.used / (1024 ** 3)  # GB
        disk_total = disk.total / (1024 ** 3)  # GB
    except# Windows 系统可能需要不同的路径
        try:
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            memory_percent = memory.percent
            memory_used = memory.used / (1024 ** 3)
            memory_total = memory.total / (1024 ** 3)
            
            disk = psutil.disk_usage('C:\\')
            disk_percent = disk.percent
            disk_used = disk.used / (1024 ** 3)
            disk_total = disk.total / (1024 ** 3)
        except:
            cpu_percent = 0
            memory_percent = 0
            memory_used = 0
            memory_total = 0
            disk_percent = 0
            disk_used = 0
            disk_total = 0
    
    # 数据库统计
    # 只统计普通用户(排除管理员和系统账户)
    total_users = User.objects.filter(is_staff=False).exclude(username='System').count()
    active_users = User.objects.filter(is_active=True, is_staff=False).exclude(username='System').count()
    total_predictions = PredictionRecord.objects.count()
    total_ai_chats = ChatHistory.objects.count()
    total_logs = SystemLog.objects.count()
    error_logs = SystemLog.objects.filter(log_type='ERROR').count()
    warning_logs = SystemLog.objects.filter(log_type='WARNING').count()
    total_announcements = Announcement.objects.count()
    published_announcements = Announcement.objects.filter(status='PUBLISHED').count()
    
    # 在线用户数(活跃 session)
    from django.contrib.sessions.models import Session
    online_users = Session.objects.filter(expire_date__gte=timezone.now()).count()
    
    # 模型状态
    model_status = "已加载" if _model_initialized else "未加载"
    
    context = {
        'cpu_percent':cpu_percent,
        'memory_percent':memory_percent,
        'memory_used'round(memory_used, 2),
        'memory_total'round(memory_total, 2),
        'disk_percent':disk_percent,
        'disk_used'round(disk_used, 2),
        'disk_total'round(disk_total, 2),
        'total_users':total_users,
        'active_users':active_users,
        'online_users':online_users,
        'total_predictions':total_predictions,
        'total_ai_chats':total_ai_chats,
        'total_logs':total_logs,
        'error_logs':error_logs,
        'warning_logs':warning_logs,
        'total_announcements':total_announcements,
        'published_announcements':published_announcements,
        'model_status':model_status,
    }
    
    log_action(request, 'VIEW_HEALTH_MONITOR', '管理员查看系统健康监控')
    
    return render(request, 'health_monitor.html', context)

二、前后端接口交互

URL路由配置 (Breast_Cancer_Prediction_System/urls.py)

from django.contrib import admin
from django.urls import path
from django.conf import settings
from django.conf.urls.static import static
from System import views

urlpatterns = [
    # 管理员功能(必须放在 admin/ 之前)
    path('admin-login/', views.admin_login, name='admin_login'),
    path('admin-dashboard/', views.admin_dashboard, name='admin_dashboard'),
    path('myadmin/users/', views.admin_users, name='admin_users'),
    path('myadmin/users/toggle/<int:user_id>/', views.admin_toggle_user_status, name='admin_toggle_user'),
    path('myadmin/logs/', views.admin_logs, name='admin_logs'),
    path('myadmin/logs-export/', views.admin_logs_export_page, name='admin_logs_export_page'),
    path('myadmin/api/logs-export/', views.admin_logs_export_api, name='admin_logs_export_api'),
    path('myadmin/log-export-history/', views.admin_log_export_history, name='admin_log_export_history'),
    path('dashboard/', views.dashboard, name='dashboard'),
    path('admin-change-password/', views.admin_change_password, name='admin_change_password'),
    path('health-monitor/', views.health_monitor, name='health_monitor'),
    
    # Django 自带 admin
    path('admin/', admin.site.urls),
    
    # 其他路由
    path('', views.index, name='index'),
    path('predict/', views.predict_cancer, name='predict'),
    path('predict', views.predict_cancer, name='predict_no_slash'),  # 支持不带斜杠的 URL
    path('register/', views.register, name='register'),
    path('login/', views.user_login, name='login'),
    path('logout/', views.user_logout, name='logout'),
    path('delete-account/', views.delete_account, name='delete_account'),
    path('delete-account-confirm/', views.delete_account_confirm, name='delete_account_confirm'),
    path('change-password/', views.change_password, name='change_password'),
    path('ask-ai/', views.ask_ai, name='ask_ai'),
    path('ask-ai-stream/', views.ask_ai_stream, name='ask_ai_stream'),  # AI 流式问答 API
    path('chat-history/', views.chat_history, name='chat_history'),
    
    # 验证码生成
    path('captcha/', views.generate_captcha, name='generate_captcha'),
    
    # 找回密码
    path('reset-password/', views.reset_password_request, name='reset_password_request'),
    path('reset-password/verify/', views.reset_password_verify, name='reset_password_verify'),
    path('reset-password/confirm/', views.reset_password_confirm, name='reset_password_confirm'),
    
    # 用户个人中心与预测历史
    path('profile/', views.profile, name='profile'),
    path('account-management/', views.account_management, name='account_management'),
    path('prediction-history/', views.prediction_history, name='prediction_history'),
    path('prediction-history/delete/<int:record_id>/', views.delete_prediction_record, name='delete_prediction_record'),
    path('export-report/', views.export_report, name='export_report_all'),
    path('export-report/<int:record_id>/', views.export_report, name='export_report'),
    
    # 公告系统
    path('announcements/', views.announcement_list, name='announcement_list'),
    path('announcements/create/', views.announcement_create, name='announcement_create'),
    path('announcements/edit/<int:announcement_id>/', views.announcement_edit, name='announcement_edit'),
    path('announcements/delete/<int:announcement_id>/', views.announcement_delete, name='announcement_delete'),
    path('api/confirm-message/<int:message_id>/', views.confirm_message, name='confirm_message'),
    
    # 每日一言管理(管理员)
    path('admin-quotes/', views.admin_quotes, name='admin_quotes'),
    path('admin-quotes/toggle/<int:quote_id>/', views.admin_toggle_quote_status, name='admin_toggle_quote_status'),
    path('admin-quotes/delete/<int:quote_id>/', views.admin_delete_quote, name='admin_delete_quote'),
    
    # 软删除与撤销
    path('prediction-history/soft-delete/<int:record_id>/', views.soft_delete_prediction, name='soft_delete_prediction'),
    path('prediction-history/undo-delete/<int:record_id>/', views.undo_delete_prediction, name='undo_delete_prediction'),
    
    # 高级数据导出
    path('export-data/', views.export_data_page, name='export_data_page'),
    path('api/export-data/', views.export_data_api, name='export_data_api'),
]

# 开发环境下提供媒体文件服务
if settings.DEBUG:
    urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

三、前端页面详细说明

1、首页 (home.html)

核心功能:
  • 显示紧急消息确认(需要用户确认的严重/紧急消息置顶)
  • 最新公告提示栏(最多3条未读公告)
  • 每日一言展示(渐变色卡片)
  • 乳腺癌知识科普(Tab切换:什么是乳腺癌/谁会得/症状)
  • 预测表单(30个输入字段,仅登录用户可见)
智能辅助功能:
// 实时验证与异常值检测
$('.field-input').on('input', function() {
    const value = parseFloat($(this).val());
    const config = fieldConfig[fieldId];
    
    // 检查是否在正常范围内
    if (value < config.min || value > config.max) {
        $(this).css('border-color', '#dc3545'); // 红色边框
        showWarning($(this), `⚠️ 警告:该值超出正常范围 (${config.min} - ${config.max})`);
    } else {
        $(this).css('border-color', '#28a745'); // 绿色边框
        hideWarning($(this));
    }
    
    // 自动保存草稿到 localStorage
    saveDraft();
});

// 填充示例数据按钮
$('#fillExampleBtn').click(function() {
    // 填充良性病例的示例数据
    for (let i = 1; i <= 30; i++) {
        $('#value' + i).val(exampleData['value' + i]);
    }
});

// 每30秒自动保存草稿
setInterval(saveDraft, 30000);

// 页面加载时恢复草稿
$(document).ready(function() {
    loadDraft();
});
用户体验优化:
  • 悬停显示字段医学解释(Bootstrap Tooltip)
  • 输入框实时颜色反馈(绿色=正常,红色=异常)
  • 自动保存草稿(LocalStorage)
  • 一键填充示例数据
  • 一键清空表单
  • 返回顶部按钮(滚动超过300px显示)

2、登录页面 (login.html)

核心功能:
  • 用户名/邮箱双模式登录
  • 图形验证码(点击刷新)
  • "记住我"功能(14天免登录)
  • 忘记密码链接
  • 注册账号入口
验证码刷新逻辑:
function refreshCaptcha() {
    var img = document.getElementById('captcha_image');
    img.src = '{% url "generate_captcha" %}?t=' + new Date().getTime();
}
样式特点:
  • 简洁的居中卡片布局
  • 响应式设计
  • 错误提示高亮显示

3、预测结果页 (result.html)

核心功能:
  • 超大徽章显示预测结果(良性/恶性)
  • 置信度与预测用时展示
  • 医疗免责声明
  • 返回首页重新预测按钮
  • 乳腺肿瘤相关知识卡片
动画效果:
/* 结果徽章脉冲动画 */
@keyframes pulse {
    0%, 100% { transform:scale(1); }
    50% { transform:scale(1.08); }
}

/* 发光效果 */
@keyframes glow {
    from { box-shadow:0 10px 30px rgba(0,0,0,0.3), 0 0 20px rgba(255,255,255,0.5); }
    to { box-shadow:0 10px 40px rgba(0,0,0,0.4), 0 0 40px rgba(255,255,255,0.8); }
}

.diagnosis-badge {
    animation:pulse 2s infinite, glow 3s ease-in-out infinite alternate;
}

/* 良性:绿色渐变 */
.benign {
    background:linear-gradient(135deg, #56ab2f 0%, #a8e063 100%);
}

/* 恶性:红色渐变 */
.malignant {
    background:linear-gradient(135deg, #ff416c 0%, #ff4b2b 100%);
}
视觉设计:
  • 紫色渐变背景(#667eea → #764ba2)
  • 卡片滑入动画(slideIn)
  • 统计卡片悬停上浮效果
  • 响应式布局

4、AI问答页面 (ai_ask.html)

核心功能:
  • 流式输出AI回答(逐字显示)
  • Markdown渲染(支持代码块、列表等)
  • 对话历史自动保存
  • 上下文关联(最近10轮对话)
  • 打字指示器动画
流式接收实现:
// 使用 fetch API 处理流式响应
fetch('{% url "ask_ai_stream" %}', {
    method:'POST',
    headers:{
        'Content-Type''application/x-www-form-urlencoded',
        'X-Requested-With''XMLHttpRequest'
    },
    body:new URLSearchParams({
        'question':question,
        'csrfmiddlewaretoken':$('input[name="csrfmiddlewaretoken"]').val()
    })
})
.then(response => {
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';
    
    function read() {
        return reader.read().then(({ done, value }) => {
            if (done) {
                typingIndicator.hide();
                askBtn.prop('disabled', false);
                questionInput.prop('disabled', false);
                return;
            }
            
            // 解码二进制数据
            const chunk = decoder.decode(value, { stream:true });
            buffer += chunk;
            
            // 按行分割处理JSON
            const lines = buffer.split('\n');
            buffer = lines.pop() || '';
            
            lines.forEach(line => {
                try {
                    const jsonData = JSON.parse(line);
                    
                    if (!jsonData.done) {
                        fullAnswer += jsonData.content;
                        // 实时渲染Markdown
                        aiMessageContent.html(marked.parse(fullAnswer));
                        chatBox.scrollTop(chatBox[0].scrollHeight);
                    }
                } catch (err) {
                    console.error('JSON解析错误:', err);
                }
            });
            
            return read();
        });
    }
    
    return read();
});
Markdown渲染:
<!-- 引入marked.js库 -->
<script src="https://cdn.staticfile.org/marked/4.3.0/marked.min.js"></script>

// 渲染AI回答
renderedContent = marked.parse(content);
UI特性:
  • 用户消息:左侧蓝色边框
  • AI消息:右侧青色边框
  • 淡入动画(fadeIn)
  • 打字指示器(三个闪烁的点)
  • 自动滚动到底部

四、数据模型设计

核心模型概览

  1. UserProfile:用户扩展信息(头像、禁用信息等)
  2. ChatHistory:AI对话历史
  3. PredictionRecord:预测记录(支持软删除)
  4. SystemLog:系统日志(记录所有操作)
  5. SecurityQuestion:安全问题(找回密码用)
  6. Announcement:消息通知(支持分类、危急程度)
  7. AnnouncementRead:消息已读/确认记录
  8. DailyQuote:每日一言
  9. ExportTask:数据导出任务
  10. LogExportRecord:管理员日志导出记录

五、安全防护体系

1、多层安全防护

中间件配置 (settings.py):
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'System.security_middleware.SecurityMiddleware',  # 安全防护中间件(SQL注入、XSS等)
    'System.rate_limit_middleware.RateLimitMiddleware',  # 速率限制中间件(防暴力破解、DoS)
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'System.middleware.LanguageMiddleware',  # 自定义语言中间件
]

2、防护措施

  • SQL注入防护:使用Django ORM,自动转义
  • XSS防护:模板自动转义 + 安全中间件
  • CSRF防护:Django内置CSRF Token
  • 暴力破解防护:IP锁定机制(5次警告,10次锁定30分钟)
  • 速率限制:防止DoS攻击
  • 密码强度验证:强制要求复杂密码
  • 图形验证码:防止机器人注册/登录
  • Session安全:HttpOnly Cookie,14天过期

六、技术栈总结

后端技术

  • 框架:Django 5.0+
  • 数据库:MySQL (mysqlclient)
  • 机器学习:scikit-learn (Random Forest)
  • 数据处理:pandas, numpy
  • AI集成:DeepSeek API
  • PDF生成:reportlab, PyPDF2
  • 加密:cryptography, pyzipper
  • 系统监控:psutil

前端技术

  • 模板引擎:Django Templates
  • 样式:Bootstrap
  • 图表:Chart.js (用于数据大屏)

开发环境

  • 时区:Asia/Shanghai (北京时间)
  • 语言:zh-hans (简体中文)
  • 缓存:LocMemCache (本地内存缓存)

七、系统特色亮点

  1. 智能化预测:基于随机森林算法,准确率高
  2. AI问答:集成DeepSeek大模型,支持上下文对话
  3. 安全防护:多层防护体系,企业级安全标准
  4. 数据导出:支持多格式、密码保护、时间筛选
  5. 消息系统:分级通知,重要消息需用户确认
  6. 软删除:数据可恢复,避免误删损失
  7. 健康监控:实时监控系统资源使用情况
  8. 日志审计:完整记录所有操作,支持导出审计
  9. 用户体验:流式输出、记住我、图形验证码
  10. 可扩展性:模块化设计,易于功能扩展

八、Linux部署建议

1、前言

  • 本项目作者已经成功部署在openEuler24.03 Linux环境中,并成功运行
  • 本系统在上传前删除了作者本人的deepseek api,请自行前往https://platform.deepseek.com/usage网址充值并获取deepseek api,将该api复制进入settings.py文件的DEEPSEEK_API_KEY环境变量。
  • 为确保本系统能够在Linux中正常运行,请通过以下命令下载字体库
sudo dnf install wqy-zenhei-fonts wqy-microhei-fonts

2、部署方式

第一步:系统更新和基础准备
# 1. 更新系统包
sudo dnf update -y

# 2. 安装基础工具(如果尚未安装)
sudo dnf install -y wget vim git
第二步:安装 Python 3
# 1. 安装 Python 3 和 pip
sudo dnf install -y python3 python3-pip python3-devel

# 2. 验证安装
python3 --version
第三步:安装 MySQL 数据库
# 1. 安装 MySQL 服务器和客户端
sudo dnf install -y mysql-server mysql-devel

# 2. 启动 MySQL 服务
sudo systemctl start mysqld

# 3. 设置开机自启
sudo systemctl enable mysqld

# 4. 查看 MySQL 状态
sudo systemctl status mysqld

# 5. 初始化 MySQL(设置 root 密码等)
sudo mysql_secure_installation

执行 mysql_secure_installation 时的交互说明:

  • 是否安装密码验证插件:N(本地使用可选)
  • 设置 root 密码:输入你的密码(记住这个密码)
  • 移除匿名用户:Y
  • 禁止 root 远程登录:Y
  • 移除测试数据库:Y
  • 重新加载权限表:Y

继续在 MySQL 命令行中执行:

6. 进入数据库
mysql -u root -p

7. 创建数据库
CREATE DATABASE breast_cancer_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

8. 退出 MySQL
EXIT;
第四步:上传项目文件到虚拟机(使用 Git)
# 在虚拟机上执行
git clone <你的仓库地址>
第五步:安装 Python 依赖
# 1. 进入项目目录
cd ./Breast_Cancer_Prediction_System

# 2. 安装虚拟环境
python3 -m venv venv

# 3. 运行虚拟环境
source venv/bin/activate

# 4. 安装依赖包(必须启动虚拟环境)
pip3 install -r requirements.txt
第六步:配置数据库连接
vim Breast_Cancer_Prediction_System/settings.py
第七步:数据库迁移
# 1. 进入项目目录
cd /Breast_Cancer_Prediction_System

# 2. 执行数据库迁移
python3 manage.py makemigrations
python3 manage.py migrate

# 3. 收集静态文件
python3 manage.py collectstatic --noinput

第八步:创建管理员账户

python3 create_admin_user.py
第九步:配置防火墙(如果需要外部访问)
# 1. 开放 8000 端口
sudo firewall-cmd --permanent --add-port=8000/tcp

# 2. 重载防火墙配置
sudo firewall-cmd --reload

# 3. 查看已开放的端口
sudo firewall-cmd --list-ports
第十步:启动 Django 服务
python3 manage.py runserver 0.0.0.0:8000

3、连接Linux

  • 输入Linux的<IP地址>:8000即可启动项目
  • 作者本人使用的是VMware Workstation Pro 17,将openEuler Linux安装进入虚拟机,随后通过局域网IP访问
  • 上述Linux部署步骤是在全新的openEuler虚拟机上部署

九、项目效果图

由于页面过多,无法一一展示,仅展示部分功能的效果页面

1、管理员页面

(1)系统健康监控面板

管理员仪表盘

(2)系统健康监控面板

系统健康监控面板

(3)用户管理页面

管理员-用户管理页面

(4)系统日志查看

系统日志查看

(5)消息中心

在这里插入图片描述

2、用户页面

(1)主页+乳腺癌预测表单页面

在这里插入图片描述

(2)预测结果页面

在这里插入图片描述

(3)AI问答页面
  • 成功案例页面(与乳腺癌相关的问题)
    在这里插入图片描述
  • 失败案例页面(与乳腺癌无关的问题)
    在这里插入图片描述
(4)对话历史页面

在这里插入图片描述

(5)预测历史页面

在这里插入图片描述

(6)消息页面

在这里插入图片描述

(7)账号管理页面

在这里插入图片描述

(8)修改密码页面

在这里插入图片描述

(9)注销页面

在这里插入图片描述

3、登录与注册页面

  • 登录页面
    在这里插入图片描述
  • 注册页面
    在这里插入图片描述
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐