多Agent协作系统:让AI团队协同工作
·
🤝 Agent团队协作 | 任务分解 + 角色分工 + 结果整合 | CrewAI/LangGraph实战 | 完整项目代码
📖 为什么需要多Agent协作?
单Agent的局限性
# 单Agent处理复杂任务
agent = create_agent(llm, tools)
result = agent.run("分析这家公司并写一份投资报告")
# 问题:
❌ 一个Agent难以兼顾多个专业领域
❌ 容易遗漏重要信息
❌ 输出质量不稳定
❌ 无法并行处理
多Agent协作的优势
# 多Agent团队协作
team = AgentTeam([
ResearcherAgent(), # 研究员 - 收集数据
AnalystAgent(), # 分析师 - 数据分析
WriterAgent(), # 作家 - 撰写报告
ReviewerAgent() # 审核员 - 质量检查
])
result = team.execute("分析这家公司并写一份投资报告")
# 优势:
✅ 专业化分工,各司其职
✅ 并行处理,效率提升
✅ 互相校验,质量更高
✅ 可扩展性强
🏗️ 多Agent协作架构
1. 中心化协调模式
┌─────────────┐
│ Coordinator│
│ (协调者) │
└──────┬──────┘
│
┌────────────┼────────────┐
│ │ │
┌─────▼─────┐ ┌───▼────┐ ┌────▼─────┐
│Researcher │ │Analyst │ │ Writer │
│ (研究员) │ │(分析师)│ │ (作家) │
└───────────┘ └────────┘ └──────────┘
特点:
- 中央协调者负责任务分配和结果整合
- 适合结构化任务
- 易于控制和调试
2. 去中心化协作模式
┌──────────┐ ┌──────────┐
│Researcher│◄────────►│ Analyst │
└──────────┘ └──────────┘
▲ │
│ ▼
┌──────────┐ ┌──────────┐
│Reviewer │◄────────►│ Writer │
└──────────┘ └──────────┘
特点:
- Agent之间直接通信
- 更灵活,适合复杂场景
- 需要设计好通信协议
3. 层级化组织模式
┌──────────────┐
│ Manager │
│ (管理者) │
└──────┬───────┘
│
┌────────────┴────────────┐
│ │
┌────▼────┐ ┌─────▼────┐
│ Team A │ │ Team B │
│(研究组) │ │(写作组) │
└─────────┘ └──────────┘
特点:
- 多层级管理结构
- 适合大型项目
- 可以嵌套使用
🛠️ 核心技术实现
方案1:使用CrewAI框架
安装依赖
pip install crewai crewai-tools langchain-openai
创建Agent团队
from crewai import Agent, Task, Crew
from langchain_openai import ChatOpenAI
# 配置LLM
llm = ChatOpenAI(
model="gpt-4",
temperature=0.7
)
# 创建研究员Agent
researcher = Agent(
role='高级研究员',
goal='深入调研目标公司,收集全面的市场数据和财务信息',
backstory="""你是一位经验丰富的市场研究员,擅长从多个渠道
收集和分析公司信息。你的研究报告以详实、准确著称。""",
verbose=True,
allow_delegation=False,
llm=llm
)
# 创建分析师Agent
analyst = Agent(
role='金融分析师',
goal='分析公司的财务状况和市场前景,提供投资建议',
backstory="""你是一位资深金融分析师,精通财务报表分析和
估值模型。你的分析报告逻辑清晰,结论可靠。""",
verbose=True,
allow_delegation=False,
llm=llm
)
# 创建作家Agent
writer = Agent(
role='专业作家',
goal='基于研究和分析结果,撰写专业的投资报告',
backstory="""你是一位专业的财经作家,擅长将复杂的数据和分析
转化为通俗易懂的报告。你的文章结构清晰,可读性强。""",
verbose=True,
allow_delegation=False,
llm=llm
)
# 定义任务
research_task = Task(
description="""
调研{company}公司,包括:
1. 公司基本信息和发展历程
2. 主要业务和产品线
3. 竞争对手分析
4. 最新的市场动态
""",
agent=researcher,
expected_output="详细的研究调研报告"
)
analysis_task = Task(
description="""
基于研究结果,分析{company}公司:
1. 财务状况(营收、利润、现金流)
2. 竞争优势和风险因素
3. 市场估值和投资价值
""",
agent=analyst,
context=[research_task], # 依赖研究任务的结果
expected_output="专业的财务分析报告"
)
writing_task = Task(
description="""
综合研究和分析结果,撰写{company}公司的投资报告:
1. 执行摘要
2. 公司概况
3. 行业分析
4. 财务分析
5. 投资建议
6. 风险提示
""",
agent=writer,
context=[research_task, analysis_task], # 依赖前两个任务
expected_output="完整的投资报告(Markdown格式)"
)
# 创建团队
crew = Crew(
agents=[researcher, analyst, writer],
tasks=[research_task, analysis_task, writing_task],
verbose=2
)
# 执行任务
result = crew.kickoff(inputs={"company": "特斯拉"})
print(result)
方案2:使用LangGraph实现
安装依赖
pip install langgraph langchain langchain-openai
构建多Agent工作流
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_openai import ChatOpenAI
# 定义状态
class MultiAgentState(TypedDict):
company: str
research_result: str
analysis_result: str
final_report: str
messages: List[str]
# 初始化LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
# 研究员节点
def researcher_node(state: MultiAgentState) -> MultiAgentState:
"""研究员:收集公司信息"""
company = state["company"]
prompt = f"""
作为高级研究员,请调研{company}公司:
1. 公司基本信息和发展历程
2. 主要业务和产品线
3. 竞争对手分析
4. 最新的市场动态
请提供详细的研究报告。
"""
response = llm.invoke(prompt)
return {
**state,
"research_result": response.content,
"messages": state["messages"] + [f"研究员完成对{company}的调研"]
}
# 分析师节点
def analyst_node(state: MultiAgentState) -> MultiAgentState:
"""分析师:进行财务分析"""
company = state["company"]
research = state["research_result"]
prompt = f"""
作为金融分析师,请基于以下研究结果分析{company}公司:
研究结果:
{research}
请分析:
1. 财务状况(营收、利润、现金流)
2. 竞争优势和风险因素
3. 市场估值和投资价值
"""
response = llm.invoke(prompt)
return {
**state,
"analysis_result": response.content,
"messages": state["messages"] + ["分析师完成财务分析"]
}
# 作家节点
def writer_node(state: MultiAgentState) -> MultiAgentState:
"""作家:撰写投资报告"""
company = state["company"]
research = state["research_result"]
analysis = state["analysis_result"]
prompt = f"""
作为专业作家,请基于以下材料撰写{company}公司的投资报告:
研究结果:
{research}
分析结果:
{analysis}
报告结构:
1. 执行摘要
2. 公司概况
3. 行业分析
4. 财务分析
5. 投资建议
6. 风险提示
请使用Markdown格式,确保专业性和可读性。
"""
response = llm.invoke(prompt)
return {
**state,
"final_report": response.content,
"messages": state["messages"] + ["作家完成投资报告"]
}
# 构建工作流
workflow = StateGraph(MultiAgentState)
# 添加节点
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)
# 设置入口点
workflow.set_entry_point("researcher")
# 添加边
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "writer")
workflow.add_edge("writer", END)
# 编译
app = workflow.compile()
# 执行
initial_state = {
"company": "特斯拉",
"research_result": "",
"analysis_result": "",
"final_report": "",
"messages": []
}
result = app.invoke(initial_state)
print(result["final_report"])
方案3:自定义多Agent框架
核心架构
import asyncio
from typing import List, Dict, Any, Callable
from dataclasses import dataclass
@dataclass
class Agent:
"""Agent定义"""
name: str
role: str
expertise: str
process_fn: Callable
def process(self, task: str, context: Dict[str, Any] = None) -> str:
"""处理任务"""
return self.process_fn(task, context or {})
class AgentCoordinator:
"""Agent协调器"""
def __init__(self):
self.agents: List[Agent] = []
self.task_history: List[Dict[str, Any]] = []
def add_agent(self, agent: Agent):
"""添加Agent"""
self.agents.append(agent)
def assign_task(self, task: str, agent_name: str,
context: Dict[str, Any] = None) -> str:
"""分配任务给指定Agent"""
agent = next((a for a in self.agents if a.name == agent_name), None)
if not agent:
raise ValueError(f"Agent '{agent_name}' not found")
result = agent.process(task, context)
# 记录任务历史
self.task_history.append({
"task": task,
"agent": agent_name,
"result": result
})
return result
def parallel_execute(self, tasks: List[Dict[str, Any]]) -> List[str]:
"""并行执行多个任务
Args:
tasks: 任务列表,每个任务包含:
- task: 任务描述
- agent: Agent名称
- context: 上下文(可选)
Returns:
结果列表
"""
results = []
for task_info in tasks:
result = self.assign_task(
task_info["task"],
task_info["agent"],
task_info.get("context")
)
results.append(result)
return results
def sequential_execute(self, tasks: List[Dict[str, Any]]) -> List[str]:
"""顺序执行任务链
Args:
tasks: 任务列表,后一个任务可以使用前一个任务的结果
Returns:
结果列表
"""
results = []
context = {}
for i, task_info in enumerate(tasks):
# 将之前的结果加入上下文
context["previous_results"] = results
result = self.assign_task(
task_info["task"],
task_info["agent"],
context
)
results.append(result)
# 更新上下文
context[f"result_{i}"] = result
return results
def get_task_history(self) -> List[Dict[str, Any]]:
"""获取任务历史"""
return self.task_history
# ==================== 使用示例 ====================
def create_research_agent():
"""创建研究员Agent"""
def process(task: str, context: Dict[str, Any]) -> str:
# 这里可以调用LLM API
return f"[研究员] 完成调研:{task}"
return Agent(
name="researcher",
role="研究员",
expertise="市场调研、数据分析",
process_fn=process
)
def create_analyst_agent():
"""创建分析师Agent"""
def process(task: str, context: Dict[str, Any]) -> str:
previous = context.get("previous_results", [])
return f"[分析师] 基于{len(previous)}个前期结果进行分析:{task}"
return Agent(
name="analyst",
role="分析师",
expertise="财务分析、风险评估",
process_fn=process
)
def create_writer_agent():
"""创建作家Agent"""
def process(task: str, context: Dict[str, Any]) -> str:
previous = context.get("previous_results", [])
return f"[作家] 基于{len(previous)}个前期结果撰写报告:{task}"
return Agent(
name="writer",
role="作家",
expertise="报告撰写、内容创作",
process_fn=process
)
# 创建协调器
coordinator = AgentCoordinator()
# 添加Agent
coordinator.add_agent(create_research_agent())
coordinator.add_agent(create_analyst_agent())
coordinator.add_agent(create_writer_agent())
# 顺序执行任务链
tasks = [
{"task": "调研特斯拉公司", "agent": "researcher"},
{"task": "分析财务状况", "agent": "analyst"},
{"task": "撰写投资报告", "agent": "writer"}
]
results = coordinator.sequential_execute(tasks)
for i, result in enumerate(results):
print(f"\n任务{i+1}结果:")
print(result)
🎯 实战案例:智能投研系统
系统架构
用户输入公司名称
│
▼
┌──────────────┐
│ 任务分解Agent │ ← 分析需求,拆分子任务
└──────┬───────┘
│
┌────┴────┬────────┬────────┐
▼ ▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│新闻 │ │财务 │ │竞争 │ │行业 │
│采集 │ │分析 │ │对手 │ │研究 │
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
│ │ │ │
└────────┴────────┴────────┘
│
▼
┌─────────────┐
│ 整合Agent │ ← 汇总所有信息
└──────┬──────┘
│
▼
┌─────────────┐
│ 写作Agent │ ← 生成最终报告
└──────┬──────┘
│
▼
输出投资报告
完整实现
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
# 配置LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)
# ==================== 创建Agent团队 ====================
# 1. 任务分解专家
task_decomposer = Agent(
role='任务分解专家',
goal='将复杂的投研需求分解为可执行的子任务',
backstory="""你是一位经验丰富的项目经理,擅长将复杂的大任务
分解为清晰的、可执行的子任务。你能识别任务之间的依赖关系。""",
llm=llm,
verbose=True
)
# 2. 新闻采集专家
news_collector = Agent(
role='新闻采集专家',
goal='收集目标公司的最新新闻动态和市场舆情',
backstory="""你是一位资深的新闻记者,擅长从各种渠道收集
最新的公司新闻和市场信息。你对市场敏感度高。""",
llm=llm,
verbose=True
)
# 3. 财务分析专家
financial_analyst = Agent(
role='财务分析专家',
goal='深入分析公司的财务报表和关键财务指标',
backstory="""你是一位CFA持证的财务分析师,精通财务报表分析、
比率分析和估值模型。你的分析严谨、专业。""",
llm=llm,
verbose=True
)
# 4. 竞争对手分析专家
competitor_analyst = Agent(
role='竞争对手分析专家',
goal='分析目标公司的竞争对手和市场竞争格局',
backstory="""你是一位战略咨询顾问,擅长竞争分析和市场定位。
你能识别公司的竞争优势和劣势。""",
llm=llm,
verbose=True
)
# 5. 行业研究专家
industry_researcher = Agent(
role='行业研究专家',
goal='研究目标公司所在行业的发展趋势和前景',
backstory="""你是一位行业研究专家,深入了解各个行业的发展
动态、政策环境和未来趋势。你的视野宏观、前瞻。""",
llm=llm,
verbose=True
)
# 6. 报告整合专家
report_integrator = Agent(
role='报告整合专家',
goal='整合各方分析结果,形成完整的投资报告',
backstory="""你是一位资深投行分析师,擅长整合多方信息,
形成逻辑清晰、结论明确的投资报告。""",
llm=llm,
verbose=True
)
# 7. 报告写作专家
report_writer = Agent(
role='报告写作专家',
goal='将整合的分析结果转化为专业的投资报告文档',
backstory="""你是一位专业的财经作家,擅长将复杂的分析
转化为通俗易懂、结构清晰的报告文档。""",
llm=llm,
verbose=True
)
# ==================== 定义任务 ====================
def create_tasks(company: str):
"""创建任务列表"""
# 任务1:分解投研需求
decompose_task = Task(
description=f"""
将"{company}公司投资分析"这个需求分解为具体的子任务,
包括需要收集的信息类型和分析维度。
""",
agent=task_decomposer,
expected_output="详细的任务分解清单"
)
# 任务2:收集新闻
news_task = Task(
description=f"""
收集{company}公司的最新新闻动态,包括:
1. 最近3个月的重要新闻
2. 市场舆情和投资者情绪
3. 重大事件和影响
""",
agent=news_collector,
expected_output="新闻和舆情汇总报告"
)
# 任务3:财务分析
financial_task = Task(
description=f"""
分析{company}公司的财务状况,包括:
1. 近3年的营收、利润、现金流趋势
2. 关键财务比率(ROE、ROA、负债率等)
3. 盈利能力和成长性评估
""",
agent=financial_analyst,
expected_output="详细的财务分析报告"
)
# 任务4:竞争对手分析
competitor_task = Task(
description=f"""
分析{company}公司的竞争格局,包括:
1. 主要竞争对手及其市场份额
2. 竞争优势和劣势对比
3. 竞争策略分析
""",
agent=competitor_analyst,
expected_output="竞争对手分析报告"
)
# 任务5:行业研究
industry_task = Task(
description=f"""
研究{company}公司所在行业,包括:
1. 行业规模和发展趋势
2. 政策环境和监管要求
3. 行业机会和挑战
""",
agent=industry_researcher,
expected_output="行业研究报告"
)
# 任务6:整合分析
integrate_task = Task(
description=f"""
整合以下分析结果,形成{company}公司的综合分析:
- 新闻和舆情
- 财务分析
- 竞争对手分析
- 行业研究
提炼关键发现和投资要点。
""",
agent=report_integrator,
context=[news_task, financial_task, competitor_task, industry_task],
expected_output="综合分析摘要"
)
# 任务7:撰写报告
write_task = Task(
description=f"""
基于综合分析,撰写{company}公司的投资报告,包括:
## 1. 执行摘要
- 核心观点和建议
## 2. 公司概况
- 基本信息和业务模式
## 3. 行业分析
- 行业现状和趋势
## 4. 竞争分析
- 竞争格局和优势
## 5. 财务分析
- 财务状况和估值
## 6. 投资建议
- 评级和目标价
## 7. 风险提示
- 主要风险因素
请使用专业的投行报告格式。
""",
agent=report_writer,
context=[integrate_task],
expected_output="完整的投资报告(Markdown格式)"
)
return [
decompose_task,
news_task,
financial_task,
competitor_task,
industry_task,
integrate_task,
write_task
]
# ==================== 执行投研流程 ====================
def run_investment_research(company: str):
"""运行智能投研系统"""
print(f"🚀 开始对{company}进行智能投研分析...\n")
# 创建任务
tasks = create_tasks(company)
# 创建团队(使用层级化流程)
crew = Crew(
agents=[
task_decomposer,
news_collector,
financial_analyst,
competitor_analyst,
industry_researcher,
report_integrator,
report_writer
],
tasks=tasks,
process=Process.hierarchical, # 层级化执行
verbose=2
)
# 执行
result = crew.kickoff()
print("\n✅ 投研分析完成!")
print("\n" + "="*60)
print("投资报告:")
print("="*60)
print(result)
return result
# 运行
if __name__ == "__main__":
report = run_investment_research("特斯拉")
📊 性能优化技巧
1. 并行执行独立任务
# 串行执行(慢)
result1 = agent1.run(task1)
result2 = agent2.run(task2)
result3 = agent3.run(task3)
# 并行执行(快)
import asyncio
async def run_parallel():
tasks = [
agent1.arun(task1),
agent2.arun(task2),
agent3.arun(task3)
]
results = await asyncio.gather(*tasks)
return results
2. 缓存中间结果
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_research(company: str) -> str:
"""缓存研究结果"""
return perform_research(company)
3. 任务优先级调度
from queue import PriorityQueue
class TaskScheduler:
def __init__(self):
self.task_queue = PriorityQueue()
def add_task(self, priority: int, task: dict):
"""添加任务(优先级越小越优先)"""
self.task_queue.put((priority, task))
def execute_all(self):
"""按优先级执行所有任务"""
results = []
while not self.task_queue.empty():
priority, task = self.task_queue.get()
result = execute_task(task)
results.append(result)
return results
🔍 调试与监控
1. 添加日志
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('MultiAgentSystem')
def researcher_node(state):
logger.info(f"开始调研:{state['company']}")
# ... 执行调研
logger.info("调研完成")
return state
2. 追踪执行流程
import time
class ExecutionTracker:
def __init__(self):
self.timeline = []
def record(self, agent: str, action: str, duration: float):
"""记录执行信息"""
self.timeline.append({
"timestamp": time.time(),
"agent": agent,
"action": action,
"duration": duration
})
def visualize(self):
"""可视化执行时间线"""
for event in self.timeline:
print(f"[{event['agent']}] {event['action']} ({event['duration']:.2f}s)")
3. 异常处理
def safe_execute(agent, task, max_retries=3):
"""安全执行任务,带重试机制"""
for attempt in range(max_retries):
try:
result = agent.run(task)
return result
except Exception as e:
logger.warning(f"Attempt {attempt+1} failed: {e}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # 指数退避
💡 最佳实践总结
1. Agent设计原则
| 原则 | 说明 | 示例 |
|---|---|---|
| 单一职责 | 每个Agent专注一个领域 | 研究员只负责调研 |
| 明确边界 | 清晰定义输入输出 | 指定expected_output |
| 专业化 | 赋予专业背景和角色 | 设定backstory |
| 可复用 | Agent可在不同任务中复用 | 通用分析师 |
2. 任务设计原则
| 原则 | 说明 | 技巧 |
|---|---|---|
| 原子性 | 任务足够小且明确 | 避免模糊描述 |
| 依赖性 | 明确任务间的依赖 | 使用context参数 |
| 可验证 | 有明确的验收标准 | 定义expected_output |
| 容错性 | 考虑失败情况 | 添加重试机制 |
3. 协作模式选择
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 线性流程 | 顺序执行 | 简单直接 |
| 独立任务 | 并行执行 | 效率高 |
| 复杂依赖 | 层级化 | 易管理 |
| 动态调整 | 去中心化 | 灵活性强 |
🎓 常见问题解答
Q1: 如何确定需要多少个Agent?
A: 根据任务的复杂度:
- 简单任务:1-2个Agent
- 中等复杂:3-5个Agent
- 复杂项目:5-10个Agent
原则:宁少勿多,先从少量Agent开始,根据需要逐步增加。
Q2: Agent之间如何共享信息?
A: 三种方式:
- 通过Task context - CrewAI的方式
- 通过State - LangGraph的方式
- 通过共享存储 - 数据库或文件系统
Q3: 如何处理Agent执行失败?
A:
# 1. 重试机制
for attempt in range(3):
try:
result = agent.run(task)
break
except:
continue
# 2. 降级策略
try:
result = advanced_agent.run(task)
except:
result = basic_agent.run(task)
# 3. 人工介入
if confidence < threshold:
escalate_to_human()
Q4: 如何评估多Agent系统的效果?
A: 关键指标:
- 任务完成率 - 成功完成的任务比例
- 执行时间 - 总耗时和单个任务耗时
- 输出质量 - 人工评分或自动化评估
- 成本 - API调用费用
- 稳定性 - 失败率和重试次数
🔗 相关资源
📝 总结
多Agent协作系统是AI应用的重要发展方向,它通过:
✅ 专业化分工 - 每个Agent专注自己的领域
✅ 并行处理 - 提高整体效率
✅ 质量保证 - 互相校验和监督
✅ 可扩展性 - 轻松添加新Agent
掌握了多Agent协作的核心技术,你就能构建出强大的AI团队,解决复杂的实际问题!
下一步: 动手实践,从简单的双Agent协作开始,逐步构建更复杂的系统。
专栏: AI Agent实战专栏
日期: 2026年5月9日
系列: AI Agent高级进阶系列第1篇
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)