从零搭建生产级 Multi-Agent 系统:MCP 协议 + LangGraph 实战全解析

文章背景:2026 年上半年,GitHub Trending 中 AI Agent 相关项目单周新增 Star 动辄破万;掘金热榜连续数周由多 Agent 协作、MCP 协议、Spec-First 编程工作流占据前列。本文结合当前最热技术实践,系统梳理 Multi-Agent 系统的工程落地路径,附完整可运行代码。


目录


一、为什么 2026 年是 Multi-Agent 爆发的元年

1.1 从单 Agent 到多 Agent 的范式迁移

2024 年以前,"AI Agent"这个词听起来很酷,但实际落地形态通常是:

用户输入 → Prompt Engineering → LLM → 输出

有时候绑一两个工具(搜索、代码执行),但整体逻辑还是单点调用。这类 Agent 遇到复杂任务就会出现几个典型问题:

  1. 上下文爆炸:复杂任务需要积累大量中间状态,单个 Agent 的上下文窗口撑不住
  2. 能力单一:一个 Agent 很难同时是「好的代码生成器」和「好的业务分析师」
  3. 缺乏校验:没有独立的审核节点,输出错误时无从发现

Multi-Agent 的核心思路是分治——把一个复杂任务拆给多个专注的 Agent 协作完成,每个 Agent 专注自己的领域,通过明确的接口相互通信。

1.2 三大驱动力

驱动力 2024 年状态 2026 年状态
模型能力 Tool Calling 不稳定,幻觉率高 旗舰模型原生支持电脑控制,工具调用准确率大幅提升
协议标准化 各框架自造轮子,互不兼容 MCP 协议被微软、Anthropic、OpenAI 共同采纳成为事实标准
框架成熟度 LangChain 早期 API 频繁变动 LangGraph 1.x 稳定,CrewAI v0.8 原生支持 MCP

这三个条件同时成熟,使得 Multi-Agent 从"实验室玩具"真正走向"生产可用"。


二、MCP 协议:Agent 世界的「USB 标准」

2.1 MCP 解决了什么问题

问题背景:在 MCP 之前,如果你要让一个 Agent 能调用数据库、搜索引擎、GitHub API,需要为每个框架(LangChain、AutoGPT、CrewAI)单独写适配器。换一个框架,所有工具适配代码全部重写。

MCP 的解法:定义一个统一的协议层,工具只需实现一次,任何支持 MCP 的 Agent 都能直接调用。类比 USB 接口——设备(工具)和主机(Agent)各自实现标准接口,彼此无需了解对方内部实现。

2.2 MCP 核心架构拆解

┌─────────────────────────────────────────────────┐
│                  Agent / LLM                     │
│               (MCP Client 角色)                   │
└────────────────────┬────────────────────────────┘
                     │  MCP 协议(JSON-RPC over stdio/HTTP)
                     ▼
┌─────────────────────────────────────────────────┐
│                  MCP Server                      │
│  ┌────────────┐  ┌──────────┐  ┌─────────────┐  │
│  │ 工具注册表  │  │ 调用路由  │  │  权限/审计层  │  │
│  └────────────┘  └──────────┘  └─────────────┘  │
└──────┬──────────────┬──────────────┬────────────┘
       │              │              │
       ▼              ▼              ▼
  ┌─────────┐   ┌─────────┐   ┌──────────┐
  │ 搜索 API │   │  数据库  │   │ 文件系统  │
  └─────────┘   └─────────┘   └──────────┘

三个核心概念

  • MCP Client:集成在 Agent 中,负责发现和调用 MCP Server 暴露的工具
  • MCP Server:独立进程,暴露工具接口,处理鉴权、限流、审计等横切关注点
  • Transport:Client 和 Server 之间的通信方式,支持 stdio(本地进程)和 HTTP+SSE(远程服务)两种模式

2.3 MCP vs 传统 Tool Calling 对比

特性 传统 Function Calling MCP 协议
工具定义方式 每个框架单独定义 JSON Schema 统一的 MCP 协议规范
跨框架复用 ❌ 每个框架写适配器 ✅ 一次实现,到处可用
工具自动发现 ❌ 手动注册 ✅ Agent 启动时自动枚举
安全控制 几乎没有 内置鉴权 + 调用审计日志
调试体验 print 大法 MCP Inspector 可视化追踪
工具生态 框架私有 公共 MCP Store,开箱即用 200+ 工具

2.4 动手实现一个 MCP Server(Python)

以下是一个完整可运行的 MCP Server 示例,暴露三个工具:数据库查询、图表生成、邮件发送。

# mcp_server.py
# 依赖:pip install mcp aiofiles

from mcp.server import Server
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
import json
import asyncio
from typing import Any

