🤝 Agent团队协作 | 任务分解 + 角色分工 + 结果整合 | CrewAI/LangGraph实战 | 完整项目代码


📖 为什么需要多Agent协作?

单Agent的局限性

# 单Agent处理复杂任务
agent = create_agent(llm, tools)
result = agent.run("分析这家公司并写一份投资报告")

# 问题:
❌ 一个Agent难以兼顾多个专业领域
❌ 容易遗漏重要信息
❌ 输出质量不稳定
❌ 无法并行处理

多Agent协作的优势

# 多Agent团队协作
team = AgentTeam([
    ResearcherAgent(),      # 研究员 - 收集数据
    AnalystAgent(),         # 分析师 - 数据分析
    WriterAgent(),          # 作家 - 撰写报告
    ReviewerAgent()         # 审核员 - 质量检查
])

result = team.execute("分析这家公司并写一份投资报告")

# 优势:
✅ 专业化分工,各司其职
✅ 并行处理,效率提升
✅ 互相校验,质量更高
✅ 可扩展性强

🏗️ 多Agent协作架构

1. 中心化协调模式

                    ┌─────────────┐
                    │  Coordinator│
                    │   (协调者)   │
                    └──────┬──────┘
                           │
              ┌────────────┼────────────┐
              │            │            │
        ┌─────▼─────┐ ┌───▼────┐ ┌────▼─────┐
        │Researcher │ │Analyst │ │  Writer  │
        │ (研究员)  │ │(分析师)│ │ (作家)   │
        └───────────┘ └────────┘ └──────────┘

特点:

  • 中央协调者负责任务分配和结果整合
  • 适合结构化任务
  • 易于控制和调试

2. 去中心化协作模式

    ┌──────────┐         ┌──────────┐
    │Researcher│◄────────►│ Analyst  │
    └──────────┘         └──────────┘
         ▲                     │
         │                     ▼
    ┌──────────┐         ┌──────────┐
    │Reviewer  │◄────────►│  Writer  │
    └──────────┘         └──────────┘

特点:

  • Agent之间直接通信
  • 更灵活,适合复杂场景
  • 需要设计好通信协议

3. 层级化组织模式

              ┌──────────────┐
              │  Manager     │
              │  (管理者)     │
              └──────┬───────┘
                     │
        ┌────────────┴────────────┐
        │                         │
   ┌────▼────┐             ┌─────▼────┐
   │ Team A  │             │ Team B   │
   │(研究组) │             │(写作组)  │
   └─────────┘             └──────────┘

特点:

  • 多层级管理结构
  • 适合大型项目
  • 可以嵌套使用

🛠️ 核心技术实现

方案1:使用CrewAI框架

安装依赖
pip install crewai crewai-tools langchain-openai
创建Agent团队
from crewai import Agent, Task, Crew
from langchain_openai import ChatOpenAI

# 配置LLM
llm = ChatOpenAI(
    model="gpt-4",
    temperature=0.7
)

# 创建研究员Agent
researcher = Agent(
    role='高级研究员',
    goal='深入调研目标公司,收集全面的市场数据和财务信息',
    backstory="""你是一位经验丰富的市场研究员,擅长从多个渠道
    收集和分析公司信息。你的研究报告以详实、准确著称。""",
    verbose=True,
    allow_delegation=False,
    llm=llm
)

# 创建分析师Agent
analyst = Agent(
    role='金融分析师',
    goal='分析公司的财务状况和市场前景,提供投资建议',
    backstory="""你是一位资深金融分析师,精通财务报表分析和
    估值模型。你的分析报告逻辑清晰,结论可靠。""",
    verbose=True,
    allow_delegation=False,
    llm=llm
)

# 创建作家Agent
writer = Agent(
    role='专业作家',
    goal='基于研究和分析结果,撰写专业的投资报告',
    backstory="""你是一位专业的财经作家,擅长将复杂的数据和分析
    转化为通俗易懂的报告。你的文章结构清晰,可读性强。""",
    verbose=True,
    allow_delegation=False,
    llm=llm
)

