乳腺癌预测系统 - 完整功能详解
·
乳腺癌预测系统 - 完整功能详解
这是一个基于 Django 框架开发的乳腺癌智能预测与管理系统,集成了机器学习、AI问答、安全防护等多项先进技术。
一、核心功能模块
1、乳腺癌预测引擎
需求背景
- 辅助医疗诊断,提供早期乳腺癌风险筛查
- 降低误诊率,提高检测效率
- 为普通用户提供便捷的自我检查工具
技术实现
模型训练代码 (System/views.py):
def initialize_model():
"""初始化随机森林模型(仅在应用启动时调用一次)"""
global clf, sc, _model_initialized
if _model_initialized:
return clf, sc
# 加载数据集
data_path = os.path.join(settings.BASE_DIR, 'static', 'Breast Cancer Data.csv')
dataset = pd.read_csv(data_path)
X = dataset.iloc[:, 2:32].values # 提取30个特征值
y = dataset.iloc[:, 1].values # 标签(良性/恶性)
# 编码分类数据
from sklearn.preprocessing import LabelEncoder
labelencoder_X_1 = LabelEncoder()
y = labelencoder_X_1.fit_transform(y)
# 划分训练集和测试集(80%训练,20%测试)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
# 特征缩放(标准化)
sc = StandardScaler()
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
# 训练随机森林模型
clf = RandomForestClassifier(n_estimators=100)
clf.fit(X_train, y_train)
_model_initialized = True
print("✓ 模型已加载到内存")
return clf, sc
预测接口 (System/views.py - predict_cancer):
def predict_cancer(request):
"""乳腺癌预测视图"""
if request.method == 'POST':
try:
# 收集表单数据(30个特征值)
data = []
input_data = {}
for i in range(1, 31):
value = float(request.POST.get(f'value{i}', 0))
data.append(value)
input_data[f'value{i}'] = value
# 转换为 numpy 数组并重塑
data_np = np.asarray(data, dtype=float)
data_np = data_np.reshape(1, -1)
# 使用模型进行预测
t = time.time()
data_scaled = sc.transform(data_np) # 特征缩放
output = clf.predict(data_scaled) # 预测结果
acc = clf.predict_proba(data_scaled) # 置信度
t = time.time() - t
# 解析结果
if output[0] == 1:
result = '恶性 (Malignant)'
else:
result = '良性 (Benign)'
# 获取准确率
acc_value = max(acc[0][0], acc[0][1])
# 保存预测记录到数据库(仅登录用户)
if request.user.is_authenticated:
from System.models import PredictionRecord
PredictionRecord.objects.create(
user=request.user,
input_data=input_data,
result=result,
confidence=acc_value
)
log_action(request, 'PREDICT', f'用户进行了乳腺癌预测,结果:{result}')
context = {
'output':result,
'accuracy':acc_value * 100,
'time':round(t, 4)
}
return render(request, 'result.html', context)
except Exception as e:
messages.error(request, f'预测失败:{str(e)}')
return redirect('index')
return redirect('index')
输入特征(30个医学指标):
- 半径、纹理、周长、面积、平滑度等细胞核特征
- 每个特征包含均值、标准差、最差值三个维度
输出结果:
- 预测类别:良性 (Benign) / 恶性 (Malignant)
- 置信度:预测概率百分比
- 预测耗时:毫秒级响应
2、AI 智能问答系统
需求背景
- 为用户提供专业的乳腺癌相关知识解答
- 支持多轮对话,保持上下文关联
- 流式输出,提升用户体验
技术实现
流式问答接口 (System/views.py - ask_ai_stream):
@login_required
def ask_ai_stream(request):
"""AI 问答流式输出 API - 支持上下文关联"""
if request.method == 'POST':
question = request.POST.get('question', '').strip()
if not question:
return StreamingHttpResponse([json.dumps({'error':'请输入问题'}, ensure_ascii=False)], content_type='application/json')
def generate_response():
try:
# 获取最近 10 条对话历史作为上下文
from System.models import ChatHistory
recent_history = list(ChatHistory.objects.filter(user=request.user).order_by('-created_at')[:10])
recent_history.reverse() # 按时间正序排列
# 构建对话历史消息列表
messages = [
{'role':'system', 'content':'你是一位专业的医疗助手,专门解答乳腺癌相关问题。请用中文回答。请注意保持对话的连贯性,根据之前的对话内容来回答当前问题。'}
]
# 添加历史对话
for history in recent_history:
messages.append({'role':'user', 'content':history.question})
messages.append({'role':'assistant', 'content':history.answer})
# 添加当前问题
messages.append({'role':'user', 'content':question})
# 调用 DeepSeek API(流式模式)
headers = {
'Content-Type':'application/json',
'Authorization':f'Bearer {settings.DEEPSEEK_API_KEY}'
}
payload = {
'model':'deepseek-chat',
'messages':messages,
'temperature':0.7,
'max_tokens':1500,
'stream':True # 启用流式输出
}
response = requests.post(
settings.DEEPSEEK_API_URL,
headers=headers,
json=payload,
timeout=60,
stream=True
)
if response.status_code == 200:
full_answer = ''
chunk_count = 0
for line in response.iter_lines():
if line:
try:
line_str = line.decode('utf-8').strip()
if not line_str or line_str == 'data:[DONE]':
continue
if line_str.startswith('data:'):
json_str = line_str[6:] # 去掉 'data:' 前缀
data = json.loads(json_str)
if 'choices' in data and len(data['choices']) > 0:
delta = data['choices'][0].get('delta', {})
content = delta.get('content', '')
if content:
full_answer += content
chunk_count += 1
# 实时返回每个片段
json_response = json.dumps({'content':content, 'done':False}, ensure_ascii=False)
yield json_response.encode('utf-8') + b'\n'
except (json.JSONDecodeError, UnicodeDecodeError) as e:
continue
# 保存完整的对话记录到数据库
from System.models import ChatHistory
chat = ChatHistory.objects.create(
user=request.user,
question=question,
answer=full_answer
)
# 返回完成信号
done_json = json.dumps({'content':'', 'done':True}, ensure_ascii=False)
yield done_json.encode('utf-8') + b'\n'
except Exception as e:
yield json.dumps({'error':f'AI 问答失败:{str(e)}'}, ensure_ascii=False).encode('utf-8') + b'\n'
return StreamingHttpResponse(generate_response(), content_type='text/event-stream')
return StreamingHttpResponse([json.dumps({'error':'仅支持 POST 请求'}, ensure_ascii=False)], content_type='application/json')
特性:
- 上下文记忆:自动关联最近10条对话
- 流式输出:逐字显示,减少等待焦虑
- 历史记录:永久保存对话内容
- 专业角色:设定为医疗助手
3、用户认证与安全系统
3.1 注册功能
需求:
- 强密码策略
- 图形验证码防机器人
- 安全问题找回密码
实现代码 (System/views.py - register):
def register(request):
"""用户注册视图(带验证码和安全问题)"""
if request.method == 'POST':
username = request.POST.get('username', '').strip()
email = request.POST.get('email', '').strip()
password = request.POST.get('password', '')
confirm_password = request.POST.get('confirm_password', '')
captcha_input = request.POST.get('captcha', '').strip()
security_question = request.POST.get('security_question', '')
security_answer = request.POST.get('security_answer', '').strip()
# 验证验证码
session_captcha = request.session.get('captcha_code')
from System.captcha_utils import check_captcha
if not check_captcha(session_captcha, captcha_input):
messages.error(request, '验证码错误,请重新输入')
log_action(request, 'CAPTCHA_FAILED', f'注册时验证码错误:{captcha_input}', log_type='WARNING')
return render(request, 'register.html', {
'username':username,
'email':email,
'security_question':security_question
})
# 清除已使用的验证码
del request.session['captcha_code']
# 验证
errors = []
# 用户名验证
if len(username) < 3:
errors.append('用户名至少需要 3 个字符')
elif User.objects.filter(username=username).exists():
errors.append('用户名已存在')
# 邮箱验证
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
errors.append('邮箱格式不正确')
elif User.objects.filter(email=email).exists():
errors.append('该邮箱已被注册')
# 密码验证
if password != confirm_password:
errors.append('两次输入的密码不一致')
else:
# 验证密码强度
is_valid, pwd_errors = validate_password_strength(password)
if not is_valid:
errors.extend(pwd_errors)
# 检查密码是否与用户名相似
if len(username) >= 3 and username.lower() in password.lower():
errors.append('密码不能包含用户名')
# 安全问题验证
if not security_question:
errors.append('请选择一个安全问题')
if not security_answer or len(security_answer) < 2:
errors.append('安全问题答案至少需要 2 个字符')
if errors:
for error in errors:
messages.error(request, error)
return render(request, 'register.html', {
'username':username,
'email':email,
'security_question':security_question
})
# 创建用户
user = User.objects.create_user(username=username, email=email, password=password)
user.save()
# 保存安全问题
from System.models import SecurityQuestion
SecurityQuestion.objects.create(
user=user,
question=security_question,
answer=security_answer.lower().strip() # 转为小写便于后续验证
)
# 记录日志
log_action(request, 'REGISTER', f'新用户注册:{username},邮箱:{email}')
messages.success(request, '注册成功!请登录')
return redirect('login')
return render(request, 'register.html')
密码强度验证函数:
def validate_password_strength(password):
"""验证密码强度 - 增强版"""
errors = []
# 长度检查(至少 8 位,建议 12 位以上)
if len(password) < 8:
errors.append('密码长度至少为 8 个字符')
elif len(password) < 12:
errors.append('建议使用 12 位以上的密码以提高安全性')
# 必须包含大写字母
if not re.search(r'[A-Z]', password):
errors.append('密码必须包含至少一个大写字母')
# 必须包含小写字母
if not re.search(r'[a-z]', password):
errors.append('密码必须包含至少一个小写字母')
# 必须包含数字
if not re.search(r'\d', password):
errors.append('密码必须包含至少一个数字')
# 必须包含特殊字符
if not re.search(r"[!@#$%^&*()_+\-=\[\]{};':\"\\|,.<>\/?`~]", password):
errors.append('密码必须包含至少一个特殊字符(如 !@#$%^&* 等)')
# 不能包含连续重复字符(如 aaa, 111)
if re.search(r'(.)\1{2,}', password):
errors.append('密码不能包含连续重复的字符(如 aaa, 111)')
# 不能包含连续序列(如 abc, 123)
if _contains_sequential_chars(password):
errors.append('密码不能包含连续序列字符(如 abc, 123, xyz)')
# 不能是常见弱密码
weak_passwords = [
'password', '12345678', 'abcdefg', 'qwerty', 'admin123',
'123456789', '1234567890', 'abc123', 'password1', 'iloveyou',
'sunshine', 'princess', 'football', 'charlie', 'access',
'master', 'michael', 'shadow', 'jennifer', 'monkey'
]
if password.lower() in weak_passwords:
errors.append('密码过于简单,请使用更复杂的密码')
return len(errors) == 0, errors
3.2 登录与防暴力破解
实现代码 (System/views.py - user_login):
def user_login(request):
"""用户登录视图(带验证码验证 + 记住我功能 + 暴力破解防护)"""
if request.method == 'POST':
login_input = request.POST.get('login_input', '').strip()
password = request.POST.get('password', '')
captcha_input = request.POST.get('captcha', '').strip()
remember_me = request.POST.get('remember_me') == '1'
# 验证验证码
session_captcha = request.session.get('captcha_code')
from System.captcha_utils import check_captcha
if not check_captcha(session_captcha, captcha_input):
messages.error(request, '验证码错误,请重新输入')
log_action(request, 'CAPTCHA_FAILED', f'登录时验证码错误:{captcha_input}', log_type='WARNING')
return render(request, 'login.html')
# 清除已使用的验证码
del request.session['captcha_code']
# 检查IP是否被临时锁定(防暴力破解)
client_ip = get_client_ip(request)
if _is_ip_locked_for_login(client_ip):
messages.error(request, '由于多次登录失败,您的IP已被临时锁定,请稍后再试')
log_action(request, 'BRUTE_FORCE_ATTEMPT', f'IP {client_ip} 在锁定期间尝试登录', log_type='CRITICAL')
return render(request, 'login.html')
# 首先尝试查找用户(通过用户名或邮箱)
user_obj = None
try:
user_obj = User.objects.get(username=login_input)
except User.DoesNotExist:
try:
user_obj = User.objects.get(email=login_input)
except User.DoesNotExist:
user_obj = None
# 如果找到用户,检查是否被禁用
if user_obj is not None and not user_obj.is_active:
# 获取禁用原因
try:
profile = user_obj.profile
disable_reason = profile.disable_reason if profile and profile.disable_reason else '未提供具体原因'
disabled_at = profile.disabled_at.strftime('%Y-%m-%d %H:%M') if profile and profile.disabled_at else '未知时间'
disabled_by = profile.disabled_by.username if profile and profile.disabled_by else '管理员'
error_message = f'您的账号已被管理员禁用<br>'
error_message += f'<strong>禁用时间:</strong>{disabled_at}<br>'
error_message += f'<strong>操作人:</strong>{disabled_by}<br>'
error_message += f'<strong>禁用原因:</strong>{disable_reason}'
except:
error_message = '您的账号已被管理员禁用,请联系管理员解封'
from django.utils.safestring import mark_safe
messages.error(request, mark_safe(error_message))
log_action(request, 'LOGIN', f'被禁用用户尝试登录:{user_obj.username}', log_type='WARNING')
return render(request, 'login.html')
# 尝试使用用户名或邮箱登录(验证密码)
user = authenticate(request, username=login_input, password=password)
if user is None:
# 如果用户名登录失败,尝试用邮箱查找用户
try:
user_obj = User.objects.get(email=login_input)
user = authenticate(request, username=user_obj.username, password=password)
except User.DoesNotExist:
user = None
if user is not None:
# 登录成功,清除失败计数
_clear_login_failures(client_ip)
login(request, user)
messages.success(request, f'欢迎回来,{user.username}!')
# 处理"记住我"功能 - 设置 session 过期时间
if remember_me:
# 14 天 = 14 * 24 * 60 * 60 = 1209600 秒
request.session.set_expiry(1209600)
log_action(request, 'LOGIN', f'用户登录并启用记住我:{user.username}')
else:
# 默认 session 过期时间(浏览器关闭时过期)
request.session.set_expiry(0)
log_action(request, 'LOGIN', f'用户登录:{user.username}')
# 管理员直接跳转到管理员仪表盘
if user.is_staff:
return redirect('admin_dashboard')
next_url = request.GET.get('next', '/')
return redirect(next_url)
else:
# 登录失败,记录失败次数
_record_login_failure(client_ip, login_input)
messages.error(request, '用户名/邮箱或密码错误')
log_action(request, 'CAPTCHA_FAILED', f'登录失败:{login_input}', log_type='WARNING')
return render(request, 'login.html')
return render(request, 'login.html')
防暴力破解机制:
def _record_login_failure(ip_address, username):
"""记录登录失败次数(防暴力破解)"""
from django.core.cache import cache
# 构建缓存键
fail_key = f"login_failures:{ip_address}"
# 获取当前失败次数
failures = cache.get(fail_key, 0)
failures += 1
# 更新缓存,15分钟后过期
cache.set(fail_key, failures, 900)
# 如果失败次数过多,记录安全事件
if failures >= 5:
try:
SystemLog.objects.create(
log_type='WARNING',
action_type='BRUTE_FORCE_ATTEMPT',
description=f'IP {ip_address} 登录失败 {failures} 次,尝试用户名:{username}',
ip_address=ip_address
)
except Exception as e:
print(f'日志记录失败:{e}')
# 如果失败次数达到10次,锁定IP 30分钟
if failures >= 10:
lock_key = f"login_lock:{ip_address}"
cache.set(lock_key, True, 1800) # 30分钟
try:
SystemLog.objects.create(
log_type='CRITICAL',
action_type='BRUTE_FORCE_ATTEMPT',
description=f'IP {ip_address} 因多次登录失败被锁定30分钟(失败{failures}次)',
ip_address=ip_address
)
except Exception as e:
print(f'日志记录失败:{e}')
def _clear_login_failures(ip_address):
"""清除登录失败计数(登录成功后调用)"""
from django.core.cache import cache
fail_key = f"login_failures:{ip_address}"
lock_key = f"login_lock:{ip_address}"
cache.delete(fail_key)
cache.delete(lock_key)
def _is_ip_locked_for_login(ip_address):
"""检查IP是否被锁定"""
from django.core.cache import cache
lock_key = f"login_lock:{ip_address}"
return cache.get(lock_key, False)
3.3 找回密码流程
三步验证机制:
- 第一步:输入用户名/邮箱 + 验证码
- 第二步:回答安全问题
- 第三步:设置新密码
实现代码 (System/views.py):
def reset_password_request(request):
"""找回密码 - 第一步:输入用户名/邮箱"""
if request.method == 'POST':
login_input = request.POST.get('login_input', '').strip()
captcha_input = request.POST.get('captcha', '').strip()
# 验证验证码
session_captcha = request.session.get('captcha_code')
from System.captcha_utils import check_captcha
if not check_captcha(session_captcha, captcha_input):
messages.error(request, '验证码错误,请重新输入')
log_action(request, 'CAPTCHA_FAILED', f'重置密码时验证码错误:{captcha_input}', log_type='WARNING')
return render(request, 'reset_password_request.html')
# 清除已使用的验证码
del request.session['captcha_code']
# 查找用户
user_obj = None
try:
user_obj = User.objects.get(username=login_input)
except User.DoesNotExist:
try:
user_obj = User.objects.get(email=login_input)
except User.DoesNotExist:
pass
if user_obj is None:
messages.error(request, '该用户不存在')
return render(request, 'reset_password_request.html')
# 检查是否有安全问题
try:
security_question = user_obj.security_question
# 将用户 ID 存入 session,用于下一步
request.session['reset_password_user_id'] = user_obj.id
log_action(request, 'RESET_PASSWORD', f'用户 {user_obj.username} 开始重置密码')
return redirect('reset_password_verify')
except:
messages.error(request, '该用户未设置安全问题,请联系管理员')
return render(request, 'reset_password_request.html')
return render(request, 'reset_password_request.html')
def reset_password_verify(request):
"""找回密码 - 第二步:验证安全问题"""
# 检查是否有用户 ID 在 session 中
user_id = request.session.get('reset_password_user_id')
if not user_id:
messages.error(request, '请先输入用户名/邮箱')
return redirect('reset_password_request')
try:
user_obj = User.objects.get(id=user_id)
security_question = user_obj.security_question
except:
messages.error(request, '用户信息无效,请重新操作')
return redirect('reset_password_request')
if request.method == 'POST':
answer = request.POST.get('answer', '').strip().lower()
captcha_input = request.POST.get('captcha', '').strip()
# 验证验证码
session_captcha = request.session.get('captcha_code')
from System.captcha_utils import check_captcha
if not check_captcha(session_captcha, captcha_input):
messages.error(request, '验证码错误,请重新输入')
log_action(request, 'CAPTCHA_FAILED', f'重置密码验证答案时验证码错误', log_type='WARNING')
return render(request, 'reset_password_verify.html', {'question':security_question.question})
# 清除已使用的验证码
del request.session['captcha_code']
# 验证答案(不区分大小写)
if answer == security_question.answer.lower():
# 验证通过,进入重置密码页面
request.session['reset_password_verified'] = True
return redirect('reset_password_confirm')
else:
messages.error(request, '答案错误,请重新回答')
log_action(request, 'CAPTCHA_FAILED', f'重置密码答案错误', log_type='WARNING')
return render(request, 'reset_password_verify.html', {'question':security_question.question})
return render(request, 'reset_password_verify.html', {'question':security_question.question})
def reset_password_confirm(request):
"""找回密码 - 第三步:设置新密码"""
# 检查是否已验证
verified = request.session.get('reset_password_verified', False)
user_id = request.session.get('reset_password_user_id')
if not verified or not user_id:
messages.error(request, '请先完成安全验证')
return redirect('reset_password_request')
try:
user_obj = User.objects.get(id=user_id)
except:
messages.error(request, '用户信息无效')
return redirect('reset_password_request')
if request.method == 'POST':
new_password = request.POST.get('new_password', '')
confirm_password = request.POST.get('confirm_password', '')
# 验证密码
errors = []
if new_password != confirm_password:
errors.append('两次输入的密码不一致')
else:
# 验证密码强度
is_valid, pwd_errors = validate_password_strength(new_password)
if not is_valid:
errors.extend(pwd_errors)
if errors:
for error in errors:
messages.error(request, error)
return render(request, 'reset_password_confirm.html')
# 设置新密码
user_obj.set_password(new_password)
user_obj.save()
# 清除 session
del request.session['reset_password_user_id']
del request.session['reset_password_verified']
log_action(request, 'RESET_PASSWORD', f'用户 {user_obj.username} 成功重置密码')
# 创建系统消息通知用户
beijing_time = timezone.localtime(timezone.now()).strftime("%Y-%m-%d %H:%M:%S")
create_system_message(
user=user_obj,
title='账户安全提醒:密码已重置',
content=f'您的账户密码已于 {beijing_time} 通过安全问题验证成功重置。\n\n如果您没有进行此操作,请立即联系管理员!\n\n为了保障账户安全,建议您:\n1. 登录后立即修改密码\n2. 检查账户是否有异常活动\n3. 更新您的安全问题答案',
severity='serious', # 严重级别,需要用户确认
category='other'
)
messages.success(request, '密码重置成功,请登录')
return redirect('login')
return render(request, 'reset_password_confirm.html')
4、数据管理与导出系统
4.1 预测历史记录
软删除机制 (System/views.py):
@login_required
def soft_delete_prediction(request, record_id):
"""软删除预测记录(AJAX)"""
if request.method != 'POST':
return JsonResponse({'success':False, 'error':'无效的请求方法'}, status=405)
try:
from System.models import PredictionRecord
from django.utils import timezone
record = PredictionRecord.objects.get(id=record_id, user=request.user)
# 标记为已删除(非物理删除)
record.is_deleted = True
record.deleted_at = timezone.now()
record.save()
log_action(request, 'SOFT_DELETE', f'用户软删除预测记录 #{record_id},结果:{record.result}', log_type='INFO')
return JsonResponse({'success':True, 'message':'记录已删除', 'record_id':record_id})
except PredictionRecord.DoesNotExist:
return JsonResponse({'success':False, 'error':'记录不存在'}, status=404)
except Exception as e:
return JsonResponse({'success':False, 'error':f'删除失败:{str(e)}'}, status=500)
@login_required
def undo_delete_prediction(request, record_id):
"""撤销删除预测记录(AJAX)"""
if request.method != 'POST':
return JsonResponse({'success':False, 'error':'无效的请求方法'}, status=405)
try:
from System.models import PredictionRecord
record = PredictionRecord.objects.get(id=record_id, user=request.user, is_deleted=True)
# 恢复记录
record.is_deleted = False
record.deleted_at = None
record.save()
log_action(request, 'UNDO_DELETE', f'用户撤销删除预测记录 #{record_id},结果:{record.result}', log_type='INFO')
return JsonResponse({'success':True, 'message':'记录已恢复'})
except PredictionRecord.DoesNotExist:
return JsonResponse({'success':False, 'error':'记录不存在或未被删除'}, status=404)
except Exception as e:
return JsonResponse({'success':False, 'error':f'恢复失败:{str(e)}'}, status=500)
4.2 高级数据导出
- 支持格式:CSV、JSON、PDF
- 支持内容:预测记录、AI对话历史、完整账户数据
- 时间筛选:全部、最近30天、自定义日期范围
- 密码保护:ZIP加密、PDF加密
实现代码 (System/views.py - export_data_api):
@login_required
def export_data_api(request):
"""高级数据导出 API - 支持多格式、时间筛选、内容选择、密码保护"""
if request.method != 'POST':
return JsonResponse({'success':False, 'error':'仅支持 POST 请求'}, status=405)
try:
import csv
import io
from datetime import timedelta
from System.models import PredictionRecord, ChatHistory, ExportTask
# 获取导出参数
format_type = request.POST.get('format', 'csv') # csv, json, pdf
content_type = request.POST.get('content_type', 'predictions_only') # predictions_only, predictions_and_chats, full_account
time_range = request.POST.get('time_range', 'all') # all, last_30_days, custom
start_date_str = request.POST.get('start_date', '')
end_date_str = request.POST.get('end_date', '')
password = request.POST.get('password', '').strip()
password_protected = bool(password and request.POST.get('enable_password') == 'true')
# 验证参数
if format_type not in ['csv', 'json', 'pdf']:
return JsonResponse({'success':False, 'error':'不支持的导出格式'}, status=400)
if content_type not in ['predictions_only', 'predictions_and_chats', 'full_account']:
return JsonResponse({'success':False, 'error':'不支持的内容类型'}, status=400)
# 创建导出任务记录
export_task = ExportTask.objects.create(
user=request.user,
format_type=format_type,
content_type=content_type,
time_range=time_range,
password_protected=password_protected,
status='processing'
)
# 确定时间范围
now = timezone.now()
if time_range == 'last_30_days':
start_date = now - timedelta(days=30)
end_date = now
elif time_range == 'custom':
if not start_date_str or not end_date_str:
export_task.status = 'failed'
export_task.error_message = '自定义日期范围需要开始和结束日期'
export_task.save()
return JsonResponse({'success':False, 'error':'请提供开始和结束日期'}, status=400)
try:
start_date = timezone.make_aware(timezone.datetime.strptime(start_date_str, '%Y-%m-%d'))
end_date = timezone.make_aware(timezone.datetime.strptime(end_date_str + ' 23:59:59', '%Y-%m-%d %H:%M:%S'))
except ValueError:
export_task.status = 'failed'
export_task.error_message = '日期格式错误'
export_task.save()
return JsonResponse({'success':False, 'error':'日期格式错误,请使用 YYYY-MM-DD 格式'}, status=400)
else: # all
start_date = None
end_date = None
# 收集数据
data_dict = {
'export_info':{
'username':request.user.username,
'email':request.user.email,
'export_time':now.strftime('%Y-%m-%d %H:%M:%S'),
'format':format_type,
'content_type':content_type,
'time_range':time_range,
},
'statistics':{}
}
# 获取预测记录
predictions_qs = PredictionRecord.objects.filter(user=request.user, is_deleted=False)
if start_date and end_date:
predictions_qs = predictions_qs.filter(created_at__range=[start_date, end_date])
predictions_qs = predictions_qs.order_by('-created_at')
predictions = list(predictions_qs.values('id', 'input_data', 'result', 'confidence', 'created_at'))
# 转换datetime为字符串
for pred in predictions:
pred['created_at'] = pred['created_at'].strftime('%Y-%m-%d %H:%M:%S')
data_dict['predictions'] = predictions
data_dict['statistics']['prediction_count'] = len(predictions)
# 如果需要AI对话历史
if content_type in ['predictions_and_chats', 'full_account']:
chats_qs = ChatHistory.objects.filter(user=request.user)
if start_date and end_date:
chats_qs = chats_qs.filter(created_at__range=[start_date, end_date])
chats_qs = chats_qs.order_by('-created_at')
chats = list(chats_qs.values('id', 'question', 'answer', 'created_at'))
for chat in chats:
chat['created_at'] = chat['created_at'].strftime('%Y-%m-%d %H:%M:%S')
data_dict['chat_history'] = chats
data_dict['statistics']['chat_count'] = len(chats)
# 如果是完整账户数据
if content_type == 'full_account':
data_dict['user_info'] = {
'username':request.user.username,
'email':request.user.email,
'first_name':request.user.first_name,
'last_name':request.user.last_name,
'date_joined':request.user.date_joined.strftime('%Y-%m-%d %H:%M:%S'),
'is_active':request.user.is_active,
}
# 计算统计信息
if predictions:
dates = [p['created_at'] for p in predictions]
data_dict['statistics']['time_span'] = f'{min(dates)} 至 {max(dates)}'
benign_count = sum(1 for p in predictions if '良性' in p['result'])
malignant_count = sum(1 for p in predictions if '恶性' in p['result'])
data_dict['statistics']['benign_count'] = benign_count
data_dict['statistics']['malignant_count'] = malignant_count
avg_confidence = sum(p['confidence'] for p in predictions) / len(predictions)
data_dict['statistics']['avg_confidence'] = f'{avg_confidence:.2f}%'
# 根据格式生成文件
timestamp = now.strftime('%Y%m%d_%H%M%S')
if format_type == 'csv':
response = generate_csv_export(data_dict, timestamp, password_protected, password)
elif format_type == 'json':
response = generate_json_export(data_dict, timestamp, password_protected, password)
elif format_type == 'pdf':
response = generate_pdf_export_advanced(data_dict, timestamp, password_protected, password)
else:
export_task.status = 'failed'
export_task.error_message = '不支持的格式'
export_task.save()
return JsonResponse({'success':False, 'error':'不支持的导出格式'}, status=400)
# 更新导出任务状态
export_task.status = 'completed'
export_task.completed_at = now
export_task.save()
# 记录日志
log_action(request, 'EXPORT_DATA',
f'用户导出数据:格式={format_type}, 内容={content_type}, 时间范围={time_range}, '
f'记录数={len(predictions)}, 密码保护={password_protected}')
return response
except Exception as e:
# 更新任务状态为失败
try:
export_task.status = 'failed'
export_task.error_message = str(e)
export_task.save()
except:
pass
return JsonResponse({'success':False, 'error':f'导出失败:{str(e)}'}, status=500)
CSV加密导出示例:
def generate_csv_export(data_dict, timestamp, password_protected=False, password=None):
"""生成 CSV 格式导出(支持传统ZIP密码保护)"""
import csv
import io
output = io.StringIO()
writer = csv.writer(output)
# 写入统计摘要
writer.writerow(['=== 数据导出统计摘要 ==='])
writer.writerow(['用户名', data_dict['export_info']['username']])
writer.writerow(['导出时间', data_dict['export_info']['export_time']])
writer.writerow(['导出格式', 'CSV'])
writer.writerow(['内容类型', data_dict['export_info']['content_type']])
writer.writerow([])
stats = data_dict['statistics']
writer.writerow(['总预测记录数', stats.get('prediction_count', 0)])
writer.writerow(['良性记录数', stats.get('benign_count', 0)])
writer.writerow(['恶性记录数', stats.get('malignant_count', 0)])
writer.writerow(['平均置信度', stats.get('avg_confidence', 'N/A')])
writer.writerow(['时间跨度', stats.get('time_span', 'N/A')])
if 'chat_count' in stats:
writer.writerow(['AI对话记录数', stats['chat_count']])
writer.writerow([])
writer.writerow([])
# 写入预测记录
if data_dict.get('predictions'):
writer.writerow(['=== 预测记录 ==='])
writer.writerow(['序号', '预测时间', '结果', '置信度(%)'])
for idx, pred in enumerate(data_dict['predictions'], 1):
writer.writerow([
idx,
pred['created_at'],
pred['result'],
f"{pred['confidence']*100:.2f}"
])
writer.writerow([])
writer.writerow([])
# 写入AI对话历史
if data_dict.get('chat_history'):
writer.writerow(['=== AI对话历史 ==='])
writer.writerow(['序号', '时间', '问题', '回答'])
for idx, chat in enumerate(data_dict['chat_history'], 1):
# 限制问题和回答的长度
question = chat['question'][:200] if len(chat['question']) > 200 else chat['question']
answer = chat['answer'][:500] if len(chat['answer']) > 500 else chat['answer']
writer.writerow([idx, chat['created_at'], question, answer])
writer.writerow([])
csv_content = output.getvalue()
# 如果需要密码保护,创建带密码的ZIP文件
if password_protected and password:
try:
# 使用pyzipper创建带密码的ZIP(纯Python实现,无需编译)
try:
import pyzipper
# 创建内存中的ZIP文件
zip_buffer = io.BytesIO()
# 使用AES加密创建ZIP
with pyzipper.AESZipFile(
zip_buffer,
'w',
compression=pyzipper.ZIP_LZMA,
encryption=pyzipper.WZ_AES
) as zf:
zf.setpassword(password.encode('utf-8'))
zf.writestr(
f'data_export_{timestamp}.csv',
csv_content.encode('utf-8-sig')
)
zip_data = zip_buffer.getvalue()
response = HttpResponse(zip_data, content_type='application/zip')
response['Content-Disposition'] = f'attachment; filename="data_export_{timestamp}.zip"'
response['X-Password-Protected'] = 'true'
return response
except ImportError:
# 如果pyzipper未安装,尝试使用传统方法
print('警告:pyzipper未安装,尝试使用备用方案')
print('安装命令:pip install pyzipper')
raise
except Exception as e:
# 如果加密失败,返回未加密的CSV
print(f'CSV ZIP加密失败:{e}')
import traceback
traceback.print_exc()
# 未加密的CSV(或加密失败时的回退)
response = HttpResponse(csv_content, content_type='text/csv; charset=utf-8-sig')
response['Content-Disposition'] = f'attachment; filename="data_export_{timestamp}.csv"'
return response
5、消息通知系统
5.1 消息分类与危急程度
消息类型:
- 系统维护与更新
- Bug修复
- 普遍问题反馈
- 其他
危急程度:
- 无(绿色)
- 轻微(黄色)
- 严重(橙色)- 需要用户确认
- 紧急(红色)- 需要用户确认
模型定义 (System/models.py):
class Announcement(models.Model):
"""消息通知模型(原公告系统)"""
STATUS_CHOICES = [
('DRAFT', '草稿'),
('PUBLISHED', '已发布'),
('ARCHIVED', '已归档'),
]
# 消息分类
CATEGORY_CHOICES = [
('system_maintenance', '系统维护与更新'),
('bug_fix', 'Bug 修复'),
('common_feedback', '普遍问题反馈'),
('other', '其他'),
]
# 危急程度
SEVERITY_CHOICES = [
('none', '无'), # 绿色
('minor', '轻微'), # 黄色
('serious', '严重'), # 橙色
('urgent', '紧急'), # 红色
]
title = models.CharField(max_length=200, verbose_name='标题')
content = models.TextField(verbose_name='内容')
author = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='发布者', related_name='announcements')
target_user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True,
verbose_name='目标用户', related_name='target_announcements',
help_text='留空表示对所有用户可见(公告),指定用户则仅该用户可见')
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='DRAFT', verbose_name='状态')
category = models.CharField(max_length=30, choices=CATEGORY_CHOICES, default='other', verbose_name='消息分类')
severity = models.CharField(max_length=20, choices=SEVERITY_CHOICES, default='none', verbose_name='危急程度')
is_pinned = models.BooleanField(default=False, verbose_name='是否置顶')
requires_confirmation = models.BooleanField(default=False, verbose_name='是否需要用户确认')
created_at = models.DateTimeField(auto_now_add=True, verbose_name='创建时间')
updated_at = models.DateTimeField(auto_now=True, verbose_name='更新时间')
published_at = models.DateTimeField(null=True, blank=True, verbose_name='发布时间')
class Meta:
verbose_name = '消息'
verbose_name_plural = '消息'
ordering = ['-is_pinned', '-published_at']
def __str__(self):
return f'{self.title} - {self.get_status_display()}'
def get_severity_color(self):
"""获取危急程度对应的颜色"""
color_map = {
'none':'#5cb85c', # 绿色
'minor':'#f0ad4e', # 黄色
'serious':'#ff9800', # 橙色
'urgent':'#d9534f', # 红色
}
return color_map.get(self.severity, '#5cb85c')
def needs_user_confirmation(self):
"""判断是否需要用户确认(严重及以上级别)"""
return self.severity in ['serious', 'urgent']
class AnnouncementRead(models.Model):
"""消息已读/已确认记录模型"""
user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name='用户', related_name='read_announcements')
announcement = models.ForeignKey(Announcement, on_delete=models.CASCADE, verbose_name='消息', related_name='read_by_users')
read_at = models.DateTimeField(auto_now_add=True, verbose_name='阅读时间')
confirmed = models.BooleanField(default=False, verbose_name='是否已确认')
confirmed_at = models.DateTimeField(null=True, blank=True, verbose_name='确认时间')
class Meta:
verbose_name = '消息已读记录'
verbose_name_plural = '消息已读记录'
unique_together = ('user', 'announcement') # 确保每个用户对每条消息只有一条记录
ordering = ['-read_at']
def __str__(self):
status = '已确认' if self.confirmed else '已读未确认'
return f'{self.user.username} {status} {self.announcement.title}'
5.2 系统自动消息
当用户执行敏感操作时(修改密码、修改邮箱等),系统会自动发送安全提醒消息:
def create_system_message(user, title, content, severity='serious', category='other'):
"""
创建系统自动消息
:param user:接收消息的用户
:param title:消息标题
:param content:消息内容
:param severity:危急程度(none/minor/serious/urgent)
:param category:消息分类
"""
from System.models import Announcement
from django.contrib.auth.models import User
# 获取或创建 System 用户作为发布者
system_user, created = User.objects.get_or_create(
username='System',
defaults={
'email':'system@breast-cancer-prediction.com',
'is_active':False, # 系统账户不活跃,不能登录
'is_staff':False,
}
)
if created:
print('已创建 System 用户账户')
# 严重和紧急级别自动设置需要确认
requires_confirmation = severity in ['serious', 'urgent']
message = Announcement.objects.create(
title=title,
content=content,
author=system_user,
target_user=user, # 关键修改:指定目标用户,只有该用户可见
status='PUBLISHED',
category=category,
severity=severity,
is_pinned=False,
requires_confirmation=requires_confirmation,
published_at=timezone.now()
)
print(f'[系统消息] 已为用户 {user.username} 创建消息:{title}')
return message
6、管理员后台系统
6.1 管理员功能清单
- 用户管理:查看、搜索、禁用/启用用户
- 日志管理:查看所有系统日志,支持筛选
- 日志导出:导出加密PDF格式的日志报告
- 消息管理:创建、编辑、删除消息通知
- 每日一言管理:管理首页显示的名言
- 数据大屏:可视化展示系统运行数据
- 健康监控:监控系统资源使用情况
6.2 用户管理实现
@login_required
def admin_toggle_user_status(request, user_id):
"""禁用/启用用户账号(需要密码二次确认)"""
# 检查是否为管理员
if not request.user.is_staff:
messages.error(request, '您没有权限执行此操作')
return redirect('index')
if request.method == 'POST':
# POST请求:处理禁用/启用操作
try:
import json
data = json.loads(request.body)
admin_password = data.get('password', '')
disable_reason = data.get('reason', '').strip()
# 验证管理员密码
authenticated_user = authenticate(request, username=request.user.username, password=admin_password)
if authenticated_user is None:
return JsonResponse({'success':False, 'error':'密码错误,请重新输入'}, status=403)
# 获取目标用户
user = User.objects.get(id=user_id)
if user.is_staff:
return JsonResponse({'success':False, 'error':'不能修改其他管理员的状态'}, status=403)
if user.username == 'System':
return JsonResponse({'success':False, 'error':'不能操作系统账户'}, status=403)
# 如果是禁用操作,必须有原因
if user.is_active and not disable_reason:
return JsonResponse({'success':False, 'error':'禁用账户时必须填写原因'}, status=400)
# 切换用户状态
user.is_active = not user.is_active
user.save()
# 更新UserProfile中的禁用信息
profile = user.profile
if not user.is_active:
# 禁用账户
profile.disable_reason = disable_reason
profile.disabled_at = timezone.now()
profile.disabled_by = request.user
else:
# 启用账户,清空禁用信息
profile.disable_reason = None
profile.disabled_at = None
profile.disabled_by = None
profile.save()
status = '启用' if user.is_active else '禁用'
log_action(request, 'TOGGLE_USER_STATUS', f'管理员{status}用户账号:{user.username},原因:{disable_reason if disable_reason else "无"}', log_type='WARNING')
return JsonResponse({
'success':True,
'message':f'已{status}用户 {user.username} 的账号',
'new_status':user.is_active
})
except json.JSONDecodeError:
return JsonResponse({'success':False, 'error':'请求数据格式错误'}, status=400)
except User.DoesNotExist:
return JsonResponse({'success':False, 'error':'用户不存在'}, status=404)
except Exception as e:
return JsonResponse({'success':False, 'error':f'操作失败:{str(e)}'}, status=500)
# GET请求:返回当前用户状态
try:
user = User.objects.get(id=user_id)
return JsonResponse({
'user_id':user.id,
'username':user.username,
'is_active':user.is_active,
'is_staff':user.is_staff
})
except User.DoesNotExist:
return JsonResponse({'error':'用户不存在'}, status=404)
6.3 系统健康监控
@login_required
def health_monitor(request):
"""系统健康监控面板(仅管理员)"""
import psutil
import os
from System.models import PredictionRecord, ChatHistory, SystemLog, Announcement
# 检查是否为管理员
if not request.user.is_staff:
messages.error(request, '您没有权限访问此页面')
return redirect('index')
# 获取系统信息
try:
# CPU 使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用率
memory = psutil.virtual_memory()
memory_percent = memory.percent
memory_used = memory.used / (1024 ** 3) # GB
memory_total = memory.total / (1024 ** 3) # GB
# 磁盘使用率
disk = psutil.disk_usage('/')
disk_percent = disk.percent
disk_used = disk.used / (1024 ** 3) # GB
disk_total = disk.total / (1024 ** 3) # GB
except:
# Windows 系统可能需要不同的路径
try:
cpu_percent = psutil.cpu_percent(interval=1)
memory = psutil.virtual_memory()
memory_percent = memory.percent
memory_used = memory.used / (1024 ** 3)
memory_total = memory.total / (1024 ** 3)
disk = psutil.disk_usage('C:\\')
disk_percent = disk.percent
disk_used = disk.used / (1024 ** 3)
disk_total = disk.total / (1024 ** 3)
except:
cpu_percent = 0
memory_percent = 0
memory_used = 0
memory_total = 0
disk_percent = 0
disk_used = 0
disk_total = 0
# 数据库统计
# 只统计普通用户(排除管理员和系统账户)
total_users = User.objects.filter(is_staff=False).exclude(username='System').count()
active_users = User.objects.filter(is_active=True, is_staff=False).exclude(username='System').count()
total_predictions = PredictionRecord.objects.count()
total_ai_chats = ChatHistory.objects.count()
total_logs = SystemLog.objects.count()
error_logs = SystemLog.objects.filter(log_type='ERROR').count()
warning_logs = SystemLog.objects.filter(log_type='WARNING').count()
total_announcements = Announcement.objects.count()
published_announcements = Announcement.objects.filter(status='PUBLISHED').count()
# 在线用户数(活跃 session)
from django.contrib.sessions.models import Session
online_users = Session.objects.filter(expire_date__gte=timezone.now()).count()
# 模型状态
model_status = "已加载" if _model_initialized else "未加载"
context = {
'cpu_percent':cpu_percent,
'memory_percent':memory_percent,
'memory_used':round(memory_used, 2),
'memory_total':round(memory_total, 2),
'disk_percent':disk_percent,
'disk_used':round(disk_used, 2),
'disk_total':round(disk_total, 2),
'total_users':total_users,
'active_users':active_users,
'online_users':online_users,
'total_predictions':total_predictions,
'total_ai_chats':total_ai_chats,
'total_logs':total_logs,
'error_logs':error_logs,
'warning_logs':warning_logs,
'total_announcements':total_announcements,
'published_announcements':published_announcements,
'model_status':model_status,
}
log_action(request, 'VIEW_HEALTH_MONITOR', '管理员查看系统健康监控')
return render(request, 'health_monitor.html', context)
二、前后端接口交互
URL路由配置 (Breast_Cancer_Prediction_System/urls.py)
from django.contrib import admin
from django.urls import path
from django.conf import settings
from django.conf.urls.static import static
from System import views
urlpatterns = [
# 管理员功能(必须放在 admin/ 之前)
path('admin-login/', views.admin_login, name='admin_login'),
path('admin-dashboard/', views.admin_dashboard, name='admin_dashboard'),
path('myadmin/users/', views.admin_users, name='admin_users'),
path('myadmin/users/toggle/<int:user_id>/', views.admin_toggle_user_status, name='admin_toggle_user'),
path('myadmin/logs/', views.admin_logs, name='admin_logs'),
path('myadmin/logs-export/', views.admin_logs_export_page, name='admin_logs_export_page'),
path('myadmin/api/logs-export/', views.admin_logs_export_api, name='admin_logs_export_api'),
path('myadmin/log-export-history/', views.admin_log_export_history, name='admin_log_export_history'),
path('dashboard/', views.dashboard, name='dashboard'),
path('admin-change-password/', views.admin_change_password, name='admin_change_password'),
path('health-monitor/', views.health_monitor, name='health_monitor'),
# Django 自带 admin
path('admin/', admin.site.urls),
# 其他路由
path('', views.index, name='index'),
path('predict/', views.predict_cancer, name='predict'),
path('predict', views.predict_cancer, name='predict_no_slash'), # 支持不带斜杠的 URL
path('register/', views.register, name='register'),
path('login/', views.user_login, name='login'),
path('logout/', views.user_logout, name='logout'),
path('delete-account/', views.delete_account, name='delete_account'),
path('delete-account-confirm/', views.delete_account_confirm, name='delete_account_confirm'),
path('change-password/', views.change_password, name='change_password'),
path('ask-ai/', views.ask_ai, name='ask_ai'),
path('ask-ai-stream/', views.ask_ai_stream, name='ask_ai_stream'), # AI 流式问答 API
path('chat-history/', views.chat_history, name='chat_history'),
# 验证码生成
path('captcha/', views.generate_captcha, name='generate_captcha'),
# 找回密码
path('reset-password/', views.reset_password_request, name='reset_password_request'),
path('reset-password/verify/', views.reset_password_verify, name='reset_password_verify'),
path('reset-password/confirm/', views.reset_password_confirm, name='reset_password_confirm'),
# 用户个人中心与预测历史
path('profile/', views.profile, name='profile'),
path('account-management/', views.account_management, name='account_management'),
path('prediction-history/', views.prediction_history, name='prediction_history'),
path('prediction-history/delete/<int:record_id>/', views.delete_prediction_record, name='delete_prediction_record'),
path('export-report/', views.export_report, name='export_report_all'),
path('export-report/<int:record_id>/', views.export_report, name='export_report'),
# 公告系统
path('announcements/', views.announcement_list, name='announcement_list'),
path('announcements/create/', views.announcement_create, name='announcement_create'),
path('announcements/edit/<int:announcement_id>/', views.announcement_edit, name='announcement_edit'),
path('announcements/delete/<int:announcement_id>/', views.announcement_delete, name='announcement_delete'),
path('api/confirm-message/<int:message_id>/', views.confirm_message, name='confirm_message'),
# 每日一言管理(管理员)
path('admin-quotes/', views.admin_quotes, name='admin_quotes'),
path('admin-quotes/toggle/<int:quote_id>/', views.admin_toggle_quote_status, name='admin_toggle_quote_status'),
path('admin-quotes/delete/<int:quote_id>/', views.admin_delete_quote, name='admin_delete_quote'),
# 软删除与撤销
path('prediction-history/soft-delete/<int:record_id>/', views.soft_delete_prediction, name='soft_delete_prediction'),
path('prediction-history/undo-delete/<int:record_id>/', views.undo_delete_prediction, name='undo_delete_prediction'),
# 高级数据导出
path('export-data/', views.export_data_page, name='export_data_page'),
path('api/export-data/', views.export_data_api, name='export_data_api'),
]
# 开发环境下提供媒体文件服务
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)
三、前端页面详细说明
1、首页 (home.html)
核心功能:
- 显示紧急消息确认(需要用户确认的严重/紧急消息置顶)
- 最新公告提示栏(最多3条未读公告)
- 每日一言展示(渐变色卡片)
- 乳腺癌知识科普(Tab切换:什么是乳腺癌/谁会得/症状)
- 预测表单(30个输入字段,仅登录用户可见)
智能辅助功能:
// 实时验证与异常值检测
$('.field-input').on('input', function() {
const value = parseFloat($(this).val());
const config = fieldConfig[fieldId];
// 检查是否在正常范围内
if (value < config.min || value > config.max) {
$(this).css('border-color', '#dc3545'); // 红色边框
showWarning($(this), `⚠️ 警告:该值超出正常范围 (${config.min} - ${config.max})`);
} else {
$(this).css('border-color', '#28a745'); // 绿色边框
hideWarning($(this));
}
// 自动保存草稿到 localStorage
saveDraft();
});
// 填充示例数据按钮
$('#fillExampleBtn').click(function() {
// 填充良性病例的示例数据
for (let i = 1; i <= 30; i++) {
$('#value' + i).val(exampleData['value' + i]);
}
});
// 每30秒自动保存草稿
setInterval(saveDraft, 30000);
// 页面加载时恢复草稿
$(document).ready(function() {
loadDraft();
});
用户体验优化:
- 悬停显示字段医学解释(Bootstrap Tooltip)
- 输入框实时颜色反馈(绿色=正常,红色=异常)
- 自动保存草稿(LocalStorage)
- 一键填充示例数据
- 一键清空表单
- 返回顶部按钮(滚动超过300px显示)
2、登录页面 (login.html)
核心功能:
- 用户名/邮箱双模式登录
- 图形验证码(点击刷新)
- "记住我"功能(14天免登录)
- 忘记密码链接
- 注册账号入口
验证码刷新逻辑:
function refreshCaptcha() {
var img = document.getElementById('captcha_image');
img.src = '{% url "generate_captcha" %}?t=' + new Date().getTime();
}
样式特点:
- 简洁的居中卡片布局
- 响应式设计
- 错误提示高亮显示
3、预测结果页 (result.html)
核心功能:
- 超大徽章显示预测结果(良性/恶性)
- 置信度与预测用时展示
- 医疗免责声明
- 返回首页重新预测按钮
- 乳腺肿瘤相关知识卡片
动画效果:
/* 结果徽章脉冲动画 */
@keyframes pulse {
0%, 100% { transform:scale(1); }
50% { transform:scale(1.08); }
}
/* 发光效果 */
@keyframes glow {
from { box-shadow:0 10px 30px rgba(0,0,0,0.3), 0 0 20px rgba(255,255,255,0.5); }
to { box-shadow:0 10px 40px rgba(0,0,0,0.4), 0 0 40px rgba(255,255,255,0.8); }
}
.diagnosis-badge {
animation:pulse 2s infinite, glow 3s ease-in-out infinite alternate;
}
/* 良性:绿色渐变 */
.benign {
background:linear-gradient(135deg, #56ab2f 0%, #a8e063 100%);
}
/* 恶性:红色渐变 */
.malignant {
background:linear-gradient(135deg, #ff416c 0%, #ff4b2b 100%);
}
视觉设计:
- 紫色渐变背景(#667eea → #764ba2)
- 卡片滑入动画(slideIn)
- 统计卡片悬停上浮效果
- 响应式布局
4、AI问答页面 (ai_ask.html)
核心功能:
- 流式输出AI回答(逐字显示)
- Markdown渲染(支持代码块、列表等)
- 对话历史自动保存
- 上下文关联(最近10轮对话)
- 打字指示器动画
流式接收实现:
// 使用 fetch API 处理流式响应
fetch('{% url "ask_ai_stream" %}', {
method:'POST',
headers:{
'Content-Type':'application/x-www-form-urlencoded',
'X-Requested-With':'XMLHttpRequest'
},
body:new URLSearchParams({
'question':question,
'csrfmiddlewaretoken':$('input[name="csrfmiddlewaretoken"]').val()
})
})
.then(response => {
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
function read() {
return reader.read().then(({ done, value }) => {
if (done) {
typingIndicator.hide();
askBtn.prop('disabled', false);
questionInput.prop('disabled', false);
return;
}
// 解码二进制数据
const chunk = decoder.decode(value, { stream:true });
buffer += chunk;
// 按行分割处理JSON
const lines = buffer.split('\n');
buffer = lines.pop() || '';
lines.forEach(line => {
try {
const jsonData = JSON.parse(line);
if (!jsonData.done) {
fullAnswer += jsonData.content;
// 实时渲染Markdown
aiMessageContent.html(marked.parse(fullAnswer));
chatBox.scrollTop(chatBox[0].scrollHeight);
}
} catch (err) {
console.error('JSON解析错误:', err);
}
});
return read();
});
}
return read();
});
Markdown渲染:
<!-- 引入marked.js库 -->
<script src="https://cdn.staticfile.org/marked/4.3.0/marked.min.js"></script>
// 渲染AI回答
renderedContent = marked.parse(content);
UI特性:
- 用户消息:左侧蓝色边框
- AI消息:右侧青色边框
- 淡入动画(fadeIn)
- 打字指示器(三个闪烁的点)
- 自动滚动到底部
四、数据模型设计
核心模型概览
- UserProfile:用户扩展信息(头像、禁用信息等)
- ChatHistory:AI对话历史
- PredictionRecord:预测记录(支持软删除)
- SystemLog:系统日志(记录所有操作)
- SecurityQuestion:安全问题(找回密码用)
- Announcement:消息通知(支持分类、危急程度)
- AnnouncementRead:消息已读/确认记录
- DailyQuote:每日一言
- ExportTask:数据导出任务
- LogExportRecord:管理员日志导出记录
五、安全防护体系
1、多层安全防护
中间件配置 (settings.py):
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'System.security_middleware.SecurityMiddleware', # 安全防护中间件(SQL注入、XSS等)
'System.rate_limit_middleware.RateLimitMiddleware', # 速率限制中间件(防暴力破解、DoS)
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'System.middleware.LanguageMiddleware', # 自定义语言中间件
]
2、防护措施
- SQL注入防护:使用Django ORM,自动转义
- XSS防护:模板自动转义 + 安全中间件
- CSRF防护:Django内置CSRF Token
- 暴力破解防护:IP锁定机制(5次警告,10次锁定30分钟)
- 速率限制:防止DoS攻击
- 密码强度验证:强制要求复杂密码
- 图形验证码:防止机器人注册/登录
- Session安全:HttpOnly Cookie,14天过期
六、技术栈总结
后端技术
- 框架:Django 5.0+
- 数据库:MySQL (mysqlclient)
- 机器学习:scikit-learn (Random Forest)
- 数据处理:pandas, numpy
- AI集成:DeepSeek API
- PDF生成:reportlab, PyPDF2
- 加密:cryptography, pyzipper
- 系统监控:psutil
前端技术
- 模板引擎:Django Templates
- 样式:Bootstrap
- 图表:Chart.js (用于数据大屏)
开发环境
- 时区:Asia/Shanghai (北京时间)
- 语言:zh-hans (简体中文)
- 缓存:LocMemCache (本地内存缓存)
七、系统特色亮点
- 智能化预测:基于随机森林算法,准确率高
- AI问答:集成DeepSeek大模型,支持上下文对话
- 安全防护:多层防护体系,企业级安全标准
- 数据导出:支持多格式、密码保护、时间筛选
- 消息系统:分级通知,重要消息需用户确认
- 软删除:数据可恢复,避免误删损失
- 健康监控:实时监控系统资源使用情况
- 日志审计:完整记录所有操作,支持导出审计
- 用户体验:流式输出、记住我、图形验证码
- 可扩展性:模块化设计,易于功能扩展
八、Linux部署建议
1、前言
- 本项目作者已经成功部署在openEuler24.03 Linux环境中,并成功运行
- 本系统在上传前删除了作者本人的deepseek api,请自行前往https://platform.deepseek.com/usage网址充值并获取deepseek api,将该api复制进入settings.py文件的DEEPSEEK_API_KEY环境变量。
- 为确保本系统能够在Linux中正常运行,请通过以下命令下载字体库
sudo dnf install wqy-zenhei-fonts wqy-microhei-fonts
2、部署方式
第一步:系统更新和基础准备
# 1. 更新系统包
sudo dnf update -y
# 2. 安装基础工具(如果尚未安装)
sudo dnf install -y wget vim git
第二步:安装 Python 3
# 1. 安装 Python 3 和 pip
sudo dnf install -y python3 python3-pip python3-devel
# 2. 验证安装
python3 --version
第三步:安装 MySQL 数据库
# 1. 安装 MySQL 服务器和客户端
sudo dnf install -y mysql-server mysql-devel
# 2. 启动 MySQL 服务
sudo systemctl start mysqld
# 3. 设置开机自启
sudo systemctl enable mysqld
# 4. 查看 MySQL 状态
sudo systemctl status mysqld
# 5. 初始化 MySQL(设置 root 密码等)
sudo mysql_secure_installation
执行 mysql_secure_installation 时的交互说明:
- 是否安装密码验证插件:N(本地使用可选)
- 设置 root 密码:输入你的密码(记住这个密码)
- 移除匿名用户:Y
- 禁止 root 远程登录:Y
- 移除测试数据库:Y
- 重新加载权限表:Y
继续在 MySQL 命令行中执行:
6. 进入数据库
mysql -u root -p
7. 创建数据库
CREATE DATABASE breast_cancer_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
8. 退出 MySQL
EXIT;
第四步:上传项目文件到虚拟机(使用 Git)
# 在虚拟机上执行
git clone <你的仓库地址>
第五步:安装 Python 依赖
# 1. 进入项目目录
cd ./Breast_Cancer_Prediction_System
# 2. 安装虚拟环境
python3 -m venv venv
# 3. 运行虚拟环境
source venv/bin/activate
# 4. 安装依赖包(必须启动虚拟环境)
pip3 install -r requirements.txt
第六步:配置数据库连接
vim Breast_Cancer_Prediction_System/settings.py
第七步:数据库迁移
# 1. 进入项目目录
cd /Breast_Cancer_Prediction_System
# 2. 执行数据库迁移
python3 manage.py makemigrations
python3 manage.py migrate
# 3. 收集静态文件
python3 manage.py collectstatic --noinput
第八步:创建管理员账户
python3 create_admin_user.py
第九步:配置防火墙(如果需要外部访问)
# 1. 开放 8000 端口
sudo firewall-cmd --permanent --add-port=8000/tcp
# 2. 重载防火墙配置
sudo firewall-cmd --reload
# 3. 查看已开放的端口
sudo firewall-cmd --list-ports
第十步:启动 Django 服务
python3 manage.py runserver 0.0.0.0:8000
3、连接Linux
- 输入Linux的<IP地址>:8000即可启动项目
- 作者本人使用的是VMware Workstation Pro 17,将openEuler Linux安装进入虚拟机,随后通过局域网IP访问
- 上述Linux部署步骤是在全新的openEuler虚拟机上部署
九、项目效果图
由于页面过多,无法一一展示,仅展示部分功能的效果页面
1、管理员页面
(1)系统健康监控面板

(2)系统健康监控面板

(3)用户管理页面

(4)系统日志查看

(5)消息中心

2、用户页面
(1)主页+乳腺癌预测表单页面

(2)预测结果页面

(3)AI问答页面
- 成功案例页面(与乳腺癌相关的问题)

- 失败案例页面(与乳腺癌无关的问题)

(4)对话历史页面

(5)预测历史页面

(6)消息页面

(7)账号管理页面

(8)修改密码页面

(9)注销页面

3、登录与注册页面
- 登录页面

- 注册页面

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)