山东大学软件学院创新实训 个人博客2
本周我的核心工作主要围绕儿科预诊系统的多智能体协作架构展开。我负责四大核心智能体(分诊、诊断、评审、方案)以及安全兜底智能体的基座设计。为了在接入真实大模型前验证整个状态机的流转逻辑,编写了具备完整雏形架构的 Mock 代码,并在图计算与接口层完成了工作流的集成。
以下是我本周个人工作的记录与设计剖析。
目录
一、 抽象智能体基类:规范调用协议与日志
在构建复杂的分布式或多模块系统时,最忌讳的就是每个模块拥有各自独立的输入输出格式。PediaMind 系统初步规划了分诊、诊断、评审、方案定稿以及安全兜底等多个 Agent,为了确保后续我在扩展新智能体时具备高度的一致性,我首先设计了 BaseAgent 抽象基类。
设计意图: 这个基类的核心目的在于解耦。通过定义统一的异步 run 方法,并强制约束所有 Agent 的入参和出参必须是 LangGraph 的状态字典(State Dict),我将具体的业务逻辑与外层的图计算路由完全隔离开来。同时,在 __call__ 方法中统一封装了loguru的日志打印,这样在追踪多轮博弈轨迹时,日志输出会非常标准清晰,这极大地降低了调试成本。
# app/agents/base.py
from abc import ABC, abstractmethod
from typing import Any
from loguru import logger
class BaseAgent(ABC):
"""所有 Agent 的抽象基类,规范统一的调用协议与日志。"""
name: str = "BaseAgent"
@abstractmethod
async def run(self, state: dict[str, Any]) -> dict[str, Any]:
"""执行智能体逻辑,接收并返回 LangGraph 状态字典子集。"""
...
async def __call__(self, state: dict[str, Any]) -> dict[str, Any]:
logger.info(f"[{self.name}] 开始执行")
result = await self.run(state)
logger.info(f"[{self.name}] 执行完毕")
return result
二、 四大核心业务智能体的设计与Mock实现
系统流转的核心依赖于四个业务 Agent:Triage(分诊特征提取)、Diagnosis(初步方案提案)、Review(蓝军对抗审核)和 Plan(最终报告定稿)。在本周的开发中,我为它们逐一编写了Mock逻辑。
设计意图: 为什么不使用一个庞大的 Prompt 让 AI 一步到位?因为在医疗场景下,安全性与严谨性是第一位的。将诊断和评审拆分为互相博弈的“红蓝对抗”机制,是为了确保这种审核逻辑在架构上是合理的。
在当前的 Mock 实现中,硬编码了“前两轮强制拒绝”的逻辑,以此来模拟复杂病情下,系统因为“检测到潜在年龄禁忌(如某类退热药对特定月龄婴儿的限制)”而打回方案、促发多轮打磨的真实博弈过程。
评审拦截逻辑设计: 在
retry_count < 2时,模拟检测到年龄与药物冲突,返回is_passed: False并使得retry_count递增;当达到第三轮时才予以放行。若达到系统预设的最大循环次数(如 3 次),则会触发FallbackAgent进行安全兜底。
# app/agents/review.py
from typing import Any
from loguru import logger
from app.agents.base import BaseAgent
class ReviewAgent(BaseAgent):
"""评审智能体:实施年龄禁忌与药物冲突核查 (Mock 架构)"""
name: str = "ReviewAgent"
async def run(self, state: dict[str, Any]) -> dict[str, Any]:
features: dict[str, Any] = state.get("triage_features", {})
retry_count: int = state.get("retry_count", 0)
logger.info(f"[{self.name}] 对第 {retry_count + 1} 轮方案进行评审")
# Mock 策略:前两轮强制不通过以演示博弈循环,第三轮通过
if retry_count < 2:
feedback = {
"is_passed": False,
"reason": (f"第 {retry_count + 1} 轮评审未通过:"
f"检测到潜在年龄禁忌,{features.get('age_months', 0)} 月龄患儿"
f"需二次确认用药剂量安全范围。")
}
return {"review_feedback": feedback, "retry_count": retry_count + 1}
feedback = {
"is_passed": True,
"reason": "诊断方案与用药建议通过年龄禁忌审核与药物冲突检测,准予定稿。"
}
return {"review_feedback": feedback}
三、 工作流路由编排与FastAPI接口集成
各个智能体开发完成后,相当于我们有了精良的零部件,接下来需要一条总线将它们串联起来,并对外提供服务入口。
我使用 LangGraph 在 app/graph/workflow.py 中编译了整个状态机,定义了条件路由(如根据 is_passed 决定是生成最终报告还是回退重试)。随后,我在 app/api/diagnosis.py 中实现了系统的总入口 /diagnosis/run。
设计意图: 前端传来的数据是结构化的(如姓名、月龄、体温等 JSON 字段),而大模型的输入更倾向于自然语言上下文。因此,我在接口层拦截了请求,将这些结构化体征拼接转化为自然语言描述,作为状态机的初始 user_input 启动工作流。同时,为了防止高并发场景下不同用户的诊断状态发生串联,我严格使用 request_id 作为图计算的 thread_id,实现了请求上下文的物理隔离。
# app/api/diagnosis.py
import uuid
from typing import Any
from fastapi import APIRouter
from loguru import logger
from app.graph.workflow import build_diagnosis_graph
from app.schema.vital_signs import DiagnosisRequest, DiagnosisResult, UrgencyLevel
router = APIRouter(prefix="/diagnosis", tags=["预诊"])
_workflow = build_diagnosis_graph() # 编译一次即可复用
@router.post("/run", response_model=DiagnosisResult, summary="执行预诊流程")
async def run_diagnosis(req: DiagnosisRequest) -> DiagnosisResult:
request_id = req.request_id or uuid.uuid4().hex
vs = req.vital_signs
# 将结构化体征拼接为自然语言描述,作为状态机入口
symptom_text = "、".join(vs.accompanying_symptoms) if vs.accompanying_symptoms else ""
user_input = (f"患儿{vs.name},{'男' if vs.gender == 'male' else '女'},"
f"{vs.age_months}月龄,体温{vs.temperature}℃,主诉:{vs.chief_complaint}。")
if symptom_text:
user_input += f"伴随症状:{symptom_text}。"
initial_state: dict[str, Any] = {"user_input": user_input}
# 驱动 LangGraph 工作流,使用 request_id 隔离上下文
config = {"configurable": {"thread_id": request_id}}
final_state = await _workflow.ainvoke(initial_state, config=config)
# 从状态机结果中提取字段,映射为标准的响应体返回
triage = final_state.get("triage_features", {})
return DiagnosisResult(
request_id=request_id,
urgency=UrgencyLevel(triage.get("urgency_level", "medium")),
triage_suggestion=triage.get("suggested_department", "儿科门诊"),
preliminary_diagnosis=[final_state.get("diagnosis_proposal", "")] if final_state.get("diagnosis_proposal") else [],
treatment_plan=final_state.get("final_report"),
reasoning=f"博弈轮次: {final_state.get('retry_count', 0)}"
)
四、遇到的问题及收获
-
状态字典(State)的数据空值异常: 在测试工作流闭环时,我发现如果某次诊断直接触发了
Fallback熔断节点(例如重试超过 3 次),系统会跳过PlanAgent,导致最终的final_state中缺少diagnosis_proposal字段,进而在接口层引发字典取值报错。为了解决这个问题,我全面排查了状态提取逻辑,将所有的直接键访问(state["key"])改写为带有默认值的防守型编程(state.get("key", default)),确保了接口响应的健壮性。 -
个人工程能力的沉淀 :本周的代码量并不算特别庞大,但逻辑非常绕。从基类抽象、Mock 逻辑的严谨性推敲,再到最后工作流的无缝接入,通过一遍遍运行workflow测试程序,看着终端里日志严格按照我设定的重试逻辑精准跳跃,我深刻体会到了架构设计“高内聚、低耦合”的魅力。
五、展望
目前在大家的努力下,系统的整体框架已经搭建完成。下周,我将着手把这些Mock节点替换为真实的LLM调用,让系统真正具备“思考”的能力。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)