# 定义任务
research_task = Task(
    description="""
    调研{company}公司,包括:
    1. 公司基本信息和发展历程
    2. 主要业务和产品线
    3. 竞争对手分析
    4. 最新的市场动态
    """,
    agent=researcher,
    expected_output="详细的研究调研报告"
)

analysis_task = Task(
    description="""
    基于研究结果,分析{company}公司:
    1. 财务状况(营收、利润、现金流)
    2. 竞争优势和风险因素
    3. 市场估值和投资价值
    """,
    agent=analyst,
    context=[research_task],  # 依赖研究任务的结果
    expected_output="专业的财务分析报告"
)

writing_task = Task(
    description="""
    综合研究和分析结果,撰写{company}公司的投资报告:
    1. 执行摘要
    2. 公司概况
    3. 行业分析
    4. 财务分析
    5. 投资建议
    6. 风险提示
    """,
    agent=writer,
    context=[research_task, analysis_task],  # 依赖前两个任务
    expected_output="完整的投资报告(Markdown格式)"
)

# 创建团队
crew = Crew(
    agents=[researcher, analyst, writer],
    tasks=[research_task, analysis_task, writing_task],
    verbose=2
)

# 执行任务
result = crew.kickoff(inputs={"company": "特斯拉"})
print(result)

方案2:使用LangGraph实现

安装依赖
pip install langgraph langchain langchain-openai
构建多Agent工作流
from langgraph.graph import StateGraph, END
from typing import TypedDict, List
from langchain_openai import ChatOpenAI

# 定义状态
class MultiAgentState(TypedDict):
    company: str
    research_result: str
    analysis_result: str
    final_report: str
    messages: List[str]

# 初始化LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

# 研究员节点
def researcher_node(state: MultiAgentState) -> MultiAgentState:
    """研究员:收集公司信息"""
    company = state["company"]
    
    prompt = f"""
    作为高级研究员,请调研{company}公司:
    
    1. 公司基本信息和发展历程
    2. 主要业务和产品线
    3. 竞争对手分析
    4. 最新的市场动态
    
    请提供详细的研究报告。
    """
    
    response = llm.invoke(prompt)
    
    return {
        **state,
        "research_result": response.content,
        "messages": state["messages"] + [f"研究员完成对{company}的调研"]
    }

# 分析师节点
def analyst_node(state: MultiAgentState) -> MultiAgentState:
    """分析师:进行财务分析"""
    company = state["company"]
    research = state["research_result"]
    
    prompt = f"""
    作为金融分析师,请基于以下研究结果分析{company}公司:
    
    研究结果:
    {research}
    
    请分析:
    1. 财务状况(营收、利润、现金流)
    2. 竞争优势和风险因素
    3. 市场估值和投资价值
    """
    
    response = llm.invoke(prompt)
    
    return {
        **state,
        "analysis_result": response.content,
        "messages": state["messages"] + ["分析师完成财务分析"]
    }

# 作家节点
def writer_node(state: MultiAgentState) -> MultiAgentState:
    """作家:撰写投资报告"""
    company = state["company"]
    research = state["research_result"]
    analysis = state["analysis_result"]
    
    prompt = f"""
    作为专业作家,请基于以下材料撰写{company}公司的投资报告:
    
    研究结果:
    {research}
    
    分析结果:
    {analysis}
    
    报告结构:
    1. 执行摘要
    2. 公司概况
    3. 行业分析
    4. 财务分析
    5. 投资建议
    6. 风险提示
    
    请使用Markdown格式,确保专业性和可读性。
    """
    
    response = llm.invoke(prompt)
    
    return {
        **state,
        "final_report": response.content,
        "messages": state["messages"] + ["作家完成投资报告"]
    }

# 构建工作流
workflow = StateGraph(MultiAgentState)

# 添加节点
workflow.add_node("researcher", researcher_node)
workflow.add_node("analyst", analyst_node)
workflow.add_node("writer", writer_node)

# 设置入口点
workflow.set_entry_point("researcher")

# 添加边
workflow.add_edge("researcher", "analyst")
workflow.add_edge("analyst", "writer")
workflow.add_edge("writer", END)

# 编译
app = workflow.compile()

