从零搭建生产级 Multi-Agent 系统:MCP 协议 + LangGraph 实战全解析
从零搭建生产级 Multi-Agent 系统:MCP 协议 + LangGraph 实战全解析
文章背景:2026 年上半年,GitHub Trending 中 AI Agent 相关项目单周新增 Star 动辄破万;掘金热榜连续数周由多 Agent 协作、MCP 协议、Spec-First 编程工作流占据前列。本文结合当前最热技术实践,系统梳理 Multi-Agent 系统的工程落地路径,附完整可运行代码。
目录
- 一、为什么 2026 年是 Multi-Agent 爆发的元年
- 二、MCP 协议:Agent 世界的「USB 标准」
- 三、LangGraph:有向图驱动的工作流引擎
- 四、Multi-Agent 协作模式拆解
- 五、Agent Memory 工程:让 Agent 真正「记住」事情
- 六、生产级部署:从 Demo 到真实可用
- 七、避坑指南:我们踩过的 8 个生产级坑
- 八、总结与展望
一、为什么 2026 年是 Multi-Agent 爆发的元年
1.1 从单 Agent 到多 Agent 的范式迁移
2024 年以前,"AI Agent"这个词听起来很酷,但实际落地形态通常是:
用户输入 → Prompt Engineering → LLM → 输出
有时候绑一两个工具(搜索、代码执行),但整体逻辑还是单点调用。这类 Agent 遇到复杂任务就会出现几个典型问题:
- 上下文爆炸:复杂任务需要积累大量中间状态,单个 Agent 的上下文窗口撑不住
- 能力单一:一个 Agent 很难同时是「好的代码生成器」和「好的业务分析师」
- 缺乏校验:没有独立的审核节点,输出错误时无从发现
Multi-Agent 的核心思路是分治——把一个复杂任务拆给多个专注的 Agent 协作完成,每个 Agent 专注自己的领域,通过明确的接口相互通信。
1.2 三大驱动力
| 驱动力 | 2024 年状态 | 2026 年状态 |
|---|---|---|
| 模型能力 | Tool Calling 不稳定,幻觉率高 | 旗舰模型原生支持电脑控制,工具调用准确率大幅提升 |
| 协议标准化 | 各框架自造轮子,互不兼容 | MCP 协议被微软、Anthropic、OpenAI 共同采纳成为事实标准 |
| 框架成熟度 | LangChain 早期 API 频繁变动 | LangGraph 1.x 稳定,CrewAI v0.8 原生支持 MCP |
这三个条件同时成熟,使得 Multi-Agent 从"实验室玩具"真正走向"生产可用"。
二、MCP 协议:Agent 世界的「USB 标准」
2.1 MCP 解决了什么问题
问题背景:在 MCP 之前,如果你要让一个 Agent 能调用数据库、搜索引擎、GitHub API,需要为每个框架(LangChain、AutoGPT、CrewAI)单独写适配器。换一个框架,所有工具适配代码全部重写。
MCP 的解法:定义一个统一的协议层,工具只需实现一次,任何支持 MCP 的 Agent 都能直接调用。类比 USB 接口——设备(工具)和主机(Agent)各自实现标准接口,彼此无需了解对方内部实现。
2.2 MCP 核心架构拆解
┌─────────────────────────────────────────────────┐
│ Agent / LLM │
│ (MCP Client 角色) │
└────────────────────┬────────────────────────────┘
│ MCP 协议(JSON-RPC over stdio/HTTP)
▼
┌─────────────────────────────────────────────────┐
│ MCP Server │
│ ┌────────────┐ ┌──────────┐ ┌─────────────┐ │
│ │ 工具注册表 │ │ 调用路由 │ │ 权限/审计层 │ │
│ └────────────┘ └──────────┘ └─────────────┘ │
└──────┬──────────────┬──────────────┬────────────┘
│ │ │
▼ ▼ ▼
┌─────────┐ ┌─────────┐ ┌──────────┐
│ 搜索 API │ │ 数据库 │ │ 文件系统 │
└─────────┘ └─────────┘ └──────────┘
三个核心概念:
- MCP Client:集成在 Agent 中,负责发现和调用 MCP Server 暴露的工具
- MCP Server:独立进程,暴露工具接口,处理鉴权、限流、审计等横切关注点
- Transport:Client 和 Server 之间的通信方式,支持
stdio(本地进程)和HTTP+SSE(远程服务)两种模式
2.3 MCP vs 传统 Tool Calling 对比
| 特性 | 传统 Function Calling | MCP 协议 |
|---|---|---|
| 工具定义方式 | 每个框架单独定义 JSON Schema | 统一的 MCP 协议规范 |
| 跨框架复用 | ❌ 每个框架写适配器 | ✅ 一次实现,到处可用 |
| 工具自动发现 | ❌ 手动注册 | ✅ Agent 启动时自动枚举 |
| 安全控制 | 几乎没有 | 内置鉴权 + 调用审计日志 |
| 调试体验 | print 大法 |
MCP Inspector 可视化追踪 |
| 工具生态 | 框架私有 | 公共 MCP Store,开箱即用 200+ 工具 |
2.4 动手实现一个 MCP Server(Python)
以下是一个完整可运行的 MCP Server 示例,暴露三个工具:数据库查询、图表生成、邮件发送。
# mcp_server.py
# 依赖:pip install mcp aiofiles
from mcp.server import Server
from mcp.server.models import InitializationOptions
import mcp.server.stdio
import mcp.types as types
import json
import asyncio
from typing import Any
app = Server("data-analysis-agent")
# ──────────────────────────────────────────
# 工具定义
# ──────────────────────────────────────────
@app.list_tools()
async def list_tools() -> list[types.Tool]:
"""MCP Client 启动时会调用此接口枚举所有可用工具"""
return [
types.Tool(
name="query_database",
description="查询业务数据库,返回 JSON 格式的结果集",
inputSchema={
"type": "object",
"properties": {
"sql": {
"type": "string",
"description": "只读 SELECT 语句,禁止 DDL/DML"
},
"limit": {
"type": "integer",
"description": "最大返回行数,默认 100",
"default": 100
}
},
"required": ["sql"]
}
),
types.Tool(
name="generate_chart",
description="根据数据生成图表,返回图表访问 URL",
inputSchema={
"type": "object",
"properties": {
"data": {
"type": "string",
"description": "JSON 格式的数据数组"
},
"chart_type": {
"type": "string",
"enum": ["bar", "line", "pie", "scatter"],
"description": "图表类型"
},
"title": {
"type": "string",
"description": "图表标题"
}
},
"required": ["data", "chart_type"]
}
),
types.Tool(
name="send_email",
description="发送邮件给指定收件人",
inputSchema={
"type": "object",
"properties": {
"to": {"type": "string", "description": "收件人邮箱"},
"subject": {"type": "string", "description": "邮件主题"},
"body": {"type": "string", "description": "邮件正文(支持 HTML)"},
"attachments": {
"type": "array",
"items": {"type": "string"},
"description": "附件 URL 列表(可选)"
}
},
"required": ["to", "subject", "body"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> list[types.TextContent]:
"""工具调用的统一入口,MCP Server 在这里做路由和权限校验"""
# 安全校验:只允许只读 SQL
if name == "query_database":
sql = arguments.get("sql", "").strip().upper()
if not sql.startswith("SELECT"):
return [types.TextContent(
type="text",
text=json.dumps({"error": "只允许 SELECT 语句,拒绝执行"})
)]
# 实际生产中替换为真实 DB 连接
mock_result = [
{"date": "2026-06-15", "sales": 12580, "orders": 234},
{"date": "2026-06-16", "sales": 13920, "orders": 267},
{"date": "2026-06-17", "sales": 11340, "orders": 198},
]
return [types.TextContent(type="text", text=json.dumps(mock_result))]
elif name == "generate_chart":
# 实际生产中调用图表服务
chart_url = f"https://charts.example.com/{hash(arguments['data'])}.png"
return [types.TextContent(type="text", text=chart_url)]
elif name == "send_email":
# 实际生产中调用邮件服务
print(f"[MCP] 发送邮件至 {arguments['to']},主题:{arguments['subject']}")
return [types.TextContent(
type="text",
text=json.dumps({"status": "ok", "message_id": "msg_20260622_001"})
)]
return [types.TextContent(type="text", text=json.dumps({"error": f"未知工具: {name}"}))]
# ──────────────────────────────────────────
# 启动 Server(stdio 模式,适合本地 Agent 调用)
# ──────────────────────────────────────────
async def main():
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
InitializationOptions(
server_name="data-analysis-agent",
server_version="1.0.0"
)
)
if __name__ == "__main__":
asyncio.run(main())
关键设计点:
list_tools()返回的inputSchema是 JSON Schema 标准,MCP Client 根据这份 Schema 自动生成 LLM 的 Function Calling 定义,工具描述即文档即接口定义,三位一体。
三、LangGraph:有向图驱动的工作流引擎
3.1 为什么选 LangGraph 而不是 LangChain Expression Language
LangChain Expression Language(LCEL)适合线性链式调用,但生产级 Agent 往往需要:
- 条件分支:Agent 的输出决定下一步走哪条路
- 循环执行:迭代优化直到满足质量标准
- 并行执行:多个 Agent 同时处理独立子任务,最后汇聚结果
- 状态持久化:长时间运行的 Agent 任务需要 Checkpoint,防止意外中断导致全部重做
LangGraph 用有向图(Directed Graph) 对上述需求建模:节点(Node)是处理单元,边(Edge)是数据流向,条件边(Conditional Edge)实现分支逻辑。
3.2 LangGraph 核心概念
State(图的全局状态,所有节点共享并更新)
│
▼
Node(处理节点,接收 State,返回 State 的增量更新)
│
▼
Edge(普通边 = 无条件跳转,条件边 = 根据 State 决定下一节点)
│
▼
Checkpoint(状态快照,支持任务中断恢复)
State 的设计哲学:LangGraph 采用「不可变 + 增量更新」的 Reducer 模式。每个 Node 不直接修改 State,而是返回一个 dict,LangGraph 负责把这个 dict 合并到全局 State 中。这使得 Agent 的执行过程天然可追踪、可回放。
3.3 一个最小化 LangGraph Demo
以下示例构建了一个「自动修复代码」的 Agent,它会循环尝试,直到代码通过测试或超过最大重试次数:
# auto_fix_agent.py
# 依赖:pip install langgraph langchain-openai
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
import subprocess
import operator
# ──────────────────────────────────────────
# 1. 定义图的全局 State
# ──────────────────────────────────────────
class CodeFixState(TypedDict):
code: str # 当前版本的代码
error_message: str # 最近一次测试的错误信息
fix_history: Annotated[list, operator.add] # 每次修复记录(append-only)
retry_count: int # 当前重试次数
max_retries: int # 最大重试次数
is_fixed: bool # 是否修复成功
# ──────────────────────────────────────────
# 2. 定义节点(Node)
# ──────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def run_tests(state: CodeFixState) -> dict:
"""节点:执行测试,捕获错误信息"""
try:
result = subprocess.run(
["python", "-c", state["code"]],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
return {"is_fixed": True, "error_message": ""}
else:
return {"is_fixed": False, "error_message": result.stderr[:2000]}
except subprocess.TimeoutExpired:
return {"is_fixed": False, "error_message": "执行超时(>10s)"}
def fix_code(state: CodeFixState) -> dict:
"""节点:调用 LLM 修复代码"""
prompt = f"""以下 Python 代码存在错误,请修复:
代码:
```python
{state['code']}
错误信息:
{state[‘error_message’]}
要求:
-
只返回修复后的完整 Python 代码,不加任何解释
-
不要使用 Markdown 代码块包裹
“”"
response = llm.invoke(prompt)
fixed_code = response.content.strip()return {
“code”: fixed_code,
“retry_count”: state[“retry_count”] + 1,
“fix_history”: [{“attempt”: state[“retry_count”] + 1, “error”: state[“error_message”]}]
}
──────────────────────────────────────────
3. 定义路由函数(条件边)
──────────────────────────────────────────
def should_continue(state: CodeFixState) -> str:
“”“根据 State 决定下一步走哪条边”“”
if state[“is_fixed”]:
return “success”
if state[“retry_count”] >= state[“max_retries”]:
return “give_up”
return “retry”
──────────────────────────────────────────
4. 构建图
──────────────────────────────────────────
workflow = StateGraph(CodeFixState)
添加节点
workflow.add_node(“run_tests”, run_tests)
workflow.add_node(“fix_code”, fix_code)
设置入口
workflow.set_entry_point(“run_tests”)
添加条件边
workflow.add_conditional_edges(
“run_tests”,
should_continue,
{
“success”: END, # 修复成功,结束
“give_up”: END, # 超过重试次数,放弃
“retry”: “fix_code” # 继续修复
}
)
修复后重新跑测试
workflow.add_edge(“fix_code”, “run_tests”)
编译成可执行图
app = workflow.compile()
──────────────────────────────────────────
5. 运行
──────────────────────────────────────────
if name == “main”:
broken_code = “”"
def calculate_average(numbers):
return sum(numbers) / len(numbers) # 没有处理空列表
result = calculate_average([])
print(result)
“”"
final_state = app.invoke({
"code": broken_code,
"error_message": "",
"fix_history": [],
"retry_count": 0,
"max_retries": 3,
"is_fixed": False
})
print("=== 最终代码 ===")
print(final_state["code"])
print(f"\n=== 修复记录:共尝试 {final_state['retry_count']} 次 ===")
for record in final_state["fix_history"]:
print(f" 第 {record['attempt']} 次:{record['error'][:100]}...")
---
## 四、Multi-Agent 协作模式拆解
### 4.1 四种主流协作模式
┌──────────────────────────────────────────────────────────────┐
│ Multi-Agent 协作模式 │
├──────────────┬───────────────┬──────────────┬────────────────┤
│ Supervisor │ Pipeline │ Peer-to-Peer │ Hierarchical │
│ (主从式) │ (流水线) │ (去中心化) │ (层级式) │
├──────────────┼───────────────┼──────────────┼────────────────┤
│ 一个 Supervisor│ Agent A→B→C │ Agent 之间直 │ 多层 Supervisor │
│ 调度多个 Worker│ 顺序传递结果 │ 接协商和通信 │ 递归管理子团队 │
├──────────────┼───────────────┼──────────────┼────────────────┤
│ 适合:任务分 │ 适合:步骤明确 │ 适合:需要 │ 适合:大规模 │
│ 配和结果汇总 │ 的处理流程 │ 动态协作的场景│ 复杂任务编排 │
└──────────────┴───────────────┴──────────────┴────────────────┘
生产环境中最常用的是 **Supervisor 模式** 和 **Pipeline 模式**,下面各给一个完整实战案例。
### 4.2 Supervisor 模式实战:自动化周报生成系统
**场景描述**:每周五自动生成业务周报——Supervisor 负责任务分配,DataAgent 负责数据查询,AnalystAgent 负责数据解读,WriterAgent 负责将分析结果写成可读性强的报告。
```python
# weekly_report_system.py
# 依赖:pip install crewai langchain-openai
from crewai import Agent, Task, Crew, Process
from langchain_openai import ChatOpenAI
from langchain.tools import tool
import json
from datetime import datetime, timedelta
# ──────────────────────────────────────────
# 工具定义(生产环境中这些应该来自 MCP Server)
# ──────────────────────────────────────────
@tool("query_weekly_sales")
def query_weekly_sales(start_date: str) -> str:
"""查询指定日期开始的一周销售数据,输入格式 YYYY-MM-DD"""
# 模拟数据(实际应查询数据库)
mock_data = {
"total_revenue": 892340,
"total_orders": 1523,
"avg_order_value": 586.2,
"top_products": [
{"name": "蓝牙耳机 Pro", "revenue": 234500, "units": 312},
{"name": "智能手环 X3", "revenue": 189200, "units": 498},
{"name": "便携充电宝", "revenue": 156800, "units": 892}
],
"daily_breakdown": [
{"date": "2026-06-15", "revenue": 124500, "orders": 213},
{"date": "2026-06-16", "revenue": 135200, "orders": 231},
{"date": "2026-06-17", "revenue": 98700, "orders": 168},
{"date": "2026-06-18", "revenue": 142300, "orders": 243},
{"date": "2026-06-19", "revenue": 167800, "orders": 287},
{"date": "2026-06-20", "revenue": 110040, "orders": 188},
{"date": "2026-06-21", "revenue": 113800, "orders": 193},
],
"compared_to_last_week": {
"revenue_change": "+12.3%",
"orders_change": "+8.7%"
}
}
return json.dumps(mock_data, ensure_ascii=False)
@tool("get_customer_metrics")
def get_customer_metrics(period: str) -> str:
"""获取客户相关指标,period 可选 'week' 或 'month'"""
mock_metrics = {
"new_customers": 234,
"returning_customers": 1289,
"churn_rate": "3.2%",
"customer_satisfaction_score": 4.3,
"top_complaint": "物流速度偏慢(占投诉的 34%)"
}
return json.dumps(mock_metrics, ensure_ascii=False)
# ──────────────────────────────────────────
# Agent 定义
# ──────────────────────────────────────────
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.3) # 用小模型降低成本
data_agent = Agent(
role="数据分析师",
goal="从数据库中获取本周完整的销售数据和客户数据,确保数据准确无误",
backstory=(
"你是一名专业的数据分析师,擅长从原始数据中提取关键指标。"
"你非常注重数据准确性,遇到异常数据会主动标注。"
),
tools=[query_weekly_sales, get_customer_metrics],
llm=llm,
verbose=True
)
analyst_agent = Agent(
role="业务分析师",
goal="基于数据分析师提供的数据,解读业务表现,识别机会和风险",
backstory=(
"你是一名资深业务分析师,能从数字背后看到业务逻辑。"
"你的分析结论总是有数据支撑,而不是凭感觉。"
),
tools=[], # 纯分析节点,不需要外部工具
llm=llm,
verbose=True
)
writer_agent = Agent(
role="内容撰写员",
goal="将分析结论整理成结构清晰、易于阅读的周报,适合直接发给管理层",
backstory=(
"你擅长将复杂的数据分析结论转化为清晰易懂的商业报告。"
"你的报告总是先讲结论,再讲支撑数据,符合金字塔原理。"
),
tools=[],
llm=llm,
verbose=True
)
# ──────────────────────────────────────────
# Task 定义
# ──────────────────────────────────────────
data_collection_task = Task(
description=(
f"获取本周({datetime.now().strftime('%Y-%m-%d')} 前7天)的完整业务数据,"
"包括:销售额、订单量、同比变化、TOP 商品、客户新增/留存/流失数据。"
"将所有数据整理成结构化 JSON,供后续分析使用。"
),
agent=data_agent,
expected_output="包含本周所有关键指标的结构化 JSON 数据"
)
analysis_task = Task(
description=(
"基于数据分析师收集的数据,完成以下分析:\n"
"1. 本周整体业务表现评估(好/中/差,给出理由)\n"
"2. 与上周相比的核心变化及原因推测\n"
"3. TOP3 商品表现分析\n"
"4. 客户健康度分析(获客、留存、满意度)\n"
"5. 下周需要关注的 2-3 个风险点\n"
"6. 下周值得把握的 1-2 个机会点"
),
agent=analyst_agent,
expected_output="包含上述 6 个维度的详细分析报告(纯文本)",
context=[data_collection_task] # 依赖数据收集任务的输出
)
report_writing_task = Task(
description=(
"将业务分析师的分析结论整理成一份正式的管理层周报,格式要求:\n"
"- 标题:XX年XX月第X周业务周报\n"
"- 核心摘要(3-5 句话,最重要的结论)\n"
"- 数据总览表(Markdown 表格)\n"
"- 业务亮点(数据 + 简短分析,3 条)\n"
"- 风险预警(需关注的问题,2-3 条)\n"
"- 下周行动建议(可落地的行动,3 条)\n"
"整个周报控制在 800 字以内,使用 Markdown 格式。"
),
agent=writer_agent,
expected_output="800 字以内、Markdown 格式的管理层周报",
context=[analysis_task]
)
# ──────────────────────────────────────────
# 组建 Crew 并启动
# ──────────────────────────────────────────
crew = Crew(
agents=[data_agent, analyst_agent, writer_agent],
tasks=[data_collection_task, analysis_task, report_writing_task],
process=Process.sequential, # 顺序执行(Pipeline 模式)
verbose=2
)
if __name__ == "__main__":
result = crew.kickoff()
print("\n" + "="*60)
print("【最终周报】")
print("="*60)
print(result)
4.3 Pipeline 模式实战:代码审查流水线
场景描述:自动化 Code Review 流水线——代码经过「安全审查 → 性能分析 → 风格检查 → 综合报告」四个节点的流水线处理:
# code_review_pipeline.py
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from typing import TypedDict, List
class ReviewState(TypedDict):
code: str
language: str
security_issues: List[str]
performance_issues: List[str]
style_issues: List[str]
final_report: str
overall_score: int # 0-100
llm = ChatOpenAI(model="gpt-4o", temperature=0)
def security_review(state: ReviewState) -> dict:
"""安全审查节点:检查 SQL 注入、XSS、敏感信息泄露等"""
response = llm.invoke(
f"对以下 {state['language']} 代码做安全审查,"
f"只列出发现的安全问题(JSON 数组格式,每项为一个字符串),"
f"没有问题则返回空数组 []:\n\n{state['code']}"
)
try:
import json
issues = json.loads(response.content)
except:
issues = [response.content] if response.content.strip() != "[]" else []
return {"security_issues": issues}
def performance_review(state: ReviewState) -> dict:
"""性能分析节点:检查 N+1 查询、不必要的循环、内存泄漏等"""
response = llm.invoke(
f"对以下 {state['language']} 代码做性能分析,"
f"只列出发现的性能问题(JSON 数组格式),没有问题则返回 []:\n\n{state['code']}"
)
try:
import json
issues = json.loads(response.content)
except:
issues = []
return {"performance_issues": issues}
def style_review(state: ReviewState) -> dict:
"""代码风格检查节点"""
response = llm.invoke(
f"对以下 {state['language']} 代码做风格检查(命名规范、注释、复杂度等),"
f"只列出发现的风格问题(JSON 数组格式),没有问题则返回 []:\n\n{state['code']}"
)
try:
import json
issues = json.loads(response.content)
except:
issues = []
return {"style_issues": issues}
def generate_report(state: ReviewState) -> dict:
"""综合报告节点:汇总所有审查结果"""
all_issues = (
len(state['security_issues']) +
len(state['performance_issues']) +
len(state['style_issues'])
)
# 简单评分逻辑
score = max(0, 100 - len(state['security_issues']) * 20
- len(state['performance_issues']) * 10
- len(state['style_issues']) * 5)
report_lines = [
f"## 代码审查报告\n",
f"**综合评分:{score}/100**\n",
f"**问题总计:{all_issues} 个**\n",
]
if state['security_issues']:
report_lines.append(f"\n### 🔴 安全问题({len(state['security_issues'])} 个)")
for i, issue in enumerate(state['security_issues'], 1):
report_lines.append(f"{i}. {issue}")
if state['performance_issues']:
report_lines.append(f"\n### 🟡 性能问题({len(state['performance_issues'])} 个)")
for i, issue in enumerate(state['performance_issues'], 1):
report_lines.append(f"{i}. {issue}")
if state['style_issues']:
report_lines.append(f"\n### 🔵 风格问题({len(state['style_issues'])} 个)")
for i, issue in enumerate(state['style_issues'], 1):
report_lines.append(f"{i}. {issue}")
if all_issues == 0:
report_lines.append("\n✅ 代码质量优秀,未发现明显问题。")
return {"final_report": "\n".join(report_lines), "overall_score": score}
# 构建流水线图
pipeline = StateGraph(ReviewState)
pipeline.add_node("security", security_review)
pipeline.add_node("performance", performance_review)
pipeline.add_node("style", style_review)
pipeline.add_node("report", generate_report)
# 流水线:顺序执行
pipeline.set_entry_point("security")
pipeline.add_edge("security", "performance")
pipeline.add_edge("performance", "style")
pipeline.add_edge("style", "report")
pipeline.add_edge("report", END)
review_app = pipeline.compile()
五、Agent Memory 工程:让 Agent 真正「记住」事情
5.1 四层记忆模型
参考人类记忆模型,Agent 的记忆体系分为四层:
┌────────────────────────────────────────────────────────┐
│ 工作记忆(Working Memory) │
│ - LangGraph State,当前任务执行上下文 │
│ - 生命周期:单次任务运行期间 │
├────────────────────────────────────────────────────────┤
│ 短期记忆(Short-term Memory) │
│ - 对话历史,最近 N 轮 Message │
│ - 生命周期:当前会话(通常几小时到几天) │
├────────────────────────────────────────────────────────┤
│ 长期记忆(Long-term Memory) │
│ - 向量数据库存储的历史知识和用户偏好 │
│ - 生命周期:持久化,跨会话可用 │
├────────────────────────────────────────────────────────┤
│ 外部记忆(External Memory) │
│ - 通过 MCP 工具实时访问的外部数据源 │
│ - 生命周期:实时查询,不缓存 │
└────────────────────────────────────────────────────────┘
5.2 短期记忆:LangGraph State 机制
LangGraph 的 MemorySaver 可以将 State 持久化到本地,实现跨请求的上下文保持:
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph, MessagesState, END
# MemorySaver 将 Checkpoint 存储在内存中
# 生产环境替换为 SqliteSaver 或 PostgresSaver
memory = MemorySaver()
# 构建带 Checkpoint 的图
graph = StateGraph(MessagesState)
# ... 添加节点和边 ...
app_with_memory = graph.compile(checkpointer=memory)
# 同一个 thread_id 的多次调用会共享上下文
config = {"configurable": {"thread_id": "user_123_session_456"}}
response1 = app_with_memory.invoke(
{"messages": [{"role": "user", "content": "我叫小明,帮我分析上周的销售数据"}]},
config=config
)
# 第二次调用,Agent 能记住"我叫小明"
response2 = app_with_memory.invoke(
{"messages": [{"role": "user", "content": "把结果发给我"}]},
config=config
)
5.3 长期记忆:向量化 + 检索增强
对于需要跨会话保持记忆的场景,结合向量数据库实现 RAG 式长期记忆:
# long_term_memory.py
# 依赖:pip install langchain-openai chromadb
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.schema import Document
from datetime import datetime
class AgentLongTermMemory:
"""Agent 长期记忆模块"""
def __init__(self, collection_name: str = "agent_memory"):
self.embeddings = OpenAIEmbeddings()
self.store = Chroma(
collection_name=collection_name,
embedding_function=self.embeddings,
persist_directory="./agent_memory_db"
)
def remember(self, content: str, metadata: dict = None) -> str:
"""将信息写入长期记忆"""
doc = Document(
page_content=content,
metadata={
"timestamp": datetime.now().isoformat(),
"source": "agent",
**(metadata or {})
}
)
ids = self.store.add_documents([doc])
return ids[0]
def recall(self, query: str, k: int = 5) -> list[str]:
"""从长期记忆中检索相关信息"""
docs = self.store.similarity_search(query, k=k)
return [
f"[{doc.metadata.get('timestamp', 'unknown')}] {doc.page_content}"
for doc in docs
]
def build_memory_context(self, current_query: str) -> str:
"""为当前 Query 构建记忆上下文,注入到 Prompt"""
memories = self.recall(current_query, k=3)
if not memories:
return ""
return (
"## 相关历史记忆\n"
+ "\n".join(f"- {m}" for m in memories)
+ "\n\n"
)
# 使用示例
memory = AgentLongTermMemory()
memory.remember("用户偏好:报告使用中文,数据保留两位小数")
memory.remember("本系统的数据库连接用 DB_URL 环境变量,不要硬编码")
context = memory.build_memory_context("帮我生成本月报告")
# 将 context 拼接到 Prompt 的 System 部分
六、生产级部署:从 Demo 到真实可用
6.1 成本控制:模型分层策略
生产环境中最容易忽视的问题是成本失控。一个不经过设计的 Multi-Agent 系统,Token 消耗可以轻松达到理论最低值的 10-50 倍。
分层策略:
| Agent 角色 | 推荐模型 | 单价(参考) | 适用原因 |
|---|---|---|---|
| 路由/分类 Agent | GPT-4o-mini / DeepSeek-V3 | 低 | 任务简单,不需要强推理 |
| 数据提取 Agent | GPT-4o-mini / Qwen-Plus | 低 | 结构化任务,小模型够用 |
| 分析/推理 Agent | GPT-4o / Claude-Sonnet | 中 | 需要较强的逻辑推理 |
| 复杂规划 Agent | o3-mini / Claude-Opus | 高 | 只用于高价值复杂决策 |
关键原则:用最小够用的模型,而不是用最强的模型。路由任务用旗舰模型和用小模型效果差不多,但成本可能相差 50 倍。
6.2 可观测性:LangSmith 集成
生产级 Agent 系统必须有可观测性,否则出了问题无从排查:
# 只需要设置环境变量,LangSmith 会自动追踪所有 LangChain/LangGraph 调用
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your-langsmith-api-key"
os.environ["LANGCHAIN_PROJECT"] = "production-multi-agent"
# 之后正常运行你的 LangGraph/LangChain 代码
# 所有执行链路、Token 消耗、延迟、错误都会自动记录到 LangSmith Dashboard
LangSmith 提供的核心指标:
- 每次 Agent 运行的完整执行树:看到每个节点的输入输出
- Token 消耗热力图:快速定位消耗大户
- 错误率和 P99 延迟:监控 SLA
- Prompt 版本管理:A/B 测试不同 Prompt 的效果
6.3 安全防护:Prompt 注入与权限隔离
Multi-Agent 系统的攻击面远比单 Agent 大,因为:
- Agent 处理外部数据(爬虫结果、用户输入、文件内容)时,这些数据可能包含恶意指令
- 多个 Agent 之间传递信息,一个 Agent 被注入后可能影响整个链路
实战防御方案:
import re
class PromptInjectionGuard:
"""简单的 Prompt 注入检测中间件"""
# 常见注入模式(持续更新)
INJECTION_PATTERNS = [
r"ignore (all )?previous instructions",
r"forget (everything|all) (above|before)",
r"you are now",
r"act as (a |an )?(different|new|another)",
r"system prompt",
r"<\|im_start\|>",
r"###\s*(instruction|system|human)",
]
def __init__(self):
self.patterns = [
re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS
]
def scan(self, text: str) -> tuple[bool, str]:
"""
扫描文本是否包含注入模式
返回 (is_safe, reason)
"""
for pattern in self.patterns:
if pattern.search(text):
return False, f"检测到可疑模式: {pattern.pattern}"
return True, "通过"
def sanitize(self, user_input: str) -> str:
"""清理用户输入,移除危险内容"""
is_safe, reason = self.scan(user_input)
if not is_safe:
raise ValueError(f"输入被拒绝:{reason}")
return user_input
# MCP Server 层面的权限隔离
TOOL_PERMISSIONS = {
"query_database": ["analyst_agent", "data_agent"], # 只有这两个 Agent 可以查库
"send_email": ["notifier_agent"], # 只有通知 Agent 可以发邮件
"delete_record": [], # 任何 Agent 都不能删记录
}
def check_tool_permission(agent_name: str, tool_name: str) -> bool:
allowed_agents = TOOL_PERMISSIONS.get(tool_name, [])
return agent_name in allowed_agents
七、避坑指南:我们踩过的 8 个生产级坑
在将 Multi-Agent 系统推向生产的过程中,我们总结了以下高频踩坑点:
| # | 坑的名字 | 现象 | 解决方案 |
|---|---|---|---|
| 1 | Agent 无限循环 | Agent 在循环修复中一直失败,费用爆炸 | 强制设置 max_retries,每个循环节点必须有终止条件 |
| 2 | Context 爆炸 | 长任务后期 Token 超出限制,直接报错 | 实现滚动窗口截断;重要信息摘要后存入长期记忆 |
| 3 | Agent 打架 | 两个 Agent 修改同一份文件,互相覆盖 | 用 LangGraph 有向图约束执行顺序;关键资源加锁 |
| 4 | 旗舰模型滥用 | Token 成本是预期的 20 倍 | 分层模型策略(见第六节),路由任务用小模型 |
| 5 | 无重试机制 | API 超时后整个任务失败 | 所有外部调用包 tenacity 重试装饰器,至少 3 次指数退避 |
| 6 | 无日志黑盒 | 出了问题不知道哪个 Agent 出错 | LangSmith 全链路追踪 + 每个 Node 入口/出口打结构化日志 |
| 7 | 权限过于宽松 | Agent 误删了生产数据 | MCP 层权限隔离;写操作必须经过人工审核节点 |
| 8 | Prompt 不稳定 | 升级模型后输出格式变了,JSON 解析报错 | 用 Pydantic 定义输出 Schema,让模型强制输出结构化 JSON |
关于第 8 条的代码示例(使用 Pydantic 强制结构化输出):
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import PydanticOutputParser
from pydantic import BaseModel, Field
from typing import List
class WeeklyReportSection(BaseModel):
title: str = Field(description="章节标题")
content: str = Field(description="章节内容,200字以内")
data_points: List[str] = Field(description="支撑数据列表,每项为一个数字结论")
class WeeklyReport(BaseModel):
summary: str = Field(description="执行摘要,3句话以内")
sections: List[WeeklyReportSection] = Field(description="报告各章节,3-5个")
action_items: List[str] = Field(description="下周行动建议,3条")
overall_rating: int = Field(description="本周综合评分 1-10", ge=1, le=10)
parser = PydanticOutputParser(pydantic_object=WeeklyReport)
llm = ChatOpenAI(model="gpt-4o")
# format_instructions 告诉 LLM 按 Pydantic 模型输出 JSON
prompt = f"""基于以下数据生成周报:
{data_summary}
{parser.get_format_instructions()}
"""
result = llm.invoke(prompt)
report: WeeklyReport = parser.parse(result.content)
# 此后 report 就是类型安全的 Python 对象,不再有 JSON 解析失败的风险
八、总结与展望
2026 年 Multi-Agent 技术栈全景
┌──────────────────────────────────────────────────────────┐
│ 应用层 (Application) │
│ 周报生成 │ 代码审查 │ 数据分析 │ 客服系统 │
├──────────────────────────────────────────────────────────┤
│ 编排层 (Orchestration) │
│ LangGraph (复杂工作流) │ CrewAI (快速多 Agent) │
├──────────────────────────────────────────────────────────┤
│ 协议层 (Protocol) │
│ MCP 协议(工具调用标准) │
├──────────────────────────────────────────────────────────┤
│ 能力层 (Capability) │
│ LLM (GPT/Claude/DeepSeek) │ 向量检索 │ 代码执行 │ 浏览器 │
├──────────────────────────────────────────────────────────┤
│ 基础设施层 (Infrastructure) │
│ PostgreSQL │ Redis │ ChromaDB │ S3 │ 消息队列 │
└──────────────────────────────────────────────────────────┘
核心结论
-
MCP 是不可绕过的基础设施:不管你用哪个 Agent 框架,MCP 协议都将是工具调用的统一标准。现在开始按 MCP 规范封装你的工具,是对未来最好的投资。
-
LangGraph 适合复杂工作流,CrewAI 适合快速原型:两者不是竞争关系,而是互补。用 CrewAI 快速验证 Multi-Agent 的业务价值,用 LangGraph 将验证成功的场景工程化。
-
记忆工程是差异化能力的关键:好的 Agent 和普通 Agent 的差距,往往不在于模型有多强,而在于有多少相关的上下文信息。四层记忆模型(工作/短期/长期/外部)是系统化解决这个问题的框架。
-
成本控制决定系统能否上生产:模型分层是非协商性的工程要求,不是可有可无的优化项。
-
可观测性是 Multi-Agent 的生命线:你的 Agent 做了什么、花了多少 Token、哪里出了问题——这些如果是黑盒,系统迟早翻车。
作者后记:Multi-Agent 系统的核心不是技术本身,而是任务分解的艺术——把一个复杂问题拆成多个独立的、专注的子问题,然后用技术手段把这些子问题的答案组合起来。这和软件工程的「单一职责原则」、「关注点分离」本质上是同一个思想,只是在 AI 时代换了一种实现形式。
如果你正在入局 Agent 开发,不要被框架和协议的复杂性吓到。找一个真实的业务场景,从最简单的两个 Agent 协作开始,先让它工作起来,再逐步完善。行动比完美的方案更有价值。
本文参考资料:稀土掘金 2026-06-22 热榜、CSDN AI Agent 专题、GitHub Trending(2026年6月)、MCP 协议官方文档(docs.anthropic.com/mcp)、LangGraph 官方文档(langchain-ai.github.io/langgraph)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)