app = Server("data-analysis-agent")

# ──────────────────────────────────────────
# 工具定义
# ──────────────────────────────────────────

@app.list_tools()
async def list_tools() -> list[types.Tool]:
    """MCP Client 启动时会调用此接口枚举所有可用工具"""
    return [
        types.Tool(
            name="query_database",
            description="查询业务数据库,返回 JSON 格式的结果集",
            inputSchema={
                "type": "object",
                "properties": {
                    "sql": {
                        "type": "string",
                        "description": "只读 SELECT 语句,禁止 DDL/DML"
                    },
                    "limit": {
                        "type": "integer",
                        "description": "最大返回行数,默认 100",
                        "default": 100
                    }
                },
                "required": ["sql"]
            }
        ),
        types.Tool(
            name="generate_chart",
            description="根据数据生成图表,返回图表访问 URL",
            inputSchema={
                "type": "object",
                "properties": {
                    "data": {
                        "type": "string",
                        "description": "JSON 格式的数据数组"
                    },
                    "chart_type": {
                        "type": "string",
                        "enum": ["bar", "line", "pie", "scatter"],
                        "description": "图表类型"
                    },
                    "title": {
                        "type": "string",
                        "description": "图表标题"
                    }
                },
                "required": ["data", "chart_type"]
            }
        ),
        types.Tool(
            name="send_email",
            description="发送邮件给指定收件人",
            inputSchema={
                "type": "object",
                "properties": {
                    "to": {"type": "string", "description": "收件人邮箱"},
                    "subject": {"type": "string", "description": "邮件主题"},
                    "body": {"type": "string", "description": "邮件正文(支持 HTML)"},
                    "attachments": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "附件 URL 列表(可选)"
                    }
                },
                "required": ["to", "subject", "body"]
            }
        )
    ]


@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[types.TextContent]:
    """工具调用的统一入口,MCP Server 在这里做路由和权限校验"""
    
    # 安全校验:只允许只读 SQL
    if name == "query_database":
        sql = arguments.get("sql", "").strip().upper()
        if not sql.startswith("SELECT"):
            return [types.TextContent(
                type="text",
                text=json.dumps({"error": "只允许 SELECT 语句,拒绝执行"})
            )]
        # 实际生产中替换为真实 DB 连接
        mock_result = [
            {"date": "2026-06-15", "sales": 12580, "orders": 234},
            {"date": "2026-06-16", "sales": 13920, "orders": 267},
            {"date": "2026-06-17", "sales": 11340, "orders": 198},
        ]
        return [types.TextContent(type="text", text=json.dumps(mock_result))]

    elif name == "generate_chart":
        # 实际生产中调用图表服务
        chart_url = f"https://charts.example.com/{hash(arguments['data'])}.png"
        return [types.TextContent(type="text", text=chart_url)]

    elif name == "send_email":
        # 实际生产中调用邮件服务
        print(f"[MCP] 发送邮件至 {arguments['to']},主题:{arguments['subject']}")
        return [types.TextContent(
            type="text",
            text=json.dumps({"status": "ok", "message_id": "msg_20260622_001"})
        )]
    
    return [types.TextContent(type="text", text=json.dumps({"error": f"未知工具: {name}"}))]


# ──────────────────────────────────────────
# 启动 Server(stdio 模式,适合本地 Agent 调用)
# ──────────────────────────────────────────

async def main():
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await app.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="data-analysis-agent",
                server_version="1.0.0"
            )
        )

if __name__ == "__main__":
    asyncio.run(main())

关键设计点list_tools() 返回的 inputSchema 是 JSON Schema 标准,MCP Client 根据这份 Schema 自动生成 LLM 的 Function Calling 定义,工具描述即文档即接口定义,三位一体。


三、LangGraph:有向图驱动的工作流引擎

3.1 为什么选 LangGraph 而不是 LangChain Expression Language

LangChain Expression Language(LCEL)适合线性链式调用,但生产级 Agent 往往需要:

  • 条件分支:Agent 的输出决定下一步走哪条路
  • 循环执行:迭代优化直到满足质量标准
  • 并行执行:多个 Agent 同时处理独立子任务,最后汇聚结果
  • 状态持久化:长时间运行的 Agent 任务需要 Checkpoint,防止意外中断导致全部重做

LangGraph 用有向图(Directed Graph) 对上述需求建模:节点(Node)是处理单元,边(Edge)是数据流向,条件边(Conditional Edge)实现分支逻辑。

3.2 LangGraph 核心概念

State(图的全局状态,所有节点共享并更新)
   │
   ▼
Node(处理节点,接收 State,返回 State 的增量更新)
   │
   ▼
Edge(普通边 = 无条件跳转,条件边 = 根据 State 决定下一节点)
   │
   ▼