# 执行
initial_state = {
    "company": "特斯拉",
    "research_result": "",
    "analysis_result": "",
    "final_report": "",
    "messages": []
}

result = app.invoke(initial_state)
print(result["final_report"])

方案3:自定义多Agent框架

核心架构
import asyncio
from typing import List, Dict, Any, Callable
from dataclasses import dataclass


@dataclass
class Agent:
    """Agent定义"""
    name: str
    role: str
    expertise: str
    process_fn: Callable
    
    def process(self, task: str, context: Dict[str, Any] = None) -> str:
        """处理任务"""
        return self.process_fn(task, context or {})


class AgentCoordinator:
    """Agent协调器"""
    
    def __init__(self):
        self.agents: List[Agent] = []
        self.task_history: List[Dict[str, Any]] = []
    
    def add_agent(self, agent: Agent):
        """添加Agent"""
        self.agents.append(agent)
    
    def assign_task(self, task: str, agent_name: str, 
                   context: Dict[str, Any] = None) -> str:
        """分配任务给指定Agent"""
        agent = next((a for a in self.agents if a.name == agent_name), None)
        
        if not agent:
            raise ValueError(f"Agent '{agent_name}' not found")
        
        result = agent.process(task, context)
        
        # 记录任务历史
        self.task_history.append({
            "task": task,
            "agent": agent_name,
            "result": result
        })
        
        return result
    
    def parallel_execute(self, tasks: List[Dict[str, Any]]) -> List[str]:
        """并行执行多个任务
        
        Args:
            tasks: 任务列表,每个任务包含:
                - task: 任务描述
                - agent: Agent名称
                - context: 上下文(可选)
        
        Returns:
            结果列表
        """
        results = []
        
        for task_info in tasks:
            result = self.assign_task(
                task_info["task"],
                task_info["agent"],
                task_info.get("context")
            )
            results.append(result)
        
        return results
    
    def sequential_execute(self, tasks: List[Dict[str, Any]]) -> List[str]:
        """顺序执行任务链
        
        Args:
            tasks: 任务列表,后一个任务可以使用前一个任务的结果
        
        Returns:
            结果列表
        """
        results = []
        context = {}
        
        for i, task_info in enumerate(tasks):
            # 将之前的结果加入上下文
            context["previous_results"] = results
            
            result = self.assign_task(
                task_info["task"],
                task_info["agent"],
                context
            )
            results.append(result)
            
            # 更新上下文
            context[f"result_{i}"] = result
        
        return results
    
    def get_task_history(self) -> List[Dict[str, Any]]:
        """获取任务历史"""
        return self.task_history


# ==================== 使用示例 ====================

def create_research_agent():
    """创建研究员Agent"""
    def process(task: str, context: Dict[str, Any]) -> str:
        # 这里可以调用LLM API
        return f"[研究员] 完成调研:{task}"
    
    return Agent(
        name="researcher",
        role="研究员",
        expertise="市场调研、数据分析",
        process_fn=process
    )


def create_analyst_agent():
    """创建分析师Agent"""
    def process(task: str, context: Dict[str, Any]) -> str:
        previous = context.get("previous_results", [])
        return f"[分析师] 基于{len(previous)}个前期结果进行分析:{task}"
    
    return Agent(
        name="analyst",
        role="分析师",
        expertise="财务分析、风险评估",
        process_fn=process
    )


def create_writer_agent():
    """创建作家Agent"""
    def process(task: str, context: Dict[str, Any]) -> str:
        previous = context.get("previous_results", [])
        return f"[作家] 基于{len(previous)}个前期结果撰写报告:{task}"
    
    return Agent(
        name="writer",
        role="作家",
        expertise="报告撰写、内容创作",
        process_fn=process
    )


# 创建协调器
coordinator = AgentCoordinator()

# 添加Agent
coordinator.add_agent(create_research_agent())
coordinator.add_agent(create_analyst_agent())
coordinator.add_agent(create_writer_agent())

# 顺序执行任务链
tasks = [
    {"task": "调研特斯拉公司", "agent": "researcher"},
    {"task": "分析财务状况", "agent": "analyst"},
    {"task": "撰写投资报告", "agent": "writer"}
]

