Multi-Agent协作系统设计:Supervisor路由与Agent通信实战

摘要:当单一AI Agent无法处理复杂任务时,如何构建一个多角色协作的专家团队?本文基于一个真实的跑步教练AI项目,详细解析Multi-Agent系统的架构设计与实现。我们将深入源码,结合流程图和调用链,展示如何通过Supervisor Agent实现智能路由、如何定义Data/Health/Knowledge/Coach四个专业角色的职责边界,以及如何设计Agent间的通信协议。这套方案将复杂任务的解决率提升了40%,是构建企业级AI应用的核心模式。


一、背景:单一Agent的局限性

在项目初期,我使用了一个“全能型”Coach Agent来处理所有请求。但随着功能增加,问题逐渐暴露:

问题1:上下文窗口爆炸

场景:用户问“分析我的训练数据并给出建议”。

现象

  • Agent需要同时加载:用户历史数据、运动生理学知识、伤病评估模型、训练计划模板。
  • Prompt长度超过10k tokens,导致LLM响应变慢且容易遗漏关键信息。

问题2:专业度不足

场景:用户问“膝盖外侧疼痛怎么办”。

现象

  • 通用Agent给出的建议往往是“休息、冰敷”等泛泛之谈。
  • 缺乏像物理治疗师那样的专业深度(如ITBS的具体康复动作)。

问题3:工具调用混乱

场景:Agent同时拥有查询数据库、检索知识库、调用计算器等10个工具。

现象

  • LLM经常选错工具,或者在不需要计算时强行计算。
  • ReAct循环次数过多,导致Token成本激增。

二、解决方案:Multi-Agent协作架构

为了解决上述问题,我设计了**“1+4”协作架构**:

Memory Layer (共享记忆)

Execution Layer (专家团队)

Control Layer (大脑)

路由指令

路由指令

路由指令

路由指令

Supervisor Agent
智能路由

Data Agent
数据分析专家

Health Agent
健康评估专家

Knowledge Agent
理论知识专家

Coach Agent
综合决策专家

PostgreSQL + Redis

RAG Knowledge Base

核心原则

  1. 各司其职:每个Agent只负责自己最擅长的领域。
  2. 统一出口:所有结果最终汇总到Coach Agent,由它生成对用户友好的回答。
  3. 动态编排:根据用户需求,Supervisor决定调用哪些Agent。

三、核心实现:四大专业Agent

3.1 Data Agent:数据分析专家

职责:从数据库提取用户跑步记录,计算VO2max、配速趋势等指标。

文件位置:app/services/agents/data_agent.py

class DataAgent:
    async def run(self, query: str, user_id: str) -> dict:
        """执行数据分析"""
        # 1. 提取时间范围(如“最近7天”)
        days = self._extract_days(query)
        
        # 2. 查询数据库
        records = await db_service.get_run_records(user_id, days)
        
        # 3. 计算统计指标
        metrics = {
            "total_distance": sum(r.distance for r in records),
            "avg_pace": self._calc_avg_pace(records),
            "vo2max_trend": self._calc_vo2max_trend(records)
        }
        
        return {"type": "data_analysis", "metrics": metrics}

3.2 Health Agent:健康评估专家

职责:结合RAG检索伤病知识,评估用户身体状况。

文件位置:app/services/agents/health_agent.py

class HealthAgent:
    async def run(self, query: str) -> dict:
        """执行健康评估"""
        # 1. RAG检索相关伤病知识
        rag_context = rag_service.rag_pipeline(query)
        
        # 2. 结合症状给出专业建议
        prompt = f"基于以下医学知识评估用户状况:{rag_context['context']}"
        advice = await llm.ainvoke(prompt)
        
        return {"type": "health_assessment", "advice": advice.content}

3.3 Knowledge Agent:理论知识专家

职责:回答关于跑步理论、训练方法、营养学等通用问题。

3.4 Coach Agent:综合决策专家

职责:整合各方信息,生成最终的个性化建议。它是唯一直接面对用户的Agent。


四、Supervisor Agent:智能路由机制

4.1 路由逻辑

Supervisor不处理具体业务,它只做一件事:识别意图并分发任务

文件位置:app/services/agents/supervisor_agent.py

