本周我的核心工作主要围绕儿科预诊系统的多智能体协作架构展开。我负责四大核心智能体(分诊、诊断、评审、方案)以及安全兜底智能体的基座设计。为了在接入真实大模型前验证整个状态机的流转逻辑,编写了具备完整雏形架构的 Mock 代码,并在图计算与接口层完成了工作流的集成。

以下是我本周个人工作的记录与设计剖析。

目录

一、 抽象智能体基类:规范调用协议与日志

二、 四大核心业务智能体的设计与Mock实现

三、 工作流路由编排与FastAPI接口集成

四、遇到的问题及收获

五、展望


一、 抽象智能体基类:规范调用协议与日志

在构建复杂的分布式或多模块系统时,最忌讳的就是每个模块拥有各自独立的输入输出格式。PediaMind 系统初步规划了分诊、诊断、评审、方案定稿以及安全兜底等多个 Agent,为了确保后续我在扩展新智能体时具备高度的一致性,我首先设计了 BaseAgent 抽象基类。

设计意图: 这个基类的核心目的在于解耦。通过定义统一的异步 run 方法,并强制约束所有 Agent 的入参和出参必须是 LangGraph 的状态字典(State Dict),我将具体的业务逻辑与外层的图计算路由完全隔离开来。同时,在 __call__ 方法中统一封装了loguru的日志打印,这样在追踪多轮博弈轨迹时,日志输出会非常标准清晰,这极大地降低了调试成本。

# app/agents/base.py
from abc import ABC, abstractmethod
from typing import Any
from loguru import logger

class BaseAgent(ABC):
    """所有 Agent 的抽象基类,规范统一的调用协议与日志。"""
    name: str = "BaseAgent"

    @abstractmethod
    async def run(self, state: dict[str, Any]) -> dict[str, Any]:
        """执行智能体逻辑,接收并返回 LangGraph 状态字典子集。"""
        ...

    async def __call__(self, state: dict[str, Any]) -> dict[str, Any]:
        logger.info(f"[{self.name}] 开始执行")
        result = await self.run(state)
        logger.info(f"[{self.name}] 执行完毕")
        return result

二、 四大核心业务智能体的设计与Mock实现

系统流转的核心依赖于四个业务 Agent:Triage(分诊特征提取)、Diagnosis(初步方案提案)、Review(蓝军对抗审核)和 Plan(最终报告定稿)。在本周的开发中,我为它们逐一编写了Mock逻辑。

设计意图: 为什么不使用一个庞大的 Prompt 让 AI 一步到位?因为在医疗场景下,安全性与严谨性是第一位的。将诊断和评审拆分为互相博弈的“红蓝对抗”机制,是为了确保这种审核逻辑在架构上是合理的。

在当前的 Mock 实现中,硬编码了“前两轮强制拒绝”的逻辑,以此来模拟复杂病情下,系统因为“检测到潜在年龄禁忌(如某类退热药对特定月龄婴儿的限制)”而打回方案、促发多轮打磨的真实博弈过程。

评审拦截逻辑设计:retry_count < 2 时,模拟检测到年龄与药物冲突,返回 is_passed: False 并使得 retry_count 递增;当达到第三轮时才予以放行。若达到系统预设的最大循环次数(如 3 次),则会触发 FallbackAgent 进行安全兜底。

# app/agents/review.py
from typing import Any
from loguru import logger
from app.agents.base import BaseAgent

class ReviewAgent(BaseAgent):
    """评审智能体:实施年龄禁忌与药物冲突核查 (Mock 架构)"""
    name: str = "ReviewAgent"

    async def run(self, state: dict[str, Any]) -> dict[str, Any]:
        features: dict[str, Any] = state.get("triage_features", {})
        retry_count: int = state.get("retry_count", 0)
        logger.info(f"[{self.name}] 对第 {retry_count + 1} 轮方案进行评审")

        # Mock 策略:前两轮强制不通过以演示博弈循环,第三轮通过
        if retry_count < 2:
            feedback = {
                "is_passed": False,
                "reason": (f"第 {retry_count + 1} 轮评审未通过:"
                           f"检测到潜在年龄禁忌,{features.get('age_months', 0)} 月龄患儿"
                           f"需二次确认用药剂量安全范围。")
            }
            return {"review_feedback": feedback, "retry_count": retry_count + 1}

        feedback = {
            "is_passed": True,
            "reason": "诊断方案与用药建议通过年龄禁忌审核与药物冲突检测,准予定稿。"
        }
        return {"review_feedback": feedback}