results = coordinator.sequential_execute(tasks)

for i, result in enumerate(results):
    print(f"\n任务{i+1}结果:")
    print(result)

🎯 实战案例:智能投研系统

系统架构

用户输入公司名称
       │
       ▼
┌──────────────┐
│ 任务分解Agent │ ← 分析需求,拆分子任务
└──────┬───────┘
       │
  ┌────┴────┬────────┬────────┐
  ▼         ▼        ▼        ▼
┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐
│新闻  │ │财务  │ │竞争  │ │行业  │
│采集  │ │分析  │ │对手  │ │研究  │
└──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘
   │        │        │        │
   └────────┴────────┴────────┘
            │
            ▼
     ┌─────────────┐
     │  整合Agent   │ ← 汇总所有信息
     └──────┬──────┘
            │
            ▼
     ┌─────────────┐
     │  写作Agent   │ ← 生成最终报告
     └──────┬──────┘
            │
            ▼
      输出投资报告

完整实现

from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI

# 配置LLM
llm = ChatOpenAI(model="gpt-4", temperature=0.7)

# ==================== 创建Agent团队 ====================

# 1. 任务分解专家
task_decomposer = Agent(
    role='任务分解专家',
    goal='将复杂的投研需求分解为可执行的子任务',
    backstory="""你是一位经验丰富的项目经理,擅长将复杂的大任务
    分解为清晰的、可执行的子任务。你能识别任务之间的依赖关系。""",
    llm=llm,
    verbose=True
)

# 2. 新闻采集专家
news_collector = Agent(
    role='新闻采集专家',
    goal='收集目标公司的最新新闻动态和市场舆情',
    backstory="""你是一位资深的新闻记者,擅长从各种渠道收集
    最新的公司新闻和市场信息。你对市场敏感度高。""",
    llm=llm,
    verbose=True
)

# 3. 财务分析专家
financial_analyst = Agent(
    role='财务分析专家',
    goal='深入分析公司的财务报表和关键财务指标',
    backstory="""你是一位CFA持证的财务分析师,精通财务报表分析、
    比率分析和估值模型。你的分析严谨、专业。""",
    llm=llm,
    verbose=True
)

# 4. 竞争对手分析专家
competitor_analyst = Agent(
    role='竞争对手分析专家',
    goal='分析目标公司的竞争对手和市场竞争格局',
    backstory="""你是一位战略咨询顾问,擅长竞争分析和市场定位。
    你能识别公司的竞争优势和劣势。""",
    llm=llm,
    verbose=True
)

# 5. 行业研究专家
industry_researcher = Agent(
    role='行业研究专家',
    goal='研究目标公司所在行业的发展趋势和前景',
    backstory="""你是一位行业研究专家,深入了解各个行业的发展
    动态、政策环境和未来趋势。你的视野宏观、前瞻。""",
    llm=llm,
    verbose=True
)

# 6. 报告整合专家
report_integrator = Agent(
    role='报告整合专家',
    goal='整合各方分析结果,形成完整的投资报告',
    backstory="""你是一位资深投行分析师,擅长整合多方信息,
    形成逻辑清晰、结论明确的投资报告。""",
    llm=llm,
    verbose=True
)

# 7. 报告写作专家
report_writer = Agent(
    role='报告写作专家',
    goal='将整合的分析结果转化为专业的投资报告文档',
    backstory="""你是一位专业的财经作家,擅长将复杂的分析
    转化为通俗易懂、结构清晰的报告文档。""",
    llm=llm,
    verbose=True
)

# ==================== 定义任务 ====================