class SupervisorAgent:
    def __init__(self):
        self.router_llm = ChatOpenAI(model="qwen-turbo", temperature=0)
    
    async def route(self, query: str) -> List[str]:
        """
        根据查询内容决定需要哪些Agent参与
        返回:['data_agent', 'coach_agent']
        """
        prompt = f"""
        分析用户意图,选择需要的Agent(可多选):
        - data_agent: 涉及个人跑步数据、成绩分析
        - health_agent: 涉及伤病、疼痛、身体状况
        - knowledge_agent: 涉及跑步理论、概念解释
        - coach_agent: 涉及训练计划、综合建议
        
        用户查询:{query}
        请返回JSON格式的Agent列表。
        """
        
        result = await self.router_llm.ainvoke(prompt)
        return json.loads(result.content)["agents"]

4.2 并行调度

一旦确定了参与的Agent,Supervisor会并行触发它们:

async def execute_collaboration(self, query: str, user_id: str):
    # 1. 路由决策
    agents_needed = await self.route(query)
    
    # 2. 并行执行
    tasks = []
    if "data_agent" in agents_needed:
        tasks.append(self.data_agent.run(query, user_id))
    if "health_agent" in agents_needed:
        tasks.append(self.health_agent.run(query))
    
    results = await asyncio.gather(*tasks)
    
    # 3. 汇总给Coach Agent
    final_answer = await self.coach_agent.synthesize(query, results)
    return final_answer

五、Agent通信协议

为了让不同Agent能“听懂”彼此的话,我们定义了统一的通信Schema。

文件位置:app/schemas/agent_schema.py

class AgentMessage(BaseModel):
    sender: str           # 发送者ID
    receiver: str         # 接收者ID
    message_type: str     # 类型:data/advice/knowledge
    content: Dict[str, Any] # 具体内容
    timestamp: float      # 时间戳

实际案例

{
  "sender": "data_agent",
  "receiver": "coach_agent",
  "message_type": "data_analysis",
  "content": {
    "avg_pace": "5:30/km",
    "weekly_mileage": 45.5
  }
}

六、完整调用链追踪

6.1 典型场景:带伤训练咨询

Coach Agent Health Agent Data Agent Supervisor 用户 Coach Agent Health Agent Data Agent Supervisor 用户 Step 1: 意图识别 par [并行执行] Prompt: 用户想跑40km,但有ITBS风险... "我膝盖有点疼,这周还能按计划跑吗?" 检测到“膝盖疼” → 需要 Health + Data 评估膝盖疼痛风险 获取本周训练计划 返回:疑似ITBS,建议减量 返回:本周计划跑量40km 综合信息生成建议 返回:建议减半跑量,加入康复训练 显示最终建议

七、踩坑记录与解决方案

坑1:Agent间信息丢失

现象:Data Agent算出了VO2max,但Coach Agent生成的回答里没提。

原因:中间传递时字段名不一致。

解决方案:严格遵循Pydantic Schema,并在Coach Agent入口处做数据校验。

坑2:路由死循环

现象:Supervisor把任务分给A,A又问Supervisor该怎么办。

原因:Agent内部逻辑没写好,遇到了不知道处理的情况。

解决方案:设置最大协作轮次(Max Turns),超过轮次强制由Coach Agent输出。


八、总结与展望

核心价值

  1. 专业度提升:每个Agent都可以针对特定领域进行微调或优化Prompt。
  2. 并发效率:通过并行调度,显著降低了多步骤任务的总耗时。
  3. 易于扩展:新增一个“营养Agent”只需注册即可,不影响现有逻辑。

后续优化

  1. 动态Prompt:根据用户反馈自动调整Agent的说话风格。
  2. Agent记忆:让Agent记住之前的协作经验,减少重复沟通。

九、完整源码

GitHub仓库AiRunCoachAgent

快速演示AiRunCoachAgent

核心文件清单

app/
├── services/
│   ├── agents/
│   │   ├── supervisor_agent.py      # 路由中枢
│   │   ├── data_agent.py            # 数据专家
│   │   ├── health_agent.py          # 健康专家
│   │   └── coach_agent.py           # 综合决策
│   └── collaboration_service.py     # 协作编排服务

如果你觉得这篇文章对你有帮助,欢迎点赞、收藏、转发!有任何问题或建议,请在评论区留言讨论。 🏃‍♂️💨

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