三、 工作流路由编排与FastAPI接口集成

各个智能体开发完成后,相当于我们有了精良的零部件,接下来需要一条总线将它们串联起来,并对外提供服务入口。

我使用 LangGraph 在 app/graph/workflow.py 中编译了整个状态机,定义了条件路由(如根据 is_passed 决定是生成最终报告还是回退重试)。随后,我在 app/api/diagnosis.py 中实现了系统的总入口 /diagnosis/run

设计意图: 前端传来的数据是结构化的(如姓名、月龄、体温等 JSON 字段),而大模型的输入更倾向于自然语言上下文。因此,我在接口层拦截了请求,将这些结构化体征拼接转化为自然语言描述,作为状态机的初始 user_input 启动工作流。同时,为了防止高并发场景下不同用户的诊断状态发生串联,我严格使用 request_id 作为图计算的 thread_id,实现了请求上下文的物理隔离。

# app/api/diagnosis.py
import uuid
from typing import Any
from fastapi import APIRouter
from loguru import logger
from app.graph.workflow import build_diagnosis_graph
from app.schema.vital_signs import DiagnosisRequest, DiagnosisResult, UrgencyLevel

router = APIRouter(prefix="/diagnosis", tags=["预诊"])
_workflow = build_diagnosis_graph() # 编译一次即可复用

@router.post("/run", response_model=DiagnosisResult, summary="执行预诊流程")
async def run_diagnosis(req: DiagnosisRequest) -> DiagnosisResult:
    request_id = req.request_id or uuid.uuid4().hex
    vs = req.vital_signs
    
    # 将结构化体征拼接为自然语言描述,作为状态机入口
    symptom_text = "、".join(vs.accompanying_symptoms) if vs.accompanying_symptoms else ""
    user_input = (f"患儿{vs.name},{'男' if vs.gender == 'male' else '女'},"
                  f"{vs.age_months}月龄,体温{vs.temperature}℃,主诉:{vs.chief_complaint}。")
    if symptom_text:
        user_input += f"伴随症状:{symptom_text}。"

    initial_state: dict[str, Any] = {"user_input": user_input}
    
    # 驱动 LangGraph 工作流,使用 request_id 隔离上下文
    config = {"configurable": {"thread_id": request_id}}
    final_state = await _workflow.ainvoke(initial_state, config=config)

    # 从状态机结果中提取字段,映射为标准的响应体返回
    triage = final_state.get("triage_features", {})
    return DiagnosisResult(
        request_id=request_id,
        urgency=UrgencyLevel(triage.get("urgency_level", "medium")),
        triage_suggestion=triage.get("suggested_department", "儿科门诊"),
        preliminary_diagnosis=[final_state.get("diagnosis_proposal", "")] if final_state.get("diagnosis_proposal") else [],
        treatment_plan=final_state.get("final_report"),
        reasoning=f"博弈轮次: {final_state.get('retry_count', 0)}"
    )

四、遇到的问题及收获

  1. 状态字典(State)的数据空值异常: 在测试工作流闭环时,我发现如果某次诊断直接触发了 Fallback 熔断节点(例如重试超过 3 次),系统会跳过 PlanAgent,导致最终的 final_state 中缺少 diagnosis_proposal 字段,进而在接口层引发字典取值报错。为了解决这个问题,我全面排查了状态提取逻辑,将所有的直接键访问(state["key"])改写为带有默认值的防守型编程(state.get("key", default)),确保了接口响应的健壮性。

  2. 个人工程能力的沉淀 :本周的代码量并不算特别庞大,但逻辑非常绕。从基类抽象、Mock 逻辑的严谨性推敲,再到最后工作流的无缝接入,通过一遍遍运行workflow测试程序,看着终端里日志严格按照我设定的重试逻辑精准跳跃,我深刻体会到了架构设计“高内聚、低耦合”的魅力。

五、展望

目前在大家的努力下,系统的整体框架已经搭建完成。下周,我将着手把这些Mock节点替换为真实的LLM调用,让系统真正具备“思考”的能力。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