def create_tasks(company: str):
    """创建任务列表"""
    
    # 任务1:分解投研需求
    decompose_task = Task(
        description=f"""
        将"{company}公司投资分析"这个需求分解为具体的子任务,
        包括需要收集的信息类型和分析维度。
        """,
        agent=task_decomposer,
        expected_output="详细的任务分解清单"
    )
    
    # 任务2:收集新闻
    news_task = Task(
        description=f"""
        收集{company}公司的最新新闻动态,包括:
        1. 最近3个月的重要新闻
        2. 市场舆情和投资者情绪
        3. 重大事件和影响
        """,
        agent=news_collector,
        expected_output="新闻和舆情汇总报告"
    )
    
    # 任务3:财务分析
    financial_task = Task(
        description=f"""
        分析{company}公司的财务状况,包括:
        1. 近3年的营收、利润、现金流趋势
        2. 关键财务比率(ROE、ROA、负债率等)
        3. 盈利能力和成长性评估
        """,
        agent=financial_analyst,
        expected_output="详细的财务分析报告"
    )
    
    # 任务4:竞争对手分析
    competitor_task = Task(
        description=f"""
        分析{company}公司的竞争格局,包括:
        1. 主要竞争对手及其市场份额
        2. 竞争优势和劣势对比
        3. 竞争策略分析
        """,
        agent=competitor_analyst,
        expected_output="竞争对手分析报告"
    )
    
    # 任务5:行业研究
    industry_task = Task(
        description=f"""
        研究{company}公司所在行业,包括:
        1. 行业规模和发展趋势
        2. 政策环境和监管要求
        3. 行业机会和挑战
        """,
        agent=industry_researcher,
        expected_output="行业研究报告"
    )
    
    # 任务6:整合分析
    integrate_task = Task(
        description=f"""
        整合以下分析结果,形成{company}公司的综合分析:
        - 新闻和舆情
        - 财务分析
        - 竞争对手分析
        - 行业研究
        
        提炼关键发现和投资要点。
        """,
        agent=report_integrator,
        context=[news_task, financial_task, competitor_task, industry_task],
        expected_output="综合分析摘要"
    )
    
    # 任务7:撰写报告
    write_task = Task(
        description=f"""
        基于综合分析,撰写{company}公司的投资报告,包括:
        
        ## 1. 执行摘要
        - 核心观点和建议
        
        ## 2. 公司概况
        - 基本信息和业务模式
        
        ## 3. 行业分析
        - 行业现状和趋势
        
        ## 4. 竞争分析
        - 竞争格局和优势
        
        ## 5. 财务分析
        - 财务状况和估值
        
        ## 6. 投资建议
        - 评级和目标价
        
        ## 7. 风险提示
        - 主要风险因素
        
        请使用专业的投行报告格式。
        """,
        agent=report_writer,
        context=[integrate_task],
        expected_output="完整的投资报告(Markdown格式)"
    )
    
    return [
        decompose_task,
        news_task,
        financial_task,
        competitor_task,
        industry_task,
        integrate_task,
        write_task
    ]

# ==================== 执行投研流程 ====================

def run_investment_research(company: str):
    """运行智能投研系统"""
    
    print(f"🚀 开始对{company}进行智能投研分析...\n")
    
    # 创建任务
    tasks = create_tasks(company)
    
    # 创建团队(使用层级化流程)
    crew = Crew(
        agents=[
            task_decomposer,
            news_collector,
            financial_analyst,
            competitor_analyst,
            industry_researcher,
            report_integrator,
            report_writer
        ],
        tasks=tasks,
        process=Process.hierarchical,  # 层级化执行
        verbose=2
    )
    
    # 执行
    result = crew.kickoff()
    
    print("\n✅ 投研分析完成!")
    print("\n" + "="*60)
    print("投资报告:")
    print("="*60)
    print(result)
    
    return result

# 运行
if __name__ == "__main__":
    report = run_investment_research("特斯拉")

📊 性能优化技巧

1. 并行执行独立任务

# 串行执行(慢)
result1 = agent1.run(task1)
result2 = agent2.run(task2)
result3 = agent3.run(task3)

# 并行执行(快)
import asyncio

async def run_parallel():
    tasks = [
        agent1.arun(task1),
        agent2.arun(task2),
        agent3.arun(task3)
    ]
    results = await asyncio.gather(*tasks)
    return results

2. 缓存中间结果

from functools import lru_cache

@lru_cache(maxsize=100)
def cached_research(company: str) -> str:
    """缓存研究结果"""
    return perform_research(company)

3. 任务优先级调度

