手写多智能体协作系统:从零构建 Multi-Agent 框架
引言
单个 AI Agent 能做不少事,但当任务变复杂——比如「分析一份财报 → 提取关键指标 → 对比行业均值 → 生成投资建议报告」——单一 Agent 的上下文窗口、工具集、推理深度都会到达瓶颈。
这就是 Multi-Agent 系统登场的场景。把复杂任务拆解为多个专业 Agent 协作完成,每个 Agent 各司其职:搜索 Agent 专注找信息,分析 Agent 专注推理,写作 Agent 专注输出。
2025 年以来,Multi-Agent 从学术概念走向了生产实践。微软的 AutoGen、Google 的 Agent2Agent、Anthropic 的 MCP 协议,都在解决同一个问题:多个 AI 系统如何高效协作。
本文从最底层开始,手写一个完整的 Multi-Agent 编排框架。读完你将掌握:
- Multi-Agent 的核心架构模式(Sequential / Hierarchical / Debate / Swarm)
- 如何用 Python 实现 Agent 间消息传递与任务调度
- 如何使用路由、广播、排队等模式控制协作流程
- 真实场景中的扩缩容、容错、监控方案
一、Multi-Agent 的本质:分工与通信
1.1 为什么需要多个 Agent?
先看一个现实场景:
用户: "帮我分析一下特斯拉2025年Q4的财报,对比比亚迪的表现,最后写成一篇500字的市场分析"
单个 Agent 完成这个任务需要的能力:
- ✅ 理解自然语言请求
- ❌ 需要搜索财报数据(需要网络搜索工具)
- ❌ 需要对比分析(需要数据分析能力)
- ❌ 需要结构化输出(需要长文写作能力)
更关键的问题是上下文污染——搜索过程的中间数据会挤占写作阶段可用的上下文空间。如果搜索阶段耗时很长,到写作时模型可能已经「忘了」最初的任务要求。
1.2 四种主流架构模式
在动手写代码之前,先理解四种基本模式:
模式 1:Sequential(流水线)
Agent A → Agent B → Agent C
最简单、最可控。前一个 Agent 的输出是下一个的输入。适合有明确流程的任务。
模式 2:Hierarchical(管理者-执行者)
Orchestrator
/ | \
Agent A Agent B Agent C
一个 Orchestrator Agent 负责任务分解、分发和结果汇总。适合复杂任务的智能拆解。
模式 3:Debate(辩论模式)
Agent A ←→ Agent B
↕
Evaluator
多个 Agent 从不同角度分析同一个问题,由裁决者汇总并达成共识。适合需要多角度分析的高风险决策。
模式 4:Swarm(群集模式)
Agent A → Agent B
↑ ↓
Agent D ← Agent C
Agent 之间可以自由通信和转发任务。灵活但难控制,适合探索性任务。
二、从零实现 Multi-Agent 框架
我们从最通用也最强大的 Hierarchical 模式开始,然后扩展到其他模式。
2.1 Agent 基类——所有 Agent 的抽象
import json
import logging
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass, field
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("multi_agent")
@dataclass
class Message:
"""Agent 间传递的消息"""
sender: str
recipient: str # "all" 表示广播
content: str
message_type: str = "text" # text / task / result / error / command
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
def to_dict(self) -> dict:
return {
"sender": self.sender,
"recipient": self.recipient,
"content": self.content,
"message_type": self.message_type,
"metadata": self.metadata,
"timestamp": self.timestamp
}
class BaseAgent(ABC):
"""所有 Agent 的抽象基类"""
def __init__(self, name: str, system_prompt: str,
model_client: Any, max_turns: int = 3):
self.name = name
self.system_prompt = system_prompt
self.model = model_client
self.max_turns = max_turns
self.mailbox: List[Message] = [] # 收件箱
self.history: List[Message] = [] # 已处理的消息
def receive(self, message: Message):
"""接收消息到收件箱"""
self.mailbox.append(message)
@abstractmethod
def process(self) -> List[Message]:
"""处理收件箱中的消息,返回发往外部的消息列表"""
pass
def _call_llm(self, messages: list) -> str:
"""调用 LLM(简化的工具调用)"""
payload = {
"model": "gpt-4o",
"messages": messages,
"temperature": 0.3,
}
# 实际的模型调用
response = self.model.chat_completion(payload)
return response["content"]
def get_conversation_context(self) -> list:
"""构建包含系统提示和历史上下文的对话"""
ctx = [{"role": "system", "content": self.system_prompt}]
for msg in self.history[-10:]: # 保留最近 10 条
role = "user" if msg.sender != self.name else "assistant"
ctx.append({"role": role, "content": f"[来自 {msg.sender}]: {msg.content}"})
return ctx
def clear_mailbox(self):
"""清空收件箱"""
self.mailbox.clear()
2.2 专用 Agent 实现
class ResearcherAgent(BaseAgent):
"""搜索研究员——负责查找和整理信息"""
def __init__(self, name: str, model_client: Any):
super().__init__(
name=name,
system_prompt=(
"你是研究分析员。你的职责是:\n"
"1. 理解收到的研究任务\n"
"2. 利用可用的工具查找信息\n"
"3. 整理高质量的研究摘要返回\n"
"如果无法找到信息,明确说明不确定性,不要编造数据。"
),
model_client=model_client
)
def process(self) -> List[Message]:
outputs = []
for msg in self.mailbox:
self.history.append(msg)
if msg.message_type == "task":
context = self.get_conversation_context()
context.append({"role": "user", "content": msg.content})
# 研究分析
research_result = self._call_llm(context)
outputs.append(Message(
sender=self.name,
recipient=msg.sender, # 回复给发送者
content=research_result,
message_type="result",
metadata={"original_task": msg.content}
))
self.clear_mailbox()
return outputs
class AnalystAgent(BaseAgent):
"""分析员——负责深度分析和对比"""
def __init__(self, name: str, model_client: Any):
super().__init__(
name=name,
system_prompt=(
"你是数据分析师。你的专长是:\n"
"1. 对比不同来源的数据\n"
"2. 识别趋势和异常\n"
"3. 给出数据驱动的洞察\n"
"输出结构化分析,包含明确的数据支撑。"
),
model_client=model_client
)
def process(self) -> List[Message]:
outputs = []
for msg in self.mailbox:
self.history.append(msg)
if msg.message_type in ("task", "result"):
context = self.get_conversation_context()
context.append({"role": "user", "content": msg.content})
analysis = self._call_llm(context)
outputs.append(Message(
sender=self.name,
recipient=msg.sender,
content=analysis,
message_type="result"
))
self.clear_mailbox()
return outputs
class WriterAgent(BaseAgent):
"""写手——负责最终输出格式化"""
def __init__(self, name: str, model_client: Any):
super().__init__(
name=name,
system_prompt=(
"你是专业写手。你的职责:\n"
"1. 基于研究结果和分析输出,组织成通顺的文章\n"
"2. 确保逻辑连贯、语言流畅\n"
"3. 保持客观中立,不要添加原文中没有的数据\n"
"注意:引用时注明数据来源。"
),
model_client=model_client
)
def process(self) -> List[Message]:
outputs = []
for msg in self.mailbox:
self.history.append(msg)
if msg.message_type == "result":
context = self.get_conversation_context()
context.append({"role": "user", "content":
f"请基于以下信息,撰写最终报告:\n\n{msg.content}"})
final_text = self._call_llm(context)
outputs.append(Message(
sender=self.name,
recipient=msg.sender,
content=final_text,
message_type="result"
))
self.clear_mailbox()
return outputs
2.3 Orchestrator——系统的智能大脑
Orchestrator 是整个框架的灵魂。它负责:
1. 接收用户请求
2. 决定如何拆解任务
3. 分派任务给合适的 Agent
4. 收集结果并选择下一轮动作(分发 or 汇总)
class Orchestrator:
"""多 Agent 系统的编排器"""
def __init__(self, agents: Dict[str, BaseAgent],
model_client: Any,
max_rounds: int = 10):
self.agents = agents
self.model = model_client
self.max_rounds = max_rounds
self.orchestrator_prompt = (
"你是多 Agent 系统的编排器。你的角色:\n"
"1. 拆解用户的任务为子任务\n"
"2. 为每个子任务选择最合适的 Agent\n"
f"可用 Agent: {', '.join(agents.keys())}\n"
"3. 根据 Agent 的返回结果决定下一步\n"
" - 结果为 'SUMMARY' 类型时,整理最终输出\n"
" - 结果仍需处理时,继续分派\n"
"输出格式:\n"
" ASSIGN:<agent_name>:<task_description>\n"
" SUMMARY:<final_output>"
)
def _plan(self, user_request: str, context: list) -> str:
"""编排器推理决策"""
messages = [
{"role": "system", "content": self.orchestrator_prompt},
*context,
{"role": "user", "content": user_request}
]
return self._call_llm(messages)
def _call_llm(self, messages: list) -> str:
payload = {
"model": "gpt-4o",
"messages": messages,
"temperature": 0.2, # 编排决策用低温度
}
resp = self.model.chat_completion(payload)
return resp["content"]
def run(self, user_request: str) -> str:
"""执行 Multi-Agent 流程"""
# 消息总线:存储所有 Agent 产生的消息
message_queue: List[Message] = []
context_history = []
# Step 1: 编排器首次规划
plan = self._plan(user_request, [])
logger.info(f"[Orchestrator] 规划: {plan}")
round_count = 0
while round_count < self.max_rounds:
round_count += 1
logger.info(f"\n=== 第 {round_count} 轮 ===")
# Step 2: 解析编排指令
if plan.startswith("SUMMARY:"):
# 编排器认为可以输出了
final = plan[len("SUMMARY:"):].strip()
logger.info(f"[完成] 最终输出")
return final
if plan.startswith("ASSIGN:"):
# 解析 ASSIGN:agent_name:task
parts = plan.split(":", 2)
if len(parts) < 3:
raise ValueError(f"无效的编排指令: {plan}")
_, target_agent, task = parts
target_agent = target_agent.strip()
# 发送任务消息
task_msg = Message(
sender="orchestrator",
recipient=target_agent,
content=task.strip(),
message_type="task",
metadata={"round": round_count}
)
# 将消息发送给目标 Agent
if target_agent in self.agents:
self.agents[target_agent].receive(task_msg)
logger.info(f" → 派发任务给 {target_agent}: {task[:100]}...")
# Step 3: Agent 处理并返回
outputs = self.agents[target_agent].process()
# Step 4: 收集结果
for output in outputs:
result_summary = output.content[:200]
logger.info(f" ← {output.sender} 返回: {result_summary}...")
message_queue.append(output)
context_history.append({
"role": "assistant",
"content": f"[{output.sender} 完成]: {output.content}"
})
# Step 5: 编排器根据结果再做下一步规划
context_summary = "\n".join(
[c["content"][:500] for c in context_history[-5:]]
)
plan = self._plan(
f"当前进度:已完成第 {round_count} 轮。\n"
f"最新结果:{outputs[-1].content[:500]}\n"
f"如果还有未完成的分析,继续分派。如果信息足够,输出 SUMMARY。",
[]
)
logger.info(f"[Orchestrator] 下一步规划: {plan}")
else:
raise ValueError(f"未知 Agent: {target_agent}")
else:
raise ValueError(f"编排器输出格式错误: {plan}")
return "任务未在规定轮数内完成,请简化请求。"
2.4 完整运行示例
# 初始化所有组件
model = ModelClient(api_key="your-api-key")
# 创建三个专业的 Agent
researcher = ResearcherAgent("researcher", model)
analyst = AnalystAgent("analyst", model)
writer = WriterAgent("writer", model)
# 注册到编排器
orchestrator = Orchestrator(
agents={
"researcher": researcher,
"analyst": analyst,
"writer": writer
},
model_client=model
)
# 执行任务
result = orchestrator.run(
"帮我分析一下2025年全球AI芯片市场格局,"
"对比NVIDIA、AMD、华为三家公司的技术路线,"
"最后写一个300字的市场分析摘要。"
)
print("\n最终输出:" + "-" * 40)
print(result)
执行过程中 Orchestrator 的输出类似:
=== 第 1 轮 ===
[Orchestrator] 规划: ASSIGN:researcher:研究2025年全球AI芯片市场格局,重点查找NVIDIA、AMD、华为三家的市场份额和技术路线
→ 派发任务给 researcher
← researcher 返回: [研究摘要]...
=== 第 2 轮 ===
[Orchestrator] 规划: ASSIGN:analyst:对比NVIDIA、AMD、华为三家的技术路线差异和市场表现
→ 派发任务给 analyst
← analyst 返回: [对比分析]...
=== 第 3 轮 ===
[Orchestrator] 规划: ASSIGN:writer:基于研究和分析结果,撰写300字市场分析摘要
→ 派发任务给 writer
← writer 返回: [最终报告]...
=== 第 4 轮 ===
[Orchestrator] 规划: SUMMARY:... (最终输出)
三、扩展模式:更多协作架构
3.1 Sequential Pipeline
class SequentialPipeline:
"""按顺序依次执行的流水线"""
def __init__(self, stages: List[BaseAgent]):
self.stages = stages
def run(self, initial_input: str) -> str:
current = initial_input
for stage in self.stages:
logger.info(f"[Pipeline] → {stage.name} 处理中")
# 发送消息
msg = Message(
sender="pipeline",
recipient=stage.name,
content=current,
message_type="task"
)
stage.receive(msg)
# 处理
outputs = stage.process()
# 取最后一个输出作为下一阶段的输入
if outputs:
current = outputs[-1].content
return current
# 使用示例
pipeline = SequentialPipeline([
ResearcherAgent("researcher", model),
AnalystAgent("analyst", model),
WriterAgent("writer", model)
])
result = pipeline.run("分析RISC-V架构在AI推理芯片中的应用前景。")
3.2 Debate 模式:多角度分析
class DebateOrchestrator:
"""辩论模式——多 Agent 从不同角度分析同一问题"""
def __init__(self, debaters: List[BaseAgent],
evaluator: BaseAgent,
model_client: Any,
max_rounds: int = 3):
self.debaters = debaters
self.evaluator = evaluator
self.model = model_client
self.max_rounds = max_rounds
def run(self, question: str) -> str:
"""执行多轮辩论"""
# 第一轮:各自分析
arguments = []
for agent in self.debaters:
system_prompt_override = (
f"你是 {agent.name},请从你的专业角度分析以下问题。"
"给出观点和数据支撑。"
)
agent.system_prompt = system_prompt_override
msg = Message("debate", agent.name, question, "task")
agent.receive(msg)
outputs = agent.process()
arguments.append({
"agent": agent.name,
"argument": outputs[-1].content
})
# 多轮辩论
for round_num in range(1, self.max_rounds):
logger.info(f"\n=== 辩论第 {round_num} 轮 ===")
# 让每个 Agent 看到其他 Agent 的观点并回应
for i, agent in enumerate(self.debaters):
other_views = [
a for j, a in enumerate(arguments) if j != i
]
critique_prompt = (
f"其他人的观点:\n"
+ "\n".join([f"- {v['agent']}: {v['argument'][:500]}"
for v in other_views])
+ "\n\n请针对以上观点,补充或反驳。"
)
msg = Message("debate", agent.name, critique_prompt, "task")
agent.receive(msg)
outputs = agent.process()
arguments[i]["argument"] += f"\n[反驳]: {outputs[-1].content}"
# 裁决者汇总
summary_prompt = (
f"问题: {question}\n\n"
+ "\n\n".join([
f"### {a['agent']} 的观点:\n{a['argument']}"
for a in arguments
])
+ "\n\n请综合以上观点,给出一个平衡的分析结论。"
)
msg = Message("debate", self.evaluator.name, summary_prompt, "task")
self.evaluator.receive(msg)
final = self.evaluator.process()
return final[-1].content
四、生产环境的关键考量
4.1 容错机制
Agent 可能出错(LLM 幻觉、API 超时、参数错误)。需要一个容错层:
class FaultTolerantOrchestrator(Orchestrator):
"""带容错的编排器"""
def __init__(self, *args, retry_count=2, fallback_agent=None, **kwargs):
super().__init__(*args, **kwargs)
self.retry_count = retry_count
self.fallback_agent = fallback_agent
def _safe_execute(self, agent_name: str, task: str,
retries: int = None) -> Message:
"""带重试的 Agent 执行"""
retries = retries or self.retry_count
for attempt in range(1, retries + 2): # 首次 + 重试
try:
agent = self.agents[agent_name]
msg = Message("orchestrator", agent_name, task, "task")
agent.receive(msg)
outputs = agent.process()
return outputs[-1]
except Exception as e:
logger.warning(f"{agent_name} 第 {attempt} 次执行失败: {e}")
if attempt <= retries:
# 重试时告知前次失败
task = f"前次执行失败 ({e})。请重新完成任务:\n{task}"
continue
# 超过重试次数,使用 fallback
if self.fallback_agent:
fallback_msg = Message(
"orchestrator", self.fallback_agent,
f"[{agent_name} 执行失败] 原始任务: {task}",
"task"
)
self.agents[self.fallback_agent].receive(fallback_msg)
return self.agents[self.fallback_agent].process()[-1]
# 没有 fallback,返回错误信息
return Message(
agent_name, "orchestrator",
f"错误: {agent_name} 在 {retries + 1} 次尝试后仍然失败: {e}",
"error"
)
4.2 消息路由策略
class MessageRouter:
"""灵活的消息路由"""
def __init__(self):
self.routes: Dict[str, List[str]] = {} # topic → [agent_names]
def add_route(self, topic: str, agents: List[str]):
"""注册主题到 Agent 的映射"""
self.routes[topic] = agents
def route(self, message: Message) -> List[str]:
"""根据消息内容决定发给谁"""
if message.recipient != "all":
return [message.recipient]
# 广播:根据主题关键词分发
content_lower = message.content.lower()
matched_agents = set()
for topic, agents in self.routes.items():
if topic.lower() in content_lower:
matched_agents.update(agents)
return list(matched_agents) if matched_agents else ["default"]
4.3 监控与可观测性
class MonitoredOrchestrator(Orchestrator):
"""带指标监控的编排器"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.metrics = {
"total_tasks": 0,
"successful_tasks": 0,
"failed_tasks": 0,
"total_llm_calls": 0,
"total_tokens": 0,
"agent_stats": {},
"latencies": []
}
def run(self, user_request: str) -> str:
import time
start_time = time.time()
result = super().run(user_request)
self.metrics["latencies"].append(time.time() - start_time)
self.metrics["total_tasks"] += 1
logger.info(f"性能指标:")
logger.info(f" 总耗时: {self.metrics['latencies'][-1]:.2f}s")
logger.info(f" LLM 调用次数: {self.metrics['total_llm_calls']}")
logger.info(f" 成功/失败: "
f"{self.metrics['successful_tasks']}/"
f"{self.metrics['failed_tasks']}")
return result
五、常见踩坑与最佳实践
5.1 编排器决策瓶颈
问题:编排器本身也是 LLM,如果编排器决策不准确,整个系统都会出问题。
解决:
- 编排器使用更强的模型(如 GPT-4o / DeepSeek-V3)
- 给编排器提供明确的决策规则(few-shot examples)
- 对常见任务使用 fixed pipeline 而不是编排器动态决策
# 混合策略:常见任务用固定流水线,复杂任务用编排器
def smart_router(user_request: str):
TASK_PATTERNS = {
"分析": ("research", "analyst", "writer"), # sequential
"对比": ("research", "analyst", "writer"),
"写": ("writer",), # 单一
"搜索": ("research",),
}
for keyword, pipeline in TASK_PATTERNS.items():
if keyword in user_request:
return SequentialPipeline([AGENTS[k] for k in pipeline])
# 兜底:编排器动态决策
return DynamicOrchestrator()
5.2 上下文窗口管理
Multi-Agent 系统最大的敌人是上下文膨胀。每个 Agent 的输入包含:
- 系统提示(用来配置行为)
- 历史对话(用来维持上下文)
- 当前任务(核心输入)
如果在消息中传递「所有历史」,很快撑爆上下文。
最佳实践:
- 每次只传递关键信息,不要传递完整日志
- Agent 返回结构化的摘要而非原始数据
- 设置独立的消息大小限制(单条不超过 4K tokens)
5.3 Agent 数量选择
不是 Agent 越多越好。经验法则:
| 任务复杂度 | 推荐 Agent 数 | 架构模式 |
|---|---|---|
| 单步操作 | 1-2 | 单一 Agent + 工具 |
| 多步骤流程 | 3-5 | Sequential Pipeline |
| 多维度分析 | 4-6 | Orchestrator + Specialist |
| 高风险评估 | 5-8 | Debate + Evaluator |
| 大规模批处理 | 10+ | Swarm / Map-Reduce |
超过 5 个 Agent 后,编排本身的复杂性可能超过任务本身的复杂。
六、总结
Multi-Agent 不是玄学,它是一种工程分治策略——把复杂的 AI 任务拆解为多个专业模块的协作。
核心要点:
- 架构选择:Sequential 最可控,Hierarchical 最通用,Debate 最可靠,Swarm 最灵活
- 消息协议:统一的消息格式(Message 类)是系统内通信的基础
- 编排决策:编排器的质量决定了整个系统的上限,在关键路径上用强模型
- 容错设计:Agent 会出错,必须有重试、回退、降级的机制
- 监控优先:没有可观测性的 Multi-Agent 系统就是黑盒子
从零手写一遍之后你会发现,LangChain 的 AgentExecutor、AutoGen 的 GroupChat 本质上就是对本文这些模式的封装。理解了底层原理,你才能真正地用对框架,而不是被框架限制。
本文是「手写 AI 系统」系列的第 6 篇。系列包含:Embedding 搜索 → AI 记忆 → 结构化输出 → Function Calling → Multi-Agent → 更多主题持续更新中。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)