Checkpoint(状态快照,支持任务中断恢复)

State 的设计哲学:LangGraph 采用「不可变 + 增量更新」的 Reducer 模式。每个 Node 不直接修改 State,而是返回一个 dict,LangGraph 负责把这个 dict 合并到全局 State 中。这使得 Agent 的执行过程天然可追踪、可回放。

3.3 一个最小化 LangGraph Demo

以下示例构建了一个「自动修复代码」的 Agent,它会循环尝试,直到代码通过测试或超过最大重试次数:

# auto_fix_agent.py
# 依赖:pip install langgraph langchain-openai

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import subprocess
import operator

# ──────────────────────────────────────────
# 1. 定义图的全局 State
# ──────────────────────────────────────────

class CodeFixState(TypedDict):
    code: str                          # 当前版本的代码
    error_message: str                 # 最近一次测试的错误信息
    fix_history: Annotated[list, operator.add]  # 每次修复记录(append-only)
    retry_count: int                   # 当前重试次数
    max_retries: int                   # 最大重试次数
    is_fixed: bool                     # 是否修复成功

# ──────────────────────────────────────────
# 2. 定义节点(Node)
# ──────────────────────────────────────────

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def run_tests(state: CodeFixState) -> dict:
    """节点:执行测试,捕获错误信息"""
    try:
        result = subprocess.run(
            ["python", "-c", state["code"]],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            return {"is_fixed": True, "error_message": ""}
        else:
            return {"is_fixed": False, "error_message": result.stderr[:2000]}
    except subprocess.TimeoutExpired:
        return {"is_fixed": False, "error_message": "执行超时(>10s)"}

def fix_code(state: CodeFixState) -> dict:
    """节点:调用 LLM 修复代码"""
    prompt = f"""以下 Python 代码存在错误,请修复:

代码:
```python
{state['code']}

错误信息:
{state[‘error_message’]}

要求:

  • 只返回修复后的完整 Python 代码,不加任何解释

  • 不要使用 Markdown 代码块包裹
    “”"
    response = llm.invoke(prompt)
    fixed_code = response.content.strip()

    return {
    “code”: fixed_code,
    “retry_count”: state[“retry_count”] + 1,
    “fix_history”: [{“attempt”: state[“retry_count”] + 1, “error”: state[“error_message”]}]
    }

──────────────────────────────────────────

3. 定义路由函数(条件边)

──────────────────────────────────────────

def should_continue(state: CodeFixState) -> str:
“”“根据 State 决定下一步走哪条边”“”
if state[“is_fixed”]:
return “success”
if state[“retry_count”] >= state[“max_retries”]:
return “give_up”
return “retry”

──────────────────────────────────────────

4. 构建图

──────────────────────────────────────────

workflow = StateGraph(CodeFixState)

添加节点

workflow.add_node(“run_tests”, run_tests)
workflow.add_node(“fix_code”, fix_code)

设置入口

workflow.set_entry_point(“run_tests”)

添加条件边

workflow.add_conditional_edges(
“run_tests”,
should_continue,
{
“success”: END, # 修复成功,结束
“give_up”: END, # 超过重试次数,放弃
“retry”: “fix_code” # 继续修复
}
)

修复后重新跑测试

workflow.add_edge(“fix_code”, “run_tests”)

编译成可执行图

app = workflow.compile()

──────────────────────────────────────────

5. 运行

──────────────────────────────────────────

if name == “main”:
broken_code = “”"
def calculate_average(numbers):
return sum(numbers) / len(numbers) # 没有处理空列表

result = calculate_average([])
print(result)
“”"

final_state = app.invoke({
    "code": broken_code,
    "error_message": "",
    "fix_history": [],
    "retry_count": 0,
    "max_retries": 3,
    "is_fixed": False
})

print("=== 最终代码 ===")
print(final_state["code"])
print(f"\n=== 修复记录:共尝试 {final_state['retry_count']} 次 ===")
for record in final_state["fix_history"]:
    print(f"  第 {record['attempt']} 次:{record['error'][:100]}...")

---

## 四、Multi-Agent 协作模式拆解

### 4.1 四种主流协作模式

┌──────────────────────────────────────────────────────────────┐
│ Multi-Agent 协作模式 │
├──────────────┬───────────────┬──────────────┬────────────────┤
│ Supervisor │ Pipeline │ Peer-to-Peer │ Hierarchical │
│ (主从式) │ (流水线) │ (去中心化) │ (层级式) │
├──────────────┼───────────────┼──────────────┼────────────────┤
│ 一个 Supervisor│ Agent A→B→C │ Agent 之间直 │ 多层 Supervisor │
│ 调度多个 Worker│ 顺序传递结果 │ 接协商和通信 │ 递归管理子团队 │
├──────────────┼───────────────┼──────────────┼────────────────┤
│ 适合:任务分 │ 适合:步骤明确 │ 适合:需要 │ 适合:大规模 │
│ 配和结果汇总 │ 的处理流程 │ 动态协作的场景│ 复杂任务编排 │
└──────────────┴───────────────┴──────────────┴────────────────┘


生产环境中最常用的是 **Supervisor 模式** 和 **Pipeline 模式**,下面各给一个完整实战案例。

### 4.2 Supervisor 模式实战:自动化周报生成系统

**场景描述**:每周五自动生成业务周报——Supervisor 负责任务分配,DataAgent 负责数据查询,AnalystAgent 负责数据解读,WriterAgent 负责将分析结果写成可读性强的报告。

```python
# weekly_report_system.py
# 依赖:pip install crewai langchain-openai

from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from langchain.tools import tool
import json
from datetime import datetime, timedelta

# ──────────────────────────────────────────
# 工具定义(生产环境中这些应该来自 MCP Server)
# ──────────────────────────────────────────

@tool("query_weekly_sales")
def query_weekly_sales(start_date: str) -> str:
    """查询指定日期开始的一周销售数据,输入格式 YYYY-MM-DD"""
    # 模拟数据(实际应查询数据库)
    mock_data = {
        "total_revenue": 892340,
        "total_orders": 1523,
        "avg_order_value": 586.2,
        "top_products": [
            {"name": "蓝牙耳机 Pro", "revenue": 234500, "units": 312},
            {"name": "智能手环 X3", "revenue": 189200, "units": 498},
            {"name": "便携充电宝", "revenue": 156800, "units": 892}
        ],
        "daily_breakdown": [
            {"date": "2026-06-15", "revenue": 124500, "orders": 213},
            {"date": "2026-06-16", "revenue": 135200, "orders": 231},
            {"date": "2026-06-17", "revenue": 98700, "orders": 168},
            {"date": "2026-06-18", "revenue": 142300, "orders": 243},
            {"date": "2026-06-19", "revenue": 167800, "orders": 287},
            {"date": "2026-06-20", "revenue": 110040, "orders": 188},
            {"date": "2026-06-21", "revenue": 113800, "orders": 193},
        ],
        "compared_to_last_week": {
            "revenue_change": "+12.3%",
            "orders_change": "+8.7%"
        }
    }
    return json.dumps(mock_data, ensure_ascii=False)

@tool("get_customer_metrics")
def get_customer_metrics(period: str) -> str:
    """获取客户相关指标,period 可选 'week' 或 'month'"""
    mock_metrics = {
        "new_customers": 234,
        "returning_customers": 1289,
        "churn_rate": "3.2%",
        "customer_satisfaction_score": 4.3,
        "top_complaint": "物流速度偏慢(占投诉的 34%)"
    }
    return json.dumps(mock_metrics, ensure_ascii=False)

# ──────────────────────────────────────────
# Agent 定义
# ──────────────────────────────────────────

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3)  # 用小模型降低成本

data_agent = Agent(
    role="数据分析师",
    goal="从数据库中获取本周完整的销售数据和客户数据,确保数据准确无误",
    backstory=(
        "你是一名专业的数据分析师,擅长从原始数据中提取关键指标。"
        "你非常注重数据准确性,遇到异常数据会主动标注。"
    ),
    tools=[query_weekly_sales, get_customer_metrics],
    llm=llm,
    verbose=True
)

analyst_agent = Agent(
    role="业务分析师",
    goal="基于数据分析师提供的数据,解读业务表现,识别机会和风险",
    backstory=(
        "你是一名资深业务分析师,能从数字背后看到业务逻辑。"
        "你的分析结论总是有数据支撑,而不是凭感觉。"
    ),
    tools=[],  # 纯分析节点,不需要外部工具
    llm=llm,
    verbose=True
)

writer_agent = Agent(
    role="内容撰写员",
    goal="将分析结论整理成结构清晰、易于阅读的周报,适合直接发给管理层",
    backstory=(
        "你擅长将复杂的数据分析结论转化为清晰易懂的商业报告。"
        "你的报告总是先讲结论,再讲支撑数据,符合金字塔原理。"
    ),
    tools=[],
    llm=llm,
    verbose=True
)

# ──────────────────────────────────────────
# Task 定义
# ──────────────────────────────────────────

data_collection_task = Task(
    description=(
        f"获取本周({datetime.now().strftime('%Y-%m-%d')} 前7天)的完整业务数据,"
        "包括:销售额、订单量、同比变化、TOP 商品、客户新增/留存/流失数据。"
        "将所有数据整理成结构化 JSON,供后续分析使用。"
    ),
    agent=data_agent,
    expected_output="包含本周所有关键指标的结构化 JSON 数据"
)

analysis_task = Task(
    description=(
        "基于数据分析师收集的数据,完成以下分析:\n"
        "1. 本周整体业务表现评估(好/中/差,给出理由)\n"
        "2. 与上周相比的核心变化及原因推测\n"
        "3. TOP3 商品表现分析\n"
        "4. 客户健康度分析(获客、留存、满意度)\n"
        "5. 下周需要关注的 2-3 个风险点\n"
        "6. 下周值得把握的 1-2 个机会点"
    ),
    agent=analyst_agent,
    expected_output="包含上述 6 个维度的详细分析报告(纯文本)",
    context=[data_collection_task]  # 依赖数据收集任务的输出
)

report_writing_task = Task(
    description=(
        "将业务分析师的分析结论整理成一份正式的管理层周报,格式要求:\n"
        "- 标题:XX年XX月第X周业务周报\n"
        "- 核心摘要(3-5 句话,最重要的结论)\n"
        "- 数据总览表(Markdown 表格)\n"
        "- 业务亮点(数据 + 简短分析,3 条)\n"
        "- 风险预警(需关注的问题,2-3 条)\n"
        "- 下周行动建议(可落地的行动,3 条)\n"
        "整个周报控制在 800 字以内,使用 Markdown 格式。"
    ),
    agent=writer_agent,
    expected_output="800 字以内、Markdown 格式的管理层周报",
    context=[analysis_task]
)

# ──────────────────────────────────────────
# 组建 Crew 并启动
# ──────────────────────────────────────────

crew = Crew(
    agents=[data_agent, analyst_agent, writer_agent],
    tasks=[data_collection_task, analysis_task, report_writing_task],
    process=Process.sequential,  # 顺序执行(Pipeline 模式)
    verbose=2
)

if __name__ == "__main__":
    result = crew.kickoff()
    print("\n" + "="*60)
    print("【最终周报】")
    print("="*60)
    print(result)

4.3 Pipeline 模式实战:代码审查流水线

场景描述:自动化 Code Review 流水线——代码经过「安全审查 → 性能分析 → 风格检查 → 综合报告」四个节点的流水线处理:

# code_review_pipeline.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, List

class ReviewState(TypedDict):
    code: str
    language: str
    security_issues: List[str]
    performance_issues: List[str]
    style_issues: List[str]
    final_report: str
    overall_score: int  # 0-100

llm = ChatOpenAI(model="gpt-4o", temperature=0)

def security_review(state: ReviewState) -> dict:
    """安全审查节点:检查 SQL 注入、XSS、敏感信息泄露等"""
    response = llm.invoke(
        f"对以下 {state['language']} 代码做安全审查,"
        f"只列出发现的安全问题(JSON 数组格式,每项为一个字符串),"
        f"没有问题则返回空数组 []:\n\n{state['code']}"
    )
    try:
        import json
        issues = json.loads(response.content)
    except:
        issues = [response.content] if response.content.strip() != "[]" else []
    return {"security_issues": issues}

def performance_review(state: ReviewState) -> dict:
    """性能分析节点:检查 N+1 查询、不必要的循环、内存泄漏等"""
    response = llm.invoke(
        f"对以下 {state['language']} 代码做性能分析,"
        f"只列出发现的性能问题(JSON 数组格式),没有问题则返回 []:\n\n{state['code']}"
    )
    try:
        import json
        issues = json.loads(response.content)
    except:
        issues = []
    return {"performance_issues": issues}

def style_review(state: ReviewState) -> dict:
    """代码风格检查节点"""
    response = llm.invoke(
        f"对以下 {state['language']} 代码做风格检查(命名规范、注释、复杂度等),"
        f"只列出发现的风格问题(JSON 数组格式),没有问题则返回 []:\n\n{state['code']}"
    )
    try:
        import json
        issues = json.loads(response.content)
    except:
        issues = []
    return {"style_issues": issues}

def generate_report(state: ReviewState) -> dict:
    """综合报告节点:汇总所有审查结果"""
    all_issues = (
        len(state['security_issues']) + 
        len(state['performance_issues']) + 
        len(state['style_issues'])
    )
    
    # 简单评分逻辑
    score = max(0, 100 - len(state['security_issues']) * 20 
                - len(state['performance_issues']) * 10 
                - len(state['style_issues']) * 5)
    
    report_lines = [
        f"## 代码审查报告\n",
        f"**综合评分:{score}/100**\n",
        f"**问题总计:{all_issues} 个**\n",
    ]
    
    if state['security_issues']:
        report_lines.append(f"\n### 🔴 安全问题({len(state['security_issues'])} 个)")
        for i, issue in enumerate(state['security_issues'], 1):
            report_lines.append(f"{i}. {issue}")
    
    if state['performance_issues']:
        report_lines.append(f"\n### 🟡 性能问题({len(state['performance_issues'])} 个)")
        for i, issue in enumerate(state['performance_issues'], 1):
            report_lines.append(f"{i}. {issue}")
    
    if state['style_issues']:
        report_lines.append(f"\n### 🔵 风格问题({len(state['style_issues'])} 个)")
        for i, issue in enumerate(state['style_issues'], 1):
            report_lines.append(f"{i}. {issue}")
    
    if all_issues == 0:
        report_lines.append("\n✅ 代码质量优秀,未发现明显问题。")
    
    return {"final_report": "\n".join(report_lines), "overall_score": score}

# 构建流水线图
pipeline = StateGraph(ReviewState)
pipeline.add_node("security", security_review)
pipeline.add_node("performance", performance_review)
pipeline.add_node("style", style_review)
pipeline.add_node("report", generate_report)

# 流水线:顺序执行
pipeline.set_entry_point("security")
pipeline.add_edge("security", "performance")
pipeline.add_edge("performance", "style")
pipeline.add_edge("style", "report")
pipeline.add_edge("report", END)

review_app = pipeline.compile()

五、Agent Memory 工程:让 Agent 真正「记住」事情

5.1 四层记忆模型

参考人类记忆模型,Agent 的记忆体系分为四层:

┌────────────────────────────────────────────────────────┐
│ 工作记忆(Working Memory)                               │
│  - LangGraph State,当前任务执行上下文                   │
│  - 生命周期:单次任务运行期间                             │
├────────────────────────────────────────────────────────┤
│ 短期记忆(Short-term Memory)                            │
│  - 对话历史,最近 N 轮 Message                           │
│  - 生命周期:当前会话(通常几小时到几天)                  │
├────────────────────────────────────────────────────────┤
│ 长期记忆(Long-term Memory)                             │
│  - 向量数据库存储的历史知识和用户偏好                      │
│  - 生命周期:持久化,跨会话可用                           │
├────────────────────────────────────────────────────────┤
│ 外部记忆(External Memory)                              │
│  - 通过 MCP 工具实时访问的外部数据源                      │
│  - 生命周期:实时查询,不缓存                             │
└────────────────────────────────────────────────────────┘

5.2 短期记忆:LangGraph State 机制

LangGraph 的 MemorySaver 可以将 State 持久化到本地,实现跨请求的上下文保持:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, END

# MemorySaver 将 Checkpoint 存储在内存中
# 生产环境替换为 SqliteSaver 或 PostgresSaver
memory = MemorySaver()

# 构建带 Checkpoint 的图
graph = StateGraph(MessagesState)
# ... 添加节点和边 ...
app_with_memory = graph.compile(checkpointer=memory)

# 同一个 thread_id 的多次调用会共享上下文
config = {"configurable": {"thread_id": "user_123_session_456"}}

response1 = app_with_memory.invoke(
    {"messages": [{"role": "user", "content": "我叫小明,帮我分析上周的销售数据"}]},
    config=config
)

# 第二次调用,Agent 能记住"我叫小明"
response2 = app_with_memory.invoke(
    {"messages": [{"role": "user", "content": "把结果发给我"}]},
    config=config
)

5.3 长期记忆:向量化 + 检索增强

对于需要跨会话保持记忆的场景,结合向量数据库实现 RAG 式长期记忆:

# long_term_memory.py
# 依赖:pip install langchain-openai chromadb

from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema import Document
from datetime import datetime

class AgentLongTermMemory:
    """Agent 长期记忆模块"""
    
    def __init__(self, collection_name: str = "agent_memory"):
        self.embeddings = OpenAIEmbeddings()
        self.store = Chroma(
            collection_name=collection_name,
            embedding_function=self.embeddings,
            persist_directory="./agent_memory_db"
        )
    
    def remember(self, content: str, metadata: dict = None) -> str:
        """将信息写入长期记忆"""
        doc = Document(
            page_content=content,
            metadata={
                "timestamp": datetime.now().isoformat(),
                "source": "agent",
                **(metadata or {})
            }
        )
        ids = self.store.add_documents([doc])
        return ids[0]
    
    def recall(self, query: str, k: int = 5) -> list[str]:
        """从长期记忆中检索相关信息"""
        docs = self.store.similarity_search(query, k=k)
        return [
            f"[{doc.metadata.get('timestamp', 'unknown')}] {doc.page_content}"
            for doc in docs
        ]
    
    def build_memory_context(self, current_query: str) -> str:
        """为当前 Query 构建记忆上下文,注入到 Prompt"""
        memories = self.recall(current_query, k=3)
        if not memories:
            return ""
        return (
            "## 相关历史记忆\n"
            + "\n".join(f"- {m}" for m in memories)
            + "\n\n"
        )

# 使用示例
memory = AgentLongTermMemory()
memory.remember("用户偏好:报告使用中文,数据保留两位小数")
memory.remember("本系统的数据库连接用 DB_URL 环境变量,不要硬编码")

context = memory.build_memory_context("帮我生成本月报告")
# 将 context 拼接到 Prompt 的 System 部分

六、生产级部署:从 Demo 到真实可用

6.1 成本控制:模型分层策略

生产环境中最容易忽视的问题是成本失控。一个不经过设计的 Multi-Agent 系统,Token 消耗可以轻松达到理论最低值的 10-50 倍。

分层策略

Agent 角色 推荐模型 单价(参考) 适用原因
路由/分类 Agent GPT-4o-mini / DeepSeek-V3 任务简单,不需要强推理
数据提取 Agent GPT-4o-mini / Qwen-Plus 结构化任务,小模型够用
分析/推理 Agent GPT-4o / Claude-Sonnet 需要较强的逻辑推理
复杂规划 Agent o3-mini / Claude-Opus 只用于高价值复杂决策

关键原则用最小够用的模型,而不是用最强的模型。路由任务用旗舰模型和用小模型效果差不多,但成本可能相差 50 倍。

6.2 可观测性:LangSmith 集成

生产级 Agent 系统必须有可观测性,否则出了问题无从排查:

# 只需要设置环境变量,LangSmith 会自动追踪所有 LangChain/LangGraph 调用
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-multi-agent"

# 之后正常运行你的 LangGraph/LangChain 代码
# 所有执行链路、Token 消耗、延迟、错误都会自动记录到 LangSmith Dashboard

LangSmith 提供的核心指标:

  • 每次 Agent 运行的完整执行树:看到每个节点的输入输出
  • Token 消耗热力图:快速定位消耗大户
  • 错误率和 P99 延迟:监控 SLA
  • Prompt 版本管理:A/B 测试不同 Prompt 的效果

6.3 安全防护:Prompt 注入与权限隔离

Multi-Agent 系统的攻击面远比单 Agent 大,因为:

  1. Agent 处理外部数据(爬虫结果、用户输入、文件内容)时,这些数据可能包含恶意指令
  2. 多个 Agent 之间传递信息,一个 Agent 被注入后可能影响整个链路

实战防御方案

import re

class PromptInjectionGuard:
    """简单的 Prompt 注入检测中间件"""
    
    # 常见注入模式(持续更新)
    INJECTION_PATTERNS = [
        r"ignore (all )?previous instructions",
        r"forget (everything|all) (above|before)",
        r"you are now",
        r"act as (a |an )?(different|new|another)",
        r"system prompt",
        r"<\|im_start\|>",
        r"###\s*(instruction|system|human)",
    ]
    
    def __init__(self):
        self.patterns = [
            re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS
        ]
    
    def scan(self, text: str) -> tuple[bool, str]:
        """
        扫描文本是否包含注入模式
        返回 (is_safe, reason)
        """
        for pattern in self.patterns:
            if pattern.search(text):
                return False, f"检测到可疑模式: {pattern.pattern}"
        return True, "通过"
    
    def sanitize(self, user_input: str) -> str:
        """清理用户输入,移除危险内容"""
        is_safe, reason = self.scan(user_input)
        if not is_safe:
            raise ValueError(f"输入被拒绝:{reason}")
        return user_input

# MCP Server 层面的权限隔离
TOOL_PERMISSIONS = {
    "query_database": ["analyst_agent", "data_agent"],      # 只有这两个 Agent 可以查库
    "send_email": ["notifier_agent"],                        # 只有通知 Agent 可以发邮件
    "delete_record": [],                                     # 任何 Agent 都不能删记录
}

def check_tool_permission(agent_name: str, tool_name: str) -> bool:
    allowed_agents = TOOL_PERMISSIONS.get(tool_name, [])
    return agent_name in allowed_agents

七、避坑指南:我们踩过的 8 个生产级坑

在将 Multi-Agent 系统推向生产的过程中,我们总结了以下高频踩坑点:

# 坑的名字 现象 解决方案
1 Agent 无限循环 Agent 在循环修复中一直失败,费用爆炸 强制设置 max_retries,每个循环节点必须有终止条件
2 Context 爆炸 长任务后期 Token 超出限制,直接报错 实现滚动窗口截断;重要信息摘要后存入长期记忆
3 Agent 打架 两个 Agent 修改同一份文件,互相覆盖 用 LangGraph 有向图约束执行顺序;关键资源加锁
4 旗舰模型滥用 Token 成本是预期的 20 倍 分层模型策略(见第六节),路由任务用小模型
5 无重试机制 API 超时后整个任务失败 所有外部调用包 tenacity 重试装饰器,至少 3 次指数退避
6 无日志黑盒 出了问题不知道哪个 Agent 出错 LangSmith 全链路追踪 + 每个 Node 入口/出口打结构化日志
7 权限过于宽松 Agent 误删了生产数据 MCP 层权限隔离;写操作必须经过人工审核节点
8 Prompt 不稳定 升级模型后输出格式变了,JSON 解析报错 Pydantic 定义输出 Schema,让模型强制输出结构化 JSON

关于第 8 条的代码示例(使用 Pydantic 强制结构化输出):

from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List

class WeeklyReportSection(BaseModel):
    title: str = Field(description="章节标题")
    content: str = Field(description="章节内容,200字以内")
    data_points: List[str] = Field(description="支撑数据列表,每项为一个数字结论")

class WeeklyReport(BaseModel):
    summary: str = Field(description="执行摘要,3句话以内")
    sections: List[WeeklyReportSection] = Field(description="报告各章节,3-5个")
    action_items: List[str] = Field(description="下周行动建议,3条")
    overall_rating: int = Field(description="本周综合评分 1-10", ge=1, le=10)

parser = PydanticOutputParser(pydantic_object=WeeklyReport)
llm = ChatOpenAI(model="gpt-4o")

# format_instructions 告诉 LLM 按 Pydantic 模型输出 JSON
prompt = f"""基于以下数据生成周报:
{data_summary}

{parser.get_format_instructions()}
"""

result = llm.invoke(prompt)
report: WeeklyReport = parser.parse(result.content)
# 此后 report 就是类型安全的 Python 对象,不再有 JSON 解析失败的风险

八、总结与展望

2026 年 Multi-Agent 技术栈全景

┌──────────────────────────────────────────────────────────┐
│                    应用层 (Application)                    │
│         周报生成 │ 代码审查 │ 数据分析 │ 客服系统           │
├──────────────────────────────────────────────────────────┤
│                    编排层 (Orchestration)                  │
│         LangGraph (复杂工作流) │ CrewAI (快速多 Agent)      │
├──────────────────────────────────────────────────────────┤
│                    协议层 (Protocol)                       │
│              MCP 协议(工具调用标准)                        │
├──────────────────────────────────────────────────────────┤
│                    能力层 (Capability)                     │
│  LLM (GPT/Claude/DeepSeek) │ 向量检索 │ 代码执行 │ 浏览器  │
├──────────────────────────────────────────────────────────┤
│                    基础设施层 (Infrastructure)              │
│   PostgreSQL │ Redis │ ChromaDB │ S3 │ 消息队列            │
└──────────────────────────────────────────────────────────┘

核心结论

  1. MCP 是不可绕过的基础设施:不管你用哪个 Agent 框架,MCP 协议都将是工具调用的统一标准。现在开始按 MCP 规范封装你的工具,是对未来最好的投资。

  2. LangGraph 适合复杂工作流,CrewAI 适合快速原型:两者不是竞争关系,而是互补。用 CrewAI 快速验证 Multi-Agent 的业务价值,用 LangGraph 将验证成功的场景工程化。

  3. 记忆工程是差异化能力的关键:好的 Agent 和普通 Agent 的差距,往往不在于模型有多强,而在于有多少相关的上下文信息。四层记忆模型(工作/短期/长期/外部)是系统化解决这个问题的框架。

  4. 成本控制决定系统能否上生产:模型分层是非协商性的工程要求,不是可有可无的优化项。

  5. 可观测性是 Multi-Agent 的生命线:你的 Agent 做了什么、花了多少 Token、哪里出了问题——这些如果是黑盒,系统迟早翻车。


作者后记:Multi-Agent 系统的核心不是技术本身,而是任务分解的艺术——把一个复杂问题拆成多个独立的、专注的子问题,然后用技术手段把这些子问题的答案组合起来。这和软件工程的「单一职责原则」、「关注点分离」本质上是同一个思想,只是在 AI 时代换了一种实现形式。

如果你正在入局 Agent 开发,不要被框架和协议的复杂性吓到。找一个真实的业务场景,从最简单的两个 Agent 协作开始,先让它工作起来,再逐步完善。行动比完美的方案更有价值。


本文参考资料:稀土掘金 2026-06-22 热榜、CSDN AI Agent 专题、GitHub Trending(2026年6月)、MCP 协议官方文档(docs.anthropic.com/mcp)、LangGraph 官方文档(langchain-ai.github.io/langgraph)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