from queue import PriorityQueue

class TaskScheduler:
    def __init__(self):
        self.task_queue = PriorityQueue()
    
    def add_task(self, priority: int, task: dict):
        """添加任务(优先级越小越优先)"""
        self.task_queue.put((priority, task))
    
    def execute_all(self):
        """按优先级执行所有任务"""
        results = []
        while not self.task_queue.empty():
            priority, task = self.task_queue.get()
            result = execute_task(task)
            results.append(result)
        return results

🔍 调试与监控

1. 添加日志

import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger('MultiAgentSystem')

def researcher_node(state):
    logger.info(f"开始调研:{state['company']}")
    # ... 执行调研
    logger.info("调研完成")
    return state

2. 追踪执行流程

import time

class ExecutionTracker:
    def __init__(self):
        self.timeline = []
    
    def record(self, agent: str, action: str, duration: float):
        """记录执行信息"""
        self.timeline.append({
            "timestamp": time.time(),
            "agent": agent,
            "action": action,
            "duration": duration
        })
    
    def visualize(self):
        """可视化执行时间线"""
        for event in self.timeline:
            print(f"[{event['agent']}] {event['action']} ({event['duration']:.2f}s)")

3. 异常处理

def safe_execute(agent, task, max_retries=3):
    """安全执行任务,带重试机制"""
    for attempt in range(max_retries):
        try:
            result = agent.run(task)
            return result
        except Exception as e:
            logger.warning(f"Attempt {attempt+1} failed: {e}")
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # 指数退避

💡 最佳实践总结

1. Agent设计原则

原则 说明 示例
单一职责 每个Agent专注一个领域 研究员只负责调研
明确边界 清晰定义输入输出 指定expected_output
专业化 赋予专业背景和角色 设定backstory
可复用 Agent可在不同任务中复用 通用分析师

2. 任务设计原则

原则 说明 技巧
原子性 任务足够小且明确 避免模糊描述
依赖性 明确任务间的依赖 使用context参数
可验证 有明确的验收标准 定义expected_output
容错性 考虑失败情况 添加重试机制

3. 协作模式选择

场景 推荐模式 原因
线性流程 顺序执行 简单直接
独立任务 并行执行 效率高
复杂依赖 层级化 易管理
动态调整 去中心化 灵活性强

🎓 常见问题解答

Q1: 如何确定需要多少个Agent?

A: 根据任务的复杂度:

  • 简单任务:1-2个Agent
  • 中等复杂:3-5个Agent
  • 复杂项目:5-10个Agent

原则:宁少勿多,先从少量Agent开始,根据需要逐步增加。

Q2: Agent之间如何共享信息?

A: 三种方式:

  1. 通过Task context - CrewAI的方式
  2. 通过State - LangGraph的方式
  3. 通过共享存储 - 数据库或文件系统

Q3: 如何处理Agent执行失败?

A:

# 1. 重试机制
for attempt in range(3):
    try:
        result = agent.run(task)
        break
    except:
        continue

# 2. 降级策略
try:
    result = advanced_agent.run(task)
except:
    result = basic_agent.run(task)

# 3. 人工介入
if confidence < threshold:
    escalate_to_human()

Q4: 如何评估多Agent系统的效果?

A: 关键指标:

  • 任务完成率 - 成功完成的任务比例
  • 执行时间 - 总耗时和单个任务耗时
  • 输出质量 - 人工评分或自动化评估
  • 成本 - API调用费用
  • 稳定性 - 失败率和重试次数

🔗 相关资源


📝 总结

多Agent协作系统是AI应用的重要发展方向,它通过:

✅ 专业化分工 - 每个Agent专注自己的领域
✅ 并行处理 - 提高整体效率
✅ 质量保证 - 互相校验和监督
✅ 可扩展性 - 轻松添加新Agent

掌握了多Agent协作的核心技术,你就能构建出强大的AI团队,解决复杂的实际问题!

下一步: 动手实践,从简单的双Agent协作开始,逐步构建更复杂的系统。


专栏: AI Agent实战专栏
日期: 2026年5月9日
系列: AI Agent高级进阶系列第1篇

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