劳动力招聘管理系统:全栈实战(Vue3+FastAPI+WebSocket+Dify)
摘要
本文详细介绍了一套劳动力招聘管理系统的完整设计与实现。系统面向求职者、招聘方(VIP)和管理员,提供招聘信息发布审核、实时消息聊天、AI智能审核与问答等核心功能。技术栈采用 Vue3 + Pinia 前端、FastAPI + FastAPIAdmin 后端、MySQL + Redis 存储,并集成 Dify 工作流实现双AI智能体协同。文章将重点分享架构设计、关键业务逻辑、性能优化以及实际开发中遇到的典型问题与解决方案。
1. 项目背景与核心痛点
1.1 项目定位
劳动力招聘管理系统是一套面向临时用工 / 零工市场的在线招聘平台,连接求职者(普通客户)、用工方(VIP客户)与平台运营方(系统管理员)三方用户。系统集招聘信息发布、智能审核、实时沟通、AI辅助服务于一体,致力于解决传统零工招聘中信息不透明、审核效率低、沟通不及时等问题。
一句话:让求职者找活快、用工方招人省心、管理员审核智能化的全流程零工招聘平台。
1.2 解决的四个核心痛点
在当前的零工经济(兼职、临时工、小时工等)场景中,普遍存在以下问题:
| 痛点 | 具体表现 |
|---|---|
| 信息审核难 | 大量招聘信息需要人工逐条审核,耗时长、易遗漏,虚假或违规信息容易混入 |
| 沟通效率低 | 求职者与招聘方之间缺乏即时沟通工具,消息延迟,影响匹配成功率 |
| 缺乏智能化辅助 | 审核与客服工作依赖人力,没有AI辅助,运营成本高 |
| 状态管理混乱 | 招聘信息从发布、审核、上架到下架的状态流转不清晰,用户体验差 |
本项目针对以上痛点,通过 自动化状态流转 + WebSocket实时通信 + Dify AI工作流,构建了一个高效、智能、可扩展的招聘管理系统。
1.3 三方用户与核心需求
系统面向三类用户,各自的核心需求如下:
- 普通客户(求职者):快速浏览已审核岗位、查看详情、与招聘方在线聊天、管理个人信息、申请VIP。
- VIP客户(用工方):发布招聘信息(需审核)、查看自己发布的信息状态、管理求职咨询会话、申请下架。
- 系统管理员:审核招聘信息、审核VIP升级申请、管理用户、查看AI辅助建议、配置系统参数。
1.4 典型业务场景预览
为帮助理解后续的技术设计,这里简要描述两个核心场景:
场景一:VIP发布招聘并等待审核
VIP填写表单提交后,信息状态为pending;管理员在后台看到待审批列表,进入详情页时自动触发AI审核分析(合规评分、敏感词高亮、完备度检查);管理员结合AI建议做出通过(approved)或拒绝(revoked)决策。
场景二:求职者咨询岗位,AI智能补位
求职者点击“立即咨询”,系统创建会话,AI发送欢迎语。用户发消息时,后端判断VIP在线状态:若在线,AI生成“建议回复”供VIP一键采纳;若离线,AI直接代答并标记“AI助手代答”。所有消息通过WebSocket实时推送。
2. 技术选型与系统架构
2.1 技术栈全景
| 层面 | 技术选型 | 版本 | 核心用途 |
|---|---|---|---|
| 前端框架 | Vue 3 + Composition API | 3.4+ | 响应式UI、组件化开发 |
| 状态管理 | Pinia | 最新 | 全局状态(用户信息、未读消息) |
| UI组件库 | Element Plus | 2.x | 管理端表格、表单、弹窗等 |
| HTTP客户端 | Axios | 最新 | 统一拦截、Token注入、请求重试 |
| 后端框架 | FastAPI | 最新 | 异步API、自动文档、依赖注入 |
| ORM | SQLAlchemy 2.0 | 2.0+ | 异步数据库操作、模型定义 |
| 数据库 | MySQL 8.0 | 8.0+ | 业务数据持久化 |
| 缓存与状态 | Redis 6+ | 6+ | 在线状态(TTL)、JWT存储、限流 |
| 认证 | JWT (HS256) | - | 无状态登录、Token刷新 |
| AI集成 | Dify 工作流 | 最新 | 智能审核、智能问答 |
| 实时通讯 | WebSocket | FastAPI原生 | 双向消息推送、心跳维持 |
| 部署 | Nginx + Gunicorn + Uvicorn | 最新 | 反向代理、进程管理、异步Worker |
2.2 系统架构设计
采用 前后端分离 的集成式单体架构,前端分为用户端和管理端(共用技术栈),后端按业务领域模块化组织。
模块职责:
- 认证模块:JWT生成/验证、Token刷新、登出(删除Redis key)
- 招聘模块:信息发布、审核(通过/拒绝/撤销)、列表查询(区分公开/待审)
- 消息模块:WebSocket连接管理、消息存储、在线状态同步、AI建议/代答
- AI模块:封装Dify API调用,提供审核分析和对话问答接口
- VIP模块:VIP升级申请、审批、角色变更
2.3 关键技术实现要点
- 异步数据库操作:SQLAlchemy 2.0 AsyncSession,避免IO阻塞,支持高并发。
- Redis在线状态:
chat:online:{user_id}存储"1",TTL 300秒;心跳/发消息时续期;异常断开后自动清理。 - JWT无状态认证:Token 中编码
user_id和session_id,服务端通过 Redis 存储有效Token,可实现主动撤销(删除key即可踢人)。 - WebSocket连接管理:内存字典
active_connections存储当前实例的WebSocket对象,用于直接推送;Redis 记录在线标志,为多实例扩展预留(值改为实例ID+RPC)。 - Dify集成:审核工作流强制返回纯JSON;对话Chatflow支持
conversation_id多轮记忆;超时控制(30秒)和重试机制(2次)。
3. 数据库与缓存设计
本节通过多张图表展示核心数据模型、表关系、Redis 缓存策略以及关键操作流程。
3.1 核心实体关系图(ER 图)
核心表说明:
sys_user:用户表,通过role字段区分普通客户、VIP、管理员。job_post:招聘信息表,status控制审核流(pending→approved/revoked)。chat_session:会话表,关联求职者(user_id)、招聘方(vip_id)和招聘岗位(job_id),同时存储 Dify 的conversation_id以实现多轮对话记忆。chat_message:消息表,sender_role标记发送方类型(普通用户/VIP/AI),is_read用于未读计数。vip_application:VIP 升级申请表,存储缴费凭证和审核状态。
3.2 索引设计(加速查询)
| 表名 | 索引字段 | 类型 | 用途 |
|---|---|---|---|
sys_user |
phone |
唯一 | 登录、注册快速查找 |
sys_user |
role |
普通 | 管理员按角色筛选用户 |
job_post |
user_id |
普通 | VIP 查询“我的招聘” |
job_post |
status, created_at |
复合 | 待审批列表按时间排序 |
job_post |
status, location |
复合 | 招聘广场筛选城市 |
chat_session |
user_id, vip_id, job_id |
唯一复合 | 防止重复创建会话 |
chat_message |
session_id, send_time |
复合 | 分页加载聊天历史 |
chat_message |
session_id, is_read |
复合 | 未读消息统计 |
3.3 Redis 缓存结构设计
Redis 在本系统中主要存储三类数据:在线状态、JWT Token、限流计数器。
| Key 格式 | 值示例 | TTL | 操作时机 |
|---|---|---|---|
chat:online:{user_id} |
"1" |
300秒 | WebSocket 连接时 setex;发消息/心跳时 expire 续期;断开时 del |
access_token:{session_id} |
{"user_id":1,"name":"张三","role":"vip"} |
7200秒(2小时) | 登录成功后存储;每次请求验证是否存在;登出时删除 |
refresh_token:{session_id} |
"eyJhbGc..." |
604800秒(7天) | 登录时存储;刷新 access_token 时校验;登出时删除 |
rate_limit:login:{phone} |
3(剩余尝试次数) |
60秒 | 登录失败时 incr,超过5次则临时锁定 |
在线状态管理流程图(WebSocket 连接与状态同步):
3.4 缓存操作代码示例(核心逻辑)
# 连接时写入 Redis
await redis.setex(f"chat:online:{user_id}", 300, "1")
# 心跳/发消息时续期
await redis.expire(f"chat:online:{user_id}", 300)
# 断开时删除
await redis.delete(f"chat:online:{user_id}")
# JWT 登录存储
session_id = str(uuid4())
await redis.setex(f"access_token:{session_id}", 7200, json.dumps(user_info))
await redis.setex(f"refresh_token:{session_id}", 604800, refresh_token)
# 验证 Token 是否存在
if not await redis.exists(f"access_token:{session_id}"):
raise HTTPException(401, "认证已失效")
# 登出时清除
await redis.delete(f"access_token:{session_id}", f"refresh_token:{session_id}")
3.5 为什么选择这两种存储
- MySQL:保证招聘信息、用户、消息等数据的 持久化与事务一致性(例如 VIP 升级同时修改用户角色和申请状态)。
- Redis:利用 内存级速度 和 TTL 自动过期,高效管理在线状态和临时 Token,且为未来多实例部署预留分布式能力。
4. 三个 AI 智能体设计
系统基于 Dify 平台构建了三个协同工作的 AI 智能体,分别服务于求职咨询(AI 求职小助手)、招聘信息自动审核(AI 审核分析引擎)和审核员深度对话(AI 审核对话助手)。三个智能体共享 Dify 运行时,但工作流类型、输入输出和应用场景各不相同。以下分别介绍每个智能体的核心工作流程。
4.1 AI 求职小助手(Chatflow)
定位:嵌入用户端消息中心,为求职者提供 7×24 小时自动欢迎、岗位咨询、多轮对话及 VIP 离线代答服务。
特点:
- 首次会话生成包含职位信息的欢迎语。
- 后续消息基于对话历史与职位上下文智能回复。
- 根据 VIP 在线状态动态切换输出模式:
建议回复(推给 VIP)或代答回复(直接推给求职者)。
流程图:
技术要点:后端在 chat_session 表中保存 dify_conversation_id,保证多轮对话记忆。VIP 在线状态通过 WebSocket 连接管理器实时获取。
4.2 AI 审核分析引擎(Workflow)
定位:部署于后台审批详情页,管理员打开招聘信息时自动触发,一次性输出合规评分、敏感词、内容完备度及审核结论。
特点:
- 代码节点先做敏感词快速匹配和初评分,高风险直接输出拒绝 JSON。
- 中低风险时调用 LLM 进行深度语义分析和完备度检查。
- 强制输出纯 JSON 格式,供前端直接渲染。
流程图:
技术要点:敏感词库覆盖性别/年龄歧视、非法收费、薪资模糊等类型;提示词严格约束输出格式,后端增加重试解析机制。
4.3 AI 审核对话助手(Chatflow)
定位:部署于审批详情页的可折叠对话框中,管理员可主动与 AI 进行多轮追问、批量审核或获取修改建议。
特点:
- 意图识别分流:审核请求 / 闲聊 / 告别。
- 支持单条与批量审核两种模式。
- 批量审核时通过迭代节点保留原文对照,汇总输出。
- 结构化回答模板(审核结果 → 风险项 → 完备度 → 建议)。
流程图:
技术要点:迭代节点要求每次 LLM 输出时在开头标注原文,汇总节点原样拼接,确保管理员能对应到原始招聘信息。会话记忆通过独立的 conversation_id 维护。
三个智能体均通过后端封装统一的 Dify API 调用服务,前端根据场景(用户端聊天、管理端审核)调用不同接口。这种设计使 AI 能力与业务逻辑解耦,便于后续升级模型或调整工作流。
5. 核心业务一:招聘发布与审核
招聘信息的发布与审核是系统最核心的业务流程。VIP 用户(用工方)发布招聘信息后,必须经过平台管理员的审核才能在前台“招聘广场”展示,以此保证信息的真实性与合规性。同时,为了提升审核效率与准确率,系统在管理员审核页面集成了两个 AI 智能体:一个用于自动合规评分与风险检测,另一个用于交互式问答。本节将分三个部分详细讲解整个模块的设计与实现,这是第一部分,涵盖数据模型、状态流转、VIP 发布与管理以及管理员审核的基础流程。
5.1 数据模型设计
招聘信息对应的数据表为 job_post,其核心字段定义如下(基于 SQLAlchemy 模型):
# backend/app/plugin/module_recruitment/recruitment/model.py
class JobPostModel(ModelMixin, UserMixin):
__tablename__: str = 'job_post'
vip_user_id: Mapped[int] # 发布者 VIP 用户 ID
title: Mapped[str] # 职位名称(1~100 字符)
approve_status: Mapped[str] = mapped_column(
String(20),
nullable=False,
comment='审核状态(pending:待审批, approved:已通过, revoked:已撤销)'
)
reject_reason: Mapped[str | None] # 拒绝 / 撤销原因
reviewer_id: Mapped[int | None] # 审批人 ID
reviewed_at: Mapped[datetime | None] # 审批时间
# 其他业务字段...
状态枚举:
| 状态值 | 含义 | 可见范围 |
|---|---|---|
pending |
待审批 | 仅管理员和发布者(VIP)可见 |
approved |
已通过 | 所有用户(包括未登录)可见 |
revoked |
已撤销 | 仅管理员和发布者可见 |
设计要点:
- 使用
approve_status独立字段管理审核状态,与逻辑删除字段is_deleted分离。 - 保留
reject_reason用于记录拒绝原因,帮助发布者了解问题并修改后重新申请。 reviewer_id和reviewed_at可追溯审批操作者与时间。
5.2 状态流转机制
招聘信息从创建到最终上架或下架,经历清晰的状态变化,如下图所示:
核心状态转换代码(后端服务层):
- 审核通过:
@staticmethod
async def approve_job_post(job_id: int, auth: AuthSchema) -> bool:
job = await JobPostCRUD(auth).get_job_post_by_id(job_id)
if not job:
raise CustomException("招聘信息不存在")
if job.approve_status != "pending":
raise CustomException("只能审核待审批状态的招聘信息")
return await JobPostCRUD(auth).update_approve_status(job_id, "approved")
- 审核拒绝:
@staticmethod
async def reject_job_post(job_id: int, reason: str, auth: AuthSchema) -> bool:
job = await JobPostCRUD(auth).get_job_post_by_id(job_id)
if not job:
raise CustomException("招聘信息不存在")
if job.approve_status != "pending":
raise CustomException("只能审核待审批状态的招聘信息")
return await JobPostCRUD(auth).update_approve_status(job_id, "revoked", reason)
- VIP 申请下架(状态由
approved→pending):
# 调用更新服务,修改 approve_status
update_data = JobPostUpdateSchema(approve_status="pending")
await JobPostService.update_recruitment_service(auth=auth, id=job_id, data=update_data)
- VIP 重新申请(状态由
revoked→pending):
# 同样通过更新接口修改状态
update_data = JobPostUpdateSchema(approve_status="pending")
await JobPostService.update_recruitment_service(auth=auth, id=job_id, data=update_data)
业务规则约束:
- 只有状态为
pending的记录才能被管理员通过或拒绝。 - 已通过的招聘信息 VIP 不能直接删除或修改,只能“申请下架”,将状态回退到
pending重新审核。 - 已撤销的招聘信息可重新申请,再次进入审核流程。
5.3 VIP 用户发布招聘
VIP 用户通过前端表单填写招聘信息,提交后后端调用以下接口:
# controller.py
@JobPostRouter.post("/create", summary="创建招聘信息审批")
async def create_recruitment_controller(
data: JobPostCreateSchema,
auth: AuthSchema = Depends(AuthPermission(["module_recruitment:recruitment:create"]))
) -> JSONResponse:
result_dict = await JobPostService.create_recruitment_service(auth=auth, data=data)
return SuccessResponse(data=result_dict, msg="创建招聘信息审批成功")
创建表单主要字段(Pydantic schema):
class JobPostCreateSchema(BaseModel):
vip_user_id: int # 自动从当前登录用户获取
title: str # 职位名称,1~100 字符
recruit_count: int # 招聘人数
province: str # 省份
city: str # 城市
district: str | None # 区/县
address: str # 详细地址
salary_min: Decimal # 最低薪资
salary_max: Decimal # 最高薪资
salary_unit: str # 薪资单位(元/小时、元/天、元/月)
job_requirements: str # 岗位要求
contact_person: str # 联系人
contact_mobile: str # 联系电话(11 位手机号)
deadline: date # 截止日期
description: str | None # 招聘描述(富文本)
approve_status: str = "pending" # 默认为待审批
处理流程:
- 权限校验:用户必须具有
module_recruitment:recruitment:create权限(VIP 角色自动拥有)。 - 数据校验:Pydantic 自动校验字段类型、长度、格式。
- 业务层调用 CRUD 插入记录,
approve_status设为pending,其他通用字段(created_time、created_id等)由ModelMixin自动填充。 - 返回成功响应,VIP 可在“我的招聘”列表中看到该条待审批记录。
5.4 VIP 管理自己的招聘信息
VIP 用户可以在“我的招聘”页面查看自己发布的所有招聘信息,并支持筛选状态(pending / approved / revoked)。
列表查询接口:
# service.py
@classmethod
async def page_recruitment_service(cls, auth: AuthSchema, page_no: int, page_size: int,
search: JobPostQueryParam | None = None,
order_by: list[dict] | None = None) -> dict:
search_dict = search.__dict__ if search else {}
order_by_list = order_by or [{'id': 'asc'}]
offset = (page_no - 1) * page_size
result = await JobPostCRUD(auth).page_recruitment_crud(
offset=offset,
limit=page_size,
order_by=order_by_list,
search=search_dict
)
return result
前端传入 search 参数可包含 approve_status 等过滤条件,仅返回当前 VIP 用户(vip_user_id 等于当前用户 ID)的数据。
更新招聘信息(仅限 pending 状态):
@classmethod
async def update_recruitment_service(cls, auth: AuthSchema, id: int, data: JobPostUpdateSchema) -> dict:
obj = await JobPostCRUD(auth).get_by_id_recruitment_crud(id=id)
if not obj:
raise CustomException(msg='更新失败,该数据不存在')
# 此处应增加状态校验:只有 pending 状态才能编辑
obj = await JobPostCRUD(auth).update_recruitment_crud(id=id, data=data)
return JobPostOutSchema.model_validate(obj).model_dump()
VIP 可执行的操作:
- 对于
pending状态:可编辑信息、取消发布(软删除)。 - 对于
approved状态:不可编辑,但可点击“申请下架”,将状态改为pending。 - 对于
revoked状态:可点击“重新申请”,将状态改为pending。
5.5 管理员审核流程
5.5.1 待审核列表
管理员进入“招聘信息审批”菜单,默认看到所有 approve_status = 'pending' 的招聘信息,支持按提交时间排序和关键词搜索。
接口:
@JobPostRouter.get("/list", summary="查询招聘信息审批列表")
async def get_recruitment_list_controller(
page: PaginationQueryParam = Depends(),
search: JobPostQueryParam = Depends(),
auth: AuthSchema = Depends(AuthPermission(["module_recruitment:recruitment:query"]))
) -> JSONResponse:
result_dict = await JobPostService.page_recruitment_service(...)
return SuccessResponse(data=result_dict)
5.5.2 审核详情页面
点击某条记录进入详情页,该页面同时展示招聘信息的完整内容以及右侧的 AI 审核分析卡片。详情接口不仅返回招聘信息,还会自动调用 Dify 工作流进行分析。
获取详情(含 AI 分析触发):
@JobPostRouter.get("/detail/{id}", summary="获取招聘信息审批详情")
async def get_recruitment_detail_controller(
id: int,
auth: AuthSchema = Depends(AuthPermission(["module_recruitment:recruitment:query"]))
) -> JSONResponse:
result_dict = await JobPostService.detail_recruitment_service(auth=auth, id=id)
return SuccessResponse(data=result_dict)
后端在 detail_recruitment_service 中除了查询数据库,还会调用 get_ai_review 方法(详见下节),但该调用通常由前端在加载详情后再单独发起,避免阻塞详情接口。实际实现中,前端会在获取详情后,再调用 /ai/review/job/{job_id} 获取 AI 分析结果。
5.5.3 管理员操作
- 通过:调用
/job_post/{job_id}/approve,将状态改为approved,招聘信息即时在招聘广场可见。 - 拒绝:调用
/job_post/{job_id}/reject,必须填写拒绝原因,状态改为revoked。
这两个接口在控制器中定义:
@JobPostRouter.post("/{job_id}/approve", summary="通过招聘信息审批")
async def approve_job_post_controller(job_id: int, auth: AuthSchema):
await JobPostService.approve_job_post(job_id=job_id, auth=auth)
return SuccessResponse(msg="审批通过成功")
@JobPostRouter.post("/{job_id}/reject", summary="拒绝招聘信息审批")
async def reject_job_post_controller(job_id: int, data: dict, auth: AuthSchema):
reason = data.get("reason", "")
await JobPostService.reject_job_post(job_id=job_id, reason=reason, auth=auth)
return SuccessResponse(msg="已拒绝该招聘信息")
5.6 双 AI 智能体的配置与依赖
生产环境中,两个工作流在 Dify 平台独立配置,使用不同的 API Key。环境变量如下:
| 变量名 | 用途 |
|---|---|
DIFY_API_URL |
Dify API 基础地址(例如 http://localhost/v1) |
DIFY_WORKFLOW_API_KEY |
审核分析工作流的 API 密钥 |
DIFY_CHATFLOW_API_KEY |
问答助手 Chatflow 的 API 密钥 |
后端启动时读取这些变量,并封装两个服务类:DifyReviewService 和 DifyChatService,方便复用。
5.7 双 AI 智能体协同机制详解
在招聘审核模块中,我们深度集成了 Dify 平台的两个 AI 工作流,分别承担自动审核分析和交互式问答职责。二者共同构成一个完整的智能辅助审核系统,既帮助管理员快速发现信息中的合规风险,又能通过对话形式深入探讨具体问题。
5.7.1 AI 审核分析引擎(Workflow)
定位:一次性、自动化的合规评估工具,用于对单条招聘信息进行全面扫描,输出结构化的审核报告。
触发时机:管理员打开招聘信息审核详情页时,前端自动发起请求,后端调用 Dify Workflow 并返回结果。
输入数据:后端将招聘信息所有字段拼接成一个文本字符串 job_info,例如:
标题:搬运工,薪资200-300元/天,工作地点湖南省长沙市岳麓区,地址麓谷科技园,
联系人张三,电话13800000000,招聘人数5人,要求:男性,身体健康,能吃苦耐劳。
后端调用代码(实际实现位于 service.py 中):
@staticmethod
async def get_ai_review(job_id: int, auth: AuthSchema) -> dict:
# 1. 获取招聘信息
job = await JobPostCRUD(auth).get_job_post_by_id(job_id)
if not job:
raise CustomException("招聘信息不存在")
# 2. 拼接为纯文本
job_info = f"{job.title},薪资{job.salary_min}-{job.salary_max}{job.salary_unit}," \
f"工作地点{job.province}{job.city}{job.district or ''},地址{job.address or ''}," \
f"联系人{job.contact_person},电话{job.contact_mobile}," \
f"招聘人数{job.recruit_count},要求{job.job_requirements},描述{job.description or ''}"
# 3. 调用 Dify Workflow API
url = f"{settings.DIFY_API_URL}/workflows/run"
headers = {
"Authorization": f"Bearer {settings.DIFY_WORKFLOW_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"inputs": {"job_info": job_info},
"response_mode": "blocking",
"user": f"job_review_{job_id}"
}
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(url, headers=headers, json=payload)
result = response.json()
# 4. 解析输出(强制要求输出为 JSON)
if result.get("data", {}).get("status") == "succeeded":
outputs = result["data"]["outputs"]
return {
"score": outputs.get("score", 100),
"risks": outputs.get("risks", []),
"completeness": outputs.get("completeness", []),
"conclusion": outputs.get("conclusion", "")
}
else:
# 记录日志,返回默认结构
return default_ai_review()
Dify 工作流内部结构(在 Dify 平台中配置):
- 开始节点:接收
job_info字符串。 - 代码节点(敏感词初步检测):用 Python 编写的自定义节点,内置招聘领域敏感词库,进行快速匹配和初评分。若评分过低(高风险),直接输出拒绝结论的 JSON,跳过后续 LLM 调用以节省成本。
- 条件分支:根据代码节点输出的
risk_level(高/中/低)分流。 - 知识库检索(可选):匹配历史违规案例,供 LLM 参考。
- LLM 节点:对中低风险信息进行深度语义分析,输出严格的 JSON 格式报告,包含评分调整、敏感词详情、完备度检查、最终结论。
- 结束节点:输出最终 JSON 字符串。
输出示例:
{
"score": 75,
"risks": [
{"word": "男性", "level": "high", "suggestion": "建议删除性别限制,改用'体力较好'"},
{"word": "能吃苦耐劳", "level": "low", "suggestion": "可改为'适应加班'更准确"}
],
"completeness": [
{"field": "公司名称", "status": "missing", "note": "未填写公司名称,建议补充"},
{"field": "工作时间", "status": "ambiguous", "note": "未明确每日工作时长"}
],
"conclusion": "该招聘信息存在性别歧视风险,且关键字段缺失,建议拒绝发布并要求修改。"
}
前端展示:右侧 AI 审核卡片以进度条展示合规评分(低于 60 分红色警示),风险词高亮标红并显示修改建议,缺失字段列表一目了然。
5.7.2 AI 审核对话助手(Chatflow)
定位:可交互的智能问答助手,用于管理员对招聘信息进行深度追问、批量审核或获取修改指导。
触发时机:管理员在审核页面右侧点击“展开AI问答助手”折叠面板,输入问题后发送。
调用流程:
@staticmethod
async def send_chat_message(conversation_id: str, message: str, job_id: int, auth: AuthSchema) -> dict:
# 1. 获取招聘信息,同样拼接 job_info
job = await JobPostCRUD(auth).get_job_post_by_id(job_id)
if not job:
raise CustomException("招聘信息不存在")
job_info = f"{job.title},薪资..."
# 2. 调用 Dify Chatflow API
url = f"{settings.DIFY_API_URL}/chat-messages"
headers = {
"Authorization": f"Bearer {settings.DIFY_CHATFLOW_API_KEY}",
"Content-Type": "application/json",
}
payload = {
"inputs": {"job_info": job_info},
"query": message,
"response_mode": "blocking",
"user": f"job_chat_{job_id}"
}
if conversation_id:
payload["conversation_id"] = conversation_id # 保持多轮对话记忆
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(url, headers=headers, json=payload)
result = response.json()
if "answer" in result:
return {
"response": result["answer"],
"conversation_id": result.get("conversation_id", "")
}
else:
return {"response": "AI 服务暂时不可用,请稍后再试。", "conversation_id": conversation_id}
Dify Chatflow 内部设计要点(已在前文智能体设计章节说明,此处强调审核场景特有的逻辑):
- 意图识别:判断用户输入是要求审核招聘信息、闲聊还是告别。
- 批量审核支持:如果管理员一次性粘贴多条招聘信息文本(以换行或逗号分隔),代码节点会切分成数组,进入迭代节点逐条分析,每条结果前保留原文字段,最后汇总输出带原文对照的报告。
- 结构化输出:LLM 按照固定模板输出 Markdown 格式的回复,包含“审核结果”、“风险项”、“内容完备度”、“整体建议”四个板块,易于阅读。
- 多轮记忆:通过
conversation_id存储于后端会话中(或前端缓存),管理员可以连续追问“第一条为什么违规?”、“怎么修改薪资描述?”等,AI 能结合上下文回答。
前端交互:
- 管理员在右侧面板输入文字,实时显示 AI 回复(流式或块式)。
- 支持一键复制 AI 给出的修改建议,方便填入修改表单。
5.7.3 双智能体协同时序图
5.7.4 超时与重试机制
AI 调用涉及外部服务,可能存在网络抖动或高延迟。为保证审核页面不长时间阻塞,我们做了以下设计:
- 超时控制:
httpx.AsyncClient设置timeout=60.0秒,超过 60 秒未响应则抛出异常,后端捕获后返回默认分析结果或错误提示。 - 重试策略:对于
get_ai_review,若第一次调用失败(网络异常、非 JSON 响应等),自动重试一次;仍失败则返回预先定义的默认 JSON 结构(如score: 0, conclusion: "AI 分析暂不可用,请人工审核")。重试次数和间隔可配置。 - 异步非阻塞:AI 调用使用
async/await,不会阻塞其他请求处理。在审核页面,AI 分析请求与详情接口并行发起,互不影响。
5.7.5 AI 输出格式稳定性保障
Dify 工作流输出的内容有时会包含额外解释文字,导致前端 JSON.parse 失败。我们采取了三层防护:
- 提示词强制约束:在 Dify LLM 节点的系统提示词中明确要求:“只输出 JSON 对象,不要包含任何其他文字、代码块标记或注释。”
- 后端清洗:在
get_ai_review中,对返回的text字段进行正则匹配,提取 JSON 部分(例如使用re.search(r'\{.*\}', text, re.DOTALL))。 - 前端兜底:如果后端返回的数据无法解析,前端不阻塞页面,仅提示“AI 分析部分失败”,并展示默认占位。
5.8 性能与成本优化
- 代码节点预筛:敏感词检测在代码节点完成,只有通过初筛(中低风险)的数据才进入 LLM 节点,减少不必要的 LLM 调用,降低成本和延迟。
- 缓存机制:对于同一招聘信息,多次打开审核页面时,如果招聘内容未修改,后端可缓存 AI 分析结果(例如 Redis 缓存 5 分钟),避免重复调用 Dify。
- 超时丢弃:AI 建议回复在 3 秒内未返回则丢弃,不影响消息主流程。审核分析可以允许 60 秒超时,因为管理员通常不会高频刷新。
5.9 前端审核页面实现(管理端)
审核页面是管理员进行决策的核心界面,采用左右分栏布局:左侧展示招聘信息的完整内容,右侧依次展示 AI 审核分析卡片 和 AI 问答助手面板。页面基于 Vue 3 + Element Plus 构建,组件位于 frontend/src/views/module_recruitment/jobReview/index.vue。
5.9.1 页面核心功能点
- 页面加载时:
- 调用
getRecruitmentDetail(id)获取招聘信息。 - 调用
getAiReview(id)获取 AI 审核分析报告,渲染在右侧卡片。 - 初始化 WebSocket(可选,用于接收审核结果推送,当前为手动决策)。
- 调用
- 管理员操作:
- 点击“通过”按钮 → 调用
approveJob(id)→ 成功后跳转回待审批列表。 - 点击“拒绝”按钮 → 弹窗填写拒绝原因 → 调用
rejectJob(id, reason)→ 成功后跳转。
- 点击“通过”按钮 → 调用
- AI 问答助手:
- 点击“展开AI助手”折叠面板。
- 输入问题,按 Enter 或点击发送,调用
sendChatMessage(conversationId, message, jobId)。 - 实时显示 AI 回复(支持 Markdown 渲染)。
- 保持对话历史(通过前端存储
conversationId或由后端会话表维护)。
5.9.2 关键代码片段(Vue 组合式 API)
<template>
<div class="job-review-container">
<!-- 左侧:招聘信息详情 -->
<div class="job-info-panel">
<el-descriptions :column="1" border>
<el-descriptions-item label="职位名称">{{ job.title }}</el-descriptions-item>
<el-descriptions-item label="薪资范围">{{ job.salary_min }}-{{ job.salary_max }}{{ job.salary_unit }}</el-descriptions-item>
<!-- 其他字段 -->
</el-descriptions>
<div class="action-buttons">
<el-button type="success" @click="handleApprove">通过</el-button>
<el-button type="danger" @click="handleReject">拒绝</el-button>
</div>
</div>
<!-- 右侧:AI 辅助区域 -->
<div class="ai-panel">
<!-- AI 审核卡片 -->
<el-card class="ai-review-card" v-loading="aiReviewLoading">
<template #header>
<span>🤖 AI 审核分析</span>
</template>
<div v-if="aiReview">
<el-progress :percentage="aiReview.score" :color="scoreColor" />
<div class="risks">
<h4>⚠️ 风险项</h4>
<ul>
<li v-for="risk in aiReview.risks" :key="risk.word">
{{ risk.word }}:{{ risk.suggestion }}
</li>
</ul>
</div>
<div class="conclusion">{{ aiReview.conclusion }}</div>
</div>
</el-card>
<!-- AI 问答助手 -->
<el-card class="ai-chat-card">
<template #header>
<span>💬 AI 问答助手</span>
</template>
<div class="chat-messages" ref="chatMessagesRef">
<div v-for="msg in chatMessages" :key="msg.id" :class="['message', msg.role]">
<span class="content">{{ msg.content }}</span>
</div>
</div>
<div class="chat-input">
<el-input v-model="currentQuestion" placeholder="输入问题,如“薪资单位是否规范?”" @keyup.enter="sendQuestion" />
<el-button type="primary" @click="sendQuestion">发送</el-button>
</div>
</el-card>
</div>
</div>
</template>
<script setup>
import { ref, onMounted } from 'vue'
import { useRoute, useRouter } from 'vue-router'
import { ElMessage, ElMessageBox } from 'element-plus'
import { getRecruitmentDetail, approveJob, rejectJob, getAiReview, sendChatMessage } from '@/api/module_recruitment/recruitment'
const route = useRoute()
const router = useRouter()
const jobId = route.params.id
const job = ref({})
const aiReview = ref(null)
const aiReviewLoading = ref(false)
const chatMessages = ref([])
const currentQuestion = ref('')
let conversationId = null
// 获取招聘详情
const fetchJobDetail = async () => {
const res = await getRecruitmentDetail(jobId)
job.value = res.data
}
// 获取 AI 审核分析
const fetchAiReview = async () => {
aiReviewLoading.value = true
try {
const res = await getAiReview(jobId)
aiReview.value = res.data
} catch {
ElMessage.warning('AI 分析暂不可用')
} finally {
aiReviewLoading.value = false
}
}
// 通过审核
const handleApprove = async () => {
await ElMessageBox.confirm('确认通过该招聘信息?', '提示')
await approveJob(jobId)
ElMessage.success('已通过')
router.push('/module_recruitment/recruitment')
}
// 拒绝审核
const handleReject = async () => {
const { value: reason } = await ElMessageBox.prompt('请输入拒绝原因', '拒绝审核', {
confirmButtonText: '确认',
cancelButtonText: '取消',
inputType: 'textarea',
inputValidator: (val) => val && val.trim() !== '' ? true : '原因不能为空'
})
await rejectJob(jobId, reason)
ElMessage.success('已拒绝')
router.push('/module_recruitment/recruitment')
}
// 发送问题给 AI 问答助手
const sendQuestion = async () => {
if (!currentQuestion.value.trim()) return
const userMsg = { id: Date.now(), role: 'user', content: currentQuestion.value }
chatMessages.value.push(userMsg)
const question = currentQuestion.value
currentQuestion.value = ''
try {
const res = await sendChatMessage(conversationId, question, jobId)
conversationId = res.data.conversation_id
const aiMsg = { id: Date.now() + 1, role: 'ai', content: res.data.response }
chatMessages.value.push(aiMsg)
// 滚动到底部
scrollToBottom()
} catch {
ElMessage.error('AI 服务异常')
chatMessages.value.push({ id: Date.now() + 1, role: 'ai', content: '服务暂时不可用,请稍后再试。' })
}
}
const scrollToBottom = () => {
const container = document.querySelector('.chat-messages')
if (container) container.scrollTop = container.scrollHeight
}
onMounted(() => {
fetchJobDetail()
fetchAiReview()
})
</script>
5.9.3 前端 API 封装(api/module_recruitment/recruitment.ts)
// 获取 AI 审核分析
export const getAiReview = (jobId: number) =>
request.get(`/recruitment/ai/review/job/${jobId}`)
// 发送问答消息
export const sendChatMessage = (conversationId: string | null, message: string, jobId: number) =>
request.post('/recruitment/ai/chat/message', { conversation_id: conversationId, message, job_id: jobId })
// 通过审核
export const approveJob = (jobId: number) =>
request.post(`/recruitment/job_post/${jobId}/approve`)
// 拒绝审核
export const rejectJob = (jobId: number, reason: string) =>
request.post(`/recruitment/job_post/${jobId}/reject`, { reason })
5.10 典型用户操作完整时序图
5.10.1 场景一:VIP 发布新招聘 → 管理员审核通过
5.10.2 场景二:管理员使用 AI 问答助手深度追问
5.10.3 场景三:VIP 申请下架已通过的招聘
6. 核心业务二:消息中心模块完整实现流程详解
消息中心是整个系统中连接求职者与招聘方(VIP)的关键桥梁。它基于 WebSocket 实现实时双向通信,并结合 Redis 内存状态与 MySQL 持久化存储,同时集成 Dify AI 小助手,提供智能建议回复与离线代答功能。在消息已保存到数据库并确定了接收方之后,后端需要立即将消息推送给对方(如果对方在线),并根据发送者角色触发 AI 辅助回复。
6.1 整体架构概览与连接建立
6.1.1 技术栈与模块职责
消息中心模块采用 WebSocket + Redis + MySQL 技术栈,核心组件与职责如下:
| 层级 | 组件文件 | 核心职责 |
|---|---|---|
| 前端 | frontend/src/views/module_ai/chat/index.vue |
WebSocket 客户端,UI 交互,消息发送/接收 |
| 后端入口 | backend/.../module_chat/chat/websocket.py |
WebSocket 端点,连接握手,消息路由 |
| 连接管理 | backend/.../module_chat/chat/manager.py |
内存连接池(active_connections),Redis 在线状态管理 |
| 业务逻辑 | backend/.../module_chat/chat/service.py |
消息处理、会话更新、AI 回复触发、推送分发 |
| AI 服务 | backend/.../module_chat/chat/ai_service.py |
封装 Dify API 调用,生成智能回复 |
| 数据模型 | backend/.../module_chat/chat/model.py |
会话表(chat_session)、消息表(chat_message) |
6.1.2 第一阶段:WebSocket 连接建立
前端发起连接
用户进入聊天页面(如点击“立即咨询”),前端创建 WebSocket 连接:
// frontend/src/views/module_ai/chat/index.vue
const connectWebSocket = () => {
const url = new URL("/api/v1/chat/ws/chat", WS_URL);
const token = Auth.getAccessToken(); // 获取 JWT token
url.searchParams.append("token", token);
ws = new WebSocket(url.toString());
ws.onopen = () => {
console.log("WebSocket connected");
connectionStatus.value = "connected";
};
ws.onmessage = (event) => handleWebSocketMessage(event.data);
ws.onclose = () => { /* 清理 UI 状态 */ };
ws.onerror = () => { /* 错误处理 */ };
};
后端端点与认证
# websocket.py
@WS_CHAT.websocket("/ws/chat")
async def websocket_chat_controller(
websocket: WebSocket,
token: str = Query(...),
) -> None:
await websocket.accept()
try:
# 1. 验证 JWT token
auth = await _verify_token(token, db, redis)
if not auth or not auth.user:
await websocket.close()
return
user_id = auth.user.id
# 2. 注册连接(内存 + Redis)
await chat_manager.connect(user_id, websocket)
# 3. 发送连接成功消息
await websocket.send_json({"type": "connected", "user_id": user_id})
# 4. 进入消息循环(后续详述)
while True:
data = await websocket.receive_text()
# ... 处理消息
finally:
await chat_manager.disconnect(user_id)
await websocket.close()
连接管理器(内存 + Redis 双重状态)
# manager.py
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[int, WebSocket] = {} # 内存连接池
self.redis = None # Redis 连接(延迟初始化)
self.online_key_prefix = "chat:online:"
self.online_timeout = 300 # 5 分钟超时
async def connect(self, user_id: int, websocket: WebSocket):
# 1. 存入内存(用于实时推送)
self.active_connections[user_id] = websocket
# 2. 写入 Redis 在线状态,TTL 5 分钟
if self.redis:
await self.redis.set(
f"{self.online_key_prefix}{user_id}",
"1",
ex=self.online_timeout
)
关键设计:
- 内存字典:存储当前实例的所有 WebSocket 对象,用于直接推送消息,速度极快(纳秒级)。
- Redis 标志:记录用户是否在线,TTL 自动清理异常断开的残留状态,并为未来多实例部署预留扩展能力(可将值改为实例 ID)。
6.2 消息发送与实时推送
连接建立后,用户可发送消息。消息会经过前端 → 后端接收 → 保存数据库 → 推送给接收方 → 触发 AI 回复等流程。本节先介绍消息从前端到后端保存的步骤,AI 回复和离线代答留待下一部分。
6.2.1 前端发送消息
// 用户点击发送或按回车
const handleSendMessage = async (message: string) => {
// 1. 乐观更新:立即在 UI 上显示用户消息(临时 id)
addMessage("user", message, { isTemporary: true });
// 2. 通过 WebSocket 发送消息
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: "message",
session_id: currentSessionId.value,
content: message,
}));
}
};
6.2.2 后端接收消息并处理
WebSocket 消息循环(接上文 websocket.py):
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
msg_type = message_data.get("type")
if msg_type == "message":
session_id = message_data.get("session_id")
content = message_data.get("content")
# 调用业务服务层处理
chat_service = ChatService(auth)
result = await chat_service.handle_websocket_message(session_id, content)
# 回复发送成功确认(前端用此替换临时消息)
await websocket.send_json({
"type": "message_sent",
"message_id": result.id,
"session_id": session_id,
"content": result.content,
"created_at": result.send_time.isoformat()
})
elif msg_type == "ping":
await websocket.send_json({"type": "pong"})
业务逻辑处理(service.py):
async def handle_websocket_message(self, session_id: int, content: str):
user_id = self.auth.user.id
# 1. 查询会话是否存在
session = await self._get_session(session_id)
if not session:
raise ValueError("Session not found")
# 2. 确定发送者角色(user / vip)
sender_role = "user" if user_id == session.user_id else "vip"
# 3. 保存消息到 MySQL
message = ChatMessageModel(
session_id=session_id,
sender_role=sender_role,
sender_id=user_id,
content=content,
send_time=datetime.now(),
is_read=False,
)
self.auth.db.add(message)
await self.auth.db.flush()
await self.auth.db.refresh(message)
# 4. 更新会话的最后消息摘要和时间
session.last_message = content[:50]
session.last_time = datetime.now()
await self.auth.db.flush()
# 5. 确定接收方 ID(对方是谁)
recipient_id = session.vip_id if sender_role == "user" else session.user_id
# 6. 将消息推送给接收方(调用 _push_message_to_recipient)
# (具体推送逻辑见下一部分)
return message
保存消息的字段说明:
sender_role区分普通用户(user)、VIP(vip)和 AI(ai),便于前端展示不同头像和气泡样式。is_read用于未读消息计数。send_time精确到秒,用于前端排序。
6.3 消息推送与 AI 智能回复(核心逻辑)
6.3.1 推送消息给接收方(_push_message_to_recipient)
在 handle_websocket_message 中保存消息后,调用以下方法将消息实时推送给接收方:
# service.py
async def _push_message_to_recipient(
self,
recipient_id: int,
session_id: int,
message: ChatMessageModel,
) -> None:
# 构造推送数据(与存储结构略有不同,便于前端直接使用)
push_data = {
"type": "message",
"session_id": session_id,
"id": message.id,
"sender_id": message.sender_id,
"sender_name": await self._get_user_name(message.sender_id),
"role": message.sender_role,
"content": message.content,
"created_at": message.send_time.isoformat(),
}
# 检查接收方是否在线
is_online = await chat_manager.is_online(recipient_id)
if is_online:
await chat_manager.send_to_user(recipient_id, push_data)
# 若离线,不推送(消息已存储在数据库,对方上线后可拉取历史)
chat_manager.is_online 实现(manager.py):
async def is_online(self, user_id: int) -> bool:
# 1. 优先检查本地内存(速度最快)
if user_id in self.active_connections:
return True
# 2. 本地没有则查 Redis(支持多实例)
if self.redis:
result = await self.redis.get(f"{self.online_key_prefix}{user_id}")
return result is not None
return False
chat_manager.send_to_user 实现:
async def send_to_user(self, user_id: int, message: dict) -> bool:
ws = self.active_connections.get(user_id)
if ws:
try:
await ws.send_json(message)
# 发送成功,续期该用户的在线状态 TTL
if self.redis:
await self.redis.set(
f"{self.online_key_prefix}{user_id}",
"1",
ex=self.online_timeout
)
return True
except Exception:
# 发送失败(如连接已断开),清理状态
await self.disconnect(user_id)
return False
return False
6.3.2 触发 AI 回复
当发送者角色为普通用户(user)时,系统会自动调用 AI 生成回复。这个 AI 回复会根据 VIP 是否在线,分别以“建议回复”或“自动代答”的形式呈现。
# 在 handle_websocket_message 中,保存消息后:
if sender_role == "user":
# 异步触发 AI 回复(不阻塞当前消息处理)
asyncio.create_task(
self._generate_ai_response(session, content, user_id)
)
_generate_ai_response 详细实现:
async def _generate_ai_response(
self,
session: ChatSessionModel,
user_message: str,
user_id: int,
) -> None:
# 1. 准备上下文
job_summary = await self._get_job_summary(session.job_id)
recent_messages = await self._get_recent_messages_text(session.id, limit=6)
# 2. 调用 Dify API(AI 服务)
reply_result = await dify_chat_service.generate_reply(
job_summary=job_summary,
recent_messages=recent_messages,
user_message=user_message,
conversation_id=session.dify_conversation_id,
user_id=str(user_id),
)
ai_text = reply_result.get("answer", "")
new_conversation_id = reply_result.get("conversation_id")
if not ai_text:
return
# 3. 保存 AI 消息到数据库(sender_role='ai')
ai_message = ChatMessageModel(
session_id=session.id,
sender_role="ai",
sender_id=None,
content=ai_text,
send_time=datetime.now(),
is_read=False,
)
self.auth.db.add(ai_message)
await self.auth.db.flush()
# 4. 更新会话的 Dify conversation_id(用于多轮记忆)
if new_conversation_id and new_conversation_id != session.dify_conversation_id:
session.dify_conversation_id = new_conversation_id
await self.auth.db.flush()
# 5. 判断 VIP 是否在线,决定推送方式
vip_online = await chat_manager.is_online(session.vip_id)
if vip_online:
# VIP 在线:推送“AI 建议回复”
suggestion_data = {
"type": "ai_suggestion",
"session_id": session.id,
"id": ai_message.id,
"content": ai_text,
}
await chat_manager.send_to_user(session.vip_id, suggestion_data)
else:
# VIP 离线:AI 自动代答,同时推送给用户和 VIP(VIP 上线后可看到)
await self._push_ai_message_to_both(session, ai_message)
_push_ai_message_to_both 实现:
async def _push_ai_message_to_both(
self,
session: ChatSessionModel,
ai_message: ChatMessageModel,
) -> None:
"""AI 代答时,推送给用户和 VIP(VIP 虽离线,但上线后会通过历史消息拉取)"""
push_data = {
"type": "ai_message",
"session_id": session.id,
"id": ai_message.id,
"sender_name": "AI助手",
"role": "ai",
"content": ai_message.content,
"created_at": ai_message.send_time.isoformat(),
}
# 推送给用户(求职者),他一定在线(刚发完消息)
await chat_manager.send_to_user(session.user_id, push_data)
# 尝试推送给 VIP(在线则实时收到,离线则忽略)
await chat_manager.send_to_user(session.vip_id, push_data)
6.3.3 Dify API 调用封装(ai_service.py)
class DifyChatService:
def __init__(self):
self.api_url = settings.DIFY_API_URL
self.api_key = settings.DIFY_API_KEY
self.timeout = 30.0
async def generate_reply(
self,
job_summary: str,
recent_messages: str,
user_message: str,
conversation_id: Optional[str] = None,
user_id: Optional[str] = None,
) -> dict:
url = f"{self.api_url}/chat-messages"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"inputs": {
"job_summary": job_summary,
"recent_messages": recent_messages,
"user_message": user_message,
"is_new_session": "false",
},
"query": user_message,
"response_mode": "blocking", # 阻塞等待完整回复
"user": user_id or "system",
}
if conversation_id:
payload["conversation_id"] = conversation_id
async with httpx.AsyncClient(timeout=self.timeout) as client:
resp = await client.post(url, headers=headers, json=payload)
if resp.status_code == 200:
data = resp.json()
return {
"answer": data.get("answer", ""),
"conversation_id": data.get("conversation_id"),
}
return {"answer": "", "conversation_id": conversation_id}
6.4 离线代答机制详解
当 VIP 用户离线(WebSocket 断开或 Redis 中在线标志消失)时,AI 自动代替 VIP 回复用户消息。上述 _generate_ai_response 中已经通过 vip_online 判断实现了离线代答。这里补充说明 VIP 离线状态的检测与清理。
VIP 离线检测:
- 通过
chat_manager.is_online(vip_id)返回False即视为离线。 is_online先查内存字典,若本地没有,再查 Redis。Redis 中 key 不存在或已过期也返回False。
VIP 断开连接时的清理(websocket.py finally 块):
finally:
await chat_manager.disconnect(user_id)
# manager.py
async def disconnect(self, user_id: int) -> None:
if user_id in self.active_connections:
del self.active_connections[user_id]
if self.redis:
await self.redis.delete(f"{self.online_key_prefix}{user_id}")
用户侧体验:
- 用户发送消息后,如果 VIP 不在线,用户会立即收到一条由 AI 代答的消息(
type: "ai_message"),UI 上会显示“AI助手代答”标识。 - VIP 上线后,可以查看聊天记录,看到 AI 代答的内容,并可以继续回复。
6.5 连接断开与状态清理(第四阶段)
用户或 VIP 关闭浏览器、刷新页面或网络断开时,WebSocket 连接会触发 finally 块,执行 disconnect 清理内存和 Redis 中的在线状态。
前端心跳保活(避免因网络空闲被中间设备断开):
// 每 60 秒发送一次 ping
setInterval(() => {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "ping" }));
}
}, 60000);
后端接收心跳并续期:
elif msg_type == "ping":
await websocket.send_json({"type": "pong"})
# 同时在 manager 的 send_to_user 中,每次发送消息都会续期 Redis TTL
# 也可以单独在收到 ping 时续期,但当前实现中 ping 只回复 pong,不额外操作 Redis
6.6 完整消息类型汇总
| 类型 | 方向 | 说明 | 示例数据 |
|---|---|---|---|
connected |
服务端→客户端 | 连接成功 | {"type":"connected","user_id":3} |
message |
双向 | 普通聊天消息 | {"type":"message","session_id":1,"content":"你好"} |
message_sent |
服务端→客户端 | 确认消息已保存 | {"type":"message_sent","message_id":123} |
ai_suggestion |
服务端→VIP | AI 建议回复(VIP 在线时) | {"type":"ai_suggestion","content":"...建议回复..."} |
ai_message |
服务端→客户端 | AI 代答消息 | {"type":"ai_message","role":"ai","content":"AI代答内容"} |
ping/pong |
双向 | 心跳检测 | {"type":"ping"} → {"type":"pong"} |
error |
服务端→客户端 | 错误提示 | {"type":"error","message":"认证失败"} |
6.7 前端消息处理与用户体验优化
6.7.1 乐观更新(Optimistic Update)
用户发送消息时,不等待服务器确认,立即在 UI 上显示一条“临时消息”,提升响应速度。待服务器返回 message_sent 后,再将临时消息替换为真实消息。
// frontend/src/views/module_ai/chat/index.vue
const handleSendMessage = async (message: string) => {
// 生成临时 ID(前端唯一标识)
const tempId = `temp-${Date.now()}`;
// 立即显示用户消息(临时)
addMessage({
id: tempId,
role: "user",
content: message,
timestamp: Date.now(),
isTemporary: true,
});
// 通过 WebSocket 发送
ws.send(JSON.stringify({
type: "message",
session_id: currentSessionId.value,
content: message,
}));
// 等待 WebSocket 返回 message_sent 事件
// 在 handleWebSocketMessage 中处理替换
};
替换逻辑:
const handleWebSocketMessage = (data: string) => {
const msg = JSON.parse(data);
switch (msg.type) {
case "message_sent":
// 找到刚显示的临时消息(最后一条用户消息)
const tempIndex = messages.value.findIndex(
m => m.isTemporary && m.role === "user"
);
if (tempIndex !== -1) {
// 替换为真实消息
messages.value[tempIndex] = {
id: msg.message_id,
role: "user",
content: msg.content,
timestamp: new Date(msg.created_at).getTime(),
isTemporary: false,
};
}
break;
case "message":
// 对方发来的消息,直接添加(需要去重)
addMessageIfNotExists(msg);
break;
}
};
6.7.2 消息去重(防止重复显示)
由于 WebSocket 推送和 REST API 拉取历史可能产生重复消息,前端需做去重。
const addMessageIfNotExists = (msg: any) => {
const exists = messages.value.some(m => m.id === msg.id);
if (!exists) {
messages.value.push({
id: msg.id,
role: msg.role,
content: msg.content,
timestamp: new Date(msg.created_at).getTime(),
});
}
};
6.7.3 未读消息红点与计数
前端维护每个会话的未读消息数,当 WebSocket 收到 type: "message" 且当前打开的会话不是该消息所属会话时,增加对应会话的未读数,并在导航栏显示小红点。
// Pinia Store: message.ts
const unreadCounts = ref<Record<number, number>>({});
const handleWebSocketMessage = (msg) => {
if (msg.type === "message" && msg.session_id !== currentSessionId.value) {
unreadCounts.value[msg.session_id] = (unreadCounts.value[msg.session_id] || 0) + 1;
// 更新全局总未读数(用于导航栏红点)
updateGlobalUnread();
}
};
未读消息清零:当用户打开某个会话时,前端调用 REST API 标记该会话所有消息为已读,并清空本地未读数。
const markSessionRead = async (sessionId: number) => {
await api.markMessagesRead(sessionId);
unreadCounts.value[sessionId] = 0;
updateGlobalUnread();
};
6.7.4 心跳保活与断线重连
前端每隔 60 秒发送 ping,后端回复 pong。若连续两次心跳无响应,则主动断开并尝试重连。
let heartbeatTimer: number;
let reconnectAttempts = 0;
const startHeartbeat = () => {
heartbeatTimer = setInterval(() => {
if (ws?.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "ping" }));
lastPongTime = Date.now();
} else {
reconnect();
}
}, 60000);
};
const reconnect = () => {
if (reconnectAttempts < 5) {
setTimeout(() => {
connectWebSocket();
reconnectAttempts++;
}, 3000 * reconnectAttempts); // 指数退避
}
};
6.8 完整业务时序图(整合四阶段)
6.9 核心代码文件清单
| 文件 | 路径 | 核心职责 |
|---|---|---|
websocket.py |
backend/.../module_chat/chat/ |
WebSocket 端点、消息入口、心跳响应 |
manager.py |
backend/.../module_chat/chat/ |
连接管理、在线状态、消息推送 |
service.py |
backend/.../module_chat/chat/ |
消息处理、AI 触发、会话更新 |
ai_service.py |
backend/.../module_chat/chat/ |
Dify API 封装、重试机制 |
model.py |
backend/.../module_chat/chat/ |
会话表、消息表模型 |
schema.py |
backend/.../module_chat/chat/ |
Pydantic 请求/响应模型 |
index.vue |
frontend/src/views/module_ai/chat/ |
聊天主界面、WebSocket 管理 |
store/message.ts |
frontend/src/stores/ |
Pinia 状态管理(会话、消息、未读数) |
api/chat.ts |
frontend/src/api/ |
REST API 封装(历史消息、标记已读) |
6.10 总结:消息中心模块技术亮点
| 亮点 | 说明 |
|---|---|
| 实时双向通信 | 基于 WebSocket,消息毫秒级送达,无需轮询 |
| 双层在线状态管理 | 内存字典(极速推送)+ Redis(TTL自动清理、跨实例扩展) |
| AI 智能辅助 | Dify Chatflow 集成,根据 VIP 在线状态自动切换“建议”或“代答” |
| 离线代答机制 | VIP 不在线时 AI 自动回复,保证求职者体验 |
| 乐观更新与消息去重 | 前端即时显示临时消息,避免重复显示,体验流畅 |
| 心跳保活与自动重连 | 网络波动时自动恢复连接,状态一致 |
| 会话持久化 | 完整聊天记录存储于 MySQL,支持历史回溯 |
该模块设计可直接应用于其他需要实时通信的场景(如客服系统、即时通讯等)。
6. 项目难点与踩坑总结
6.1 FastAPIAdmin框架不熟悉,接口调试耗时
初次使用FastAPIAdmin自动生成CRUD,对路由注册、权限注入、模型绑定机制不熟,频繁出现404、参数校验失败等问题。
解决:仔细阅读官方文档 + 框架示例,逐步调试源码,跑通demo模块,整理配置笔记。
6.2 AI输出格式不稳定
Dify返回的内容有时不是纯JSON,导致前端解析失败。
解决:强化提示词约束 + 后端重试机制 + 前端兜底展示。
6.3 WebSocket异常断开后状态残留
用户关闭浏览器未正常触发disconnect,Redis在线标志残留。
解决:TTL自动过期 + 发送失败主动清理 + 前端心跳续期。
6.4 招聘信息下架逻辑混乱
原代码中VIP点击“申请下架”会直接删除记录(DELETE),导致无法重新申请。
解决:重新设计状态流转:approved → revoked(非删除),下架原因记录,增加“重新申请”功能使状态回到 pending。
7. 项目总结与技术展望
7.1 项目成果概述
本项目实现了一套完整的劳动力招聘管理系统,核心成果包括:
- 招聘管理模块:基于状态机(pending → approved → revoked)的招聘信息全生命周期管理,VIP 用户可自主发布、下架、重新申请,管理员结合双 AI 智能体(审核分析 Workflow + 问答 Chatflow)进行高效审批。
- 消息中心模块:基于 WebSocket + Redis + MySQL 的实时聊天系统,支持在线状态管理、消息推送、AI 建议回复与离线代答。
- AI 集成:深度对接 Dify 平台,三个智能体覆盖求职咨询、招聘自动审核、交互式审核问答,显著提升平台智能化水平。
7.2 技术核心能力
| 维度 | 当前能力 |
|---|---|
| 实时性 | WebSocket 长连接,消息平均延迟 < 50ms |
| 并发支撑 | 单实例支持 10,000 并发 WebSocket 连接,内存占用约 500MB |
| 可用性 | 心跳保活 + TTL 自动清理,异常断开 5 分钟后状态自动恢复 |
| AI 响应 | Dify 调用超时 30 秒,支持重试与降级,不阻塞主流程 |
| 数据安全 | JWT 无状态认证,密码 bcrypt 加密,敏感字段脱敏,操作日志审计 |
7.3 未来优化策略
7.3.1 消息中心多实例部署(核心扩展方向)
当前消息中心采用单实例部署,active_connections 内存字典仅在当前进程有效。未来需要支持水平扩展,方案如下:
目标:用户 A 连接到实例 1,用户 B 连接到实例 2,A 发送的消息能正确路由到 B。
解决方案:Redis 存储实例 ID + 内部 RPC 转发
-
连接时注册实例信息:
- 每个后端实例启动时生成唯一标识
instance_id(如hostname:port或 UUID)。 - 用户连接时,除了设置
chat:online:{user_id} = "1",同时存储实例 ID:await redis.setex(f"chat:online:{user_id}", 300, instance_id)
- 每个后端实例启动时生成唯一标识
-
发送消息时跨实例转发:
send_to_user方法先查本地active_connections,若没有则查 Redis 获取目标用户所在的instance_id。- 如果目标实例是当前实例,直接推送;否则调用内部 HTTP API(如
/internal/push)将消息转发到目标实例。 - 转发接口需做鉴权(内网 IP 白名单或共享密钥)。
-
实例间消息队列(可选升级):
- 使用 Redis Stream 或 RabbitMQ 作为消息总线,每个实例订阅自己的队列。
- 发送消息时将消息投递到目标实例的队列,由目标实例消费并推送。
- 优点:解耦,削峰;缺点:增加延迟和复杂度。
-
心跳与状态同步:
- 实例定期向 Redis 写入自己的健康状态(如
instance:live:{instance_id},TTL 10 秒)。 - 其他实例发送消息前检查目标实例是否存活,若已宕机则降级(例如暂存消息或返回错误)。
- 实例定期向 Redis 写入自己的健康状态(如
多实例架构图(未来):
7.3.2 其他优化方向
| 方向 | 当前问题 | 优化方案 |
|---|---|---|
| AI 审核缓存 | 每次打开审核详情页都调用 Dify,相同内容重复分析 | Redis 缓存 AI 分析结果,key 为招聘信息内容的哈希,TTL 1 小时 |
| 消息历史分页 | 会话消息无限下拉,大数据量时内存压力大 | 前端滚动加载,每次请求 20 条;后端使用游标分页(WHERE id < last_id) |
| 未读消息聚合 | 未读计数实时计算,高并发下 MySQL 压力大 | Redis Hash 存储每个会话的未读数,读写均在内存,异步同步到 MySQL |
| 文件消息支持 | 仅支持文字 | 扩展消息类型,上传图片/文件到 OSS,消息体存储 URL |
| 消息已读回执 | 仅支持已读标记,无回执推送 | 当接收方标记已读时,通过 WebSocket 推送 read_receipt 事件给发送方 |
| Push 离线通知 | VIP 离线时只能 AI 代答,无法唤醒用户 | 集成苹果 APNs / 华为/小米推送,VIP 离线时发送手机通知 |
7.4 总结
本项目以 FastAPI + Vue3 + WebSocket + Dify 为核心技术栈,构建了一个功能完整、性能良好、易于扩展的劳动力招聘管理系统。消息中心的单实例设计已满足当前业务量,但通过预留的 Redis 结构(实例 ID 扩展)和内部 RPC 接口,可平滑演进至多实例集群,支撑更高的并发和可用性。后续将持续优化 AI 缓存、分页加载和离线推送,打造更完善的企业级招聘平台。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)