LangChain与LangGraph多Agent实战:从工具链到工作流编排(上)
👨 作者简介:大家好,我是唐璜Taro,全栈 领域创作者
✒️ 个人主页 :唐璜Taro
🚀 支持我:点赞👍+📝 评论 + ⭐️收藏
LangChain + LangGraph 多 Agent 实战:从工具链到工作流编排的万字精华指南
单个 Agent 能回答问题,多个 Agent 协作能解决复杂任务。本文从工具链集成讲起,深入 LangGraph 的状态机设计,最后用三个经典案例带你从零构建多 Agent 系统。

目录
- 第一章:为什么需要多 Agent
- 第二章:LangChain 工具链集成
- 第三章:LangGraph 核心概念
- 第四章:状态机与工作流设计
- 第五章:经典案例一——客服系统(路由+协作)
- 第六章:经典案例二——研究报告生成(并行+聚合)
- 第七章:经典案例三——代码审查 Pipeline(串行+反馈循环)
- 第八章:生产环境最佳实践
- 第九章:总结与选型建议
第一章:为什么需要多 Agent
1.1 单 Agent 的天花板
一个 Agent 能调用工具、能推理,但当任务变复杂时,它会遇到三个瓶颈:
上下文窗口溢出。 GPT-4 有 128K 上下文,Claude 有 200K,但一个复杂任务的中间结果、工具返回值、历史对话加在一起,很容易撑满。单 Agent 不得不在上下文和信息量之间取舍。
推理链过长导致退化。 研究表明,当 ReAct 循环超过 10 步时,LLM 的推理准确率会显著下降。就像人做太长的数学题容易出错一样。
职责不清晰。 让一个 Agent 同时负责"搜索资料、分析数据、写报告、审校质量",就像让一个人同时干四个岗位,哪个都干不好。
1.2 多 Agent 的解法
把一个大任务拆成多个子任务,每个子任务由一个专门的 Agent 负责:
用户: "帮我写一份竞品分析报告"
┌──────────────┐
│ 协调者 Agent │
│ (Orchestrator)│
└──────┬───────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ 搜索 Agent │ │ 分析 Agent │ │ 写作 Agent │
│ 负责搜集 │ │ 负责对比 │ │ 负责成文 │
│ 竞品信息 │ │ 优劣势 │ │ 输出报告 │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└────────────┼────────────┘
▼
┌──────────────┐
│ 审校 Agent │
│ 检查质量 │
└──────────────┘
1.3 LangGraph 的定位
LangChain 负责"单个 Agent 内部"的工具调用和 LLM 交互。LangGraph 负责"多个 Agent 之间"的协调和工作流。
LangChain: LLM ↔ Tools(一个 Agent 的内部能力)
LangGraph: Agent ↔ Agent(多个 Agent 之间的编排)
第二章:LangChain 工具链集成
在编排多 Agent 之前,先把每个 Agent 的"工具箱"装好。
2.1 工具的三种创建方式
方式一:用 @tool 装饰器(最简单)
from langchain_core.tools import tool
@tool
def search_web(query: str) -> str:
"""搜索互联网获取最新信息"""
# 这里接入搜索 API
return f"搜索结果: {query}的相关信息..."
@tool
def read_file(file_path: str) -> str:
"""读取本地文件内容"""
with open(file_path, "r", encoding="utf-8") as f:
return f.read()
@tool
def write_file(file_path: str, content: str) -> str:
"""将内容写入文件"""
with open(file_path, "w", encoding="utf-8") as f:
f.write(content)
return f"文件已写入: {file_path}"
函数的 docstring 非常重要——LLM 靠它来理解工具的用途。写得越清晰,Agent 调用工具的准确率越高。
方式二:用 StructuredTool 定义复杂参数
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field
class SearchInput(BaseModel):
query: str = Field(description="搜索关键词")
max_results: int = Field(default=5, description="最大返回结果数")
language: str = Field(default="zh", description="搜索语言")
search_tool = StructuredTool.from_function(
func=search_web,
name="web_search",
description="搜索互联网",
args_schema=SearchInput
)
方式三:从 LangChain Hub 加载
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
wiki_tool.invoke("量子计算")
2.2 工具包(Toolkit)组织
把相关的工具打包在一起,方便分配给不同的 Agent:
from langchain_core.tools import tool
class ResearchTools:
"""研究类工具包"""
@staticmethod
def get_tools():
return [search_web, wiki_tool, read_file]
class WritingTools:
"""写作类工具包"""
@staticmethod
def get_tools():
return [write_file, format_markdown, count_words]
class CodeTools:
"""代码类工具包"""
@staticmethod
def get_tools():
return [run_python, lint_code, run_tests]
2.3 工具调用的底层原理
当 LLM 决定调用工具时,它输出的不是自然语言,而是一段结构化的 JSON:
{
"tool_calls": [
{
"name": "search_web",
"args": {
"query": "LangGraph 多Agent 编排"
},
"id": "call_abc123"
}
]
}
LangChain 拿到这段 JSON 后:
- 解析出工具名
search_web - 从注册的工具列表中找到对应工具
- 执行
search_web(query="LangGraph 多Agent 编排") - 把返回值包装成
ToolMessage放回对话历史 - LLM 根据返回值继续推理
# 完整的工具调用链
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")
llm_with_tools = llm.bind_tools([search_web, read_file])
# LLM 自动决定是否调用工具
response = llm_with_tools.invoke("搜索一下最新的AI新闻")
print(response.tool_calls)
# [{'name': 'search_web', 'args': {'query': '最新AI新闻'}, ...}]
2.4 自定义工具调用的错误处理
工具执行可能失败。好的错误处理能让 Agent 优雅地恢复:
from langchain_core.tools import tool
@tool
def call_api(url: str) -> str:
"""调用外部 API"""
try:
import requests
resp = requests.get(url, timeout=10)
resp.raise_for_status()
return resp.text
except requests.Timeout:
return "错误: API 请求超时,请稍后重试"
except requests.HTTPError as e:
return f"错误: HTTP {e.response.status_code},请检查 URL"
except Exception as e:
return f"错误: {str(e)}"
关键原则:永远不要让工具抛出异常中断 Agent 循环。 把错误信息返回给 LLM,让它决定下一步怎么做。
第三章:LangGraph 核心概念
3.1 安装
pip install langgraph langchain langchain-openai
3.2 从 Chain 到 Graph
LangChain 的 Chain 是线性的:A → B → C。
LangGraph 的 Graph 是图:可以分支、循环、并行。
LangChain Chain:
input → prompt → llm → output
LangGraph Graph:
┌→ search → analyze ─┐
input → router → merge → output
└→ code → test ──────┘
3.3 三个核心概念
State(状态): 在 Agent 之间传递的数据。
from typing import TypedDict, Annotated
from operator import add
class AgentState(TypedDict):
# messages 用 add 操作符,表示追加而非覆盖
messages: Annotated[list, add]
# 当前任务
task: str
# 中间结果
search_results: str
analysis: str
draft: str
# 控制流
next_agent: str
Node(节点): 每个节点是一个函数,接收 State,返回更新后的 State。
def search_agent(state: AgentState) -> dict:
"""搜索 Agent:负责搜集信息"""
task = state["task"]
# 调用 LLM + 工具
result = search_llm.invoke(f"搜索: {task}")
return {"search_results": result.content}
def analysis_agent(state: AgentState) -> dict:
"""分析 Agent:负责分析数据"""
data = state["search_results"]
result = analysis_llm.invoke(f"分析: {data}")
return {"analysis": result.content}
Edge(边): 连接节点的线,决定执行流向。
from langgraph.graph import StateGraph, END
# 普通边:A 完成后执行 B
graph.add_edge("search", "analyze")
# 条件边:根据状态决定走哪条路
def router(state: AgentState) -> str:
if "代码" in state["task"]:
return "code_agent"
return "search_agent"
graph.add_conditional_edges("router", router, {
"code_agent": "code",
"search_agent": "search"
})
3.4 一个最简单的 Graph
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add
# 1. 定义状态
class State(TypedDict):
messages: Annotated[list, add]
result: str
# 2. 定义节点
def node_a(state: State) -> dict:
return {"result": state["result"] + "A→", "messages": ["A执行完成"]}
def node_b(state: State) -> dict:
return {"result": state["result"] + "B→", "messages": ["B执行完成"]}
def node_c(state: State) -> dict:
return {"result": state["result"] + "C", "messages": ["C执行完成"]}
# 3. 构建图
graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_node("c", node_c)
graph.set_entry_point("a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)
# 4. 编译并执行
app = graph.compile()
result = app.invoke({"result": "", "messages": []})
print(result["result"]) # "A→B→C"
第四章:状态机与工作流设计
4.1 四种经典工作流模式
模式一:串行(Sequential)
A → B → C → D
最简单的模式,适合有明确先后顺序的任务。
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", "d")
graph.add_edge("d", END)
模式二:并行(Parallel)
┌→ B →┐
A →├→ C →├→ D
└→ E →┘
多个 Agent 同时工作,最后汇总。
graph.add_edge("a", "b")
graph.add_edge("a", "c") # a 同时指向 b 和 c
graph.add_edge("a", "e")
graph.add_edge("b", "d")
graph.add_edge("c", "d") # b、c、e 都汇入 d
graph.add_edge("e", "d")
模式三:条件分支(Conditional)
┌→ B(技术问题)
A → Router ├→ C(订单问题)
└→ D(其他问题)
根据输入内容走不同分支。
def route(state):
if "技术" in state["task"]: return "b"
if "订单" in state["task"]: return "c"
return "d"
graph.add_conditional_edges("router", route, {
"b": "tech_agent",
"c": "order_agent",
"d": "general_agent"
})
模式四:循环(Loop)
A → B → 检查 → (不满意) → 回到B
→ (满意) → C
Agent 可以反复修改,直到结果达标。
def should_continue(state):
if state.get("approved"):
return "finish"
return "revise" # 回到修改节点
graph.add_conditional_edges("reviewer", should_continue, {
"finish": END,
"revise": "writer" # 循环回去
})
4.2 人机协作节点(Human-in-the-Loop)
有些关键决策需要人确认:
from langgraph.checkpoint.memory import MemorySaver
# 在关键节点前设置中断
graph.compile(
checkpointer=MemorySaver(),
interrupt_before=["deploy"] # 部署前暂停,等人确认
)
# 执行到中断点会暂停
config = {"configurable": {"thread_id": "1"}}
app.invoke({"task": "部署新版本"}, config)
# 暂停,等待人工确认
# 人工确认后继续
app.invoke(None, config) # 继续执行 deploy 节点
4.3 状态持久化与恢复
LangGraph 支持把状态保存到数据库,实现断点续跑:
from langgraph.checkpoint.postgres import PostgresSaver
# 使用 PostgreSQL 持久化
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@localhost/db"
)
app = graph.compile(checkpointer=checkpointer)
# 执行任务
config = {"configurable": {"thread_id": "task-001"}}
app.invoke({"task": "写报告"}, config)
# 即使进程崩溃,也能恢复状态继续执行
# 重新加载 app,用相同的 thread_id 即可
第五章:经典案例一——客服系统(路由+协作)
5.1 需求
构建一个智能客服系统:
- 用户输入问题后,由路由 Agent 判断问题类型
- 技术问题 → 技术支持 Agent(带文档检索工具)
- 订单问题 → 订单 Agent(带数据库查询工具)
- 通用问题 → 通用 Agent(直接回答)
- 如果 Agent 不确定,转给人工审核
5.2 完整代码
from typing import TypedDict, Annotated, Literal
from operator import add
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool
# ============ 1. 定义工具 ============
@tool
def search_docs(query: str) -> str:
"""搜索技术文档"""
docs = {
"登录": "登录问题:请清除浏览器缓存后重试,或使用无痕模式。",
"API": "API 报错:请检查 API Key 是否过期,接口地址是否正确。",
"部署": "部署失败:请检查 Docker 版本和端口占用情况。"
}
for key, val in docs.items():
if key in query:
return val
return "未找到相关文档,请升级到人工客服。"
@tool
def query_order(order_id: str) -> str:
"""查询订单信息"""
orders = {
"ORD001": "订单ORD001:已发货,快递单号SF1234567890",
"ORD002": "订单ORD002:待付款,请在24小时内完成支付",
"ORD003": "订单ORD003:已签收,签收时间2025-01-15 14:30"
}
return orders.get(order_id, f"未找到订单{order_id},请核实单号。")
# ============ 2. 定义状态 ============
class CustomerServiceState(TypedDict):
messages: Annotated[list, add]
user_query: str
category: str # 问题分类
agent_response: str # Agent 回答
need_human: bool # 是否需要人工
final_answer: str
# ============ 3. 定义 LLM ============
llm = ChatOpenAI(model="gpt-4", temperature=0)
# ============ 4. 定义节点 ============
def router_node(state: CustomerServiceState) -> dict:
"""路由 Agent:判断问题类型"""
query = state["user_query"]
response = llm.invoke([
SystemMessage(content="""你是客服路由系统。根据用户问题判断类别:
- 技术问题(登录、API、报错、部署)→ 返回 "tech"
- 订单问题(订单、物流、退款、付款)→ 返回 "order"
- 其他问题(咨询、投诉、建议)→ 返回 "general"
只返回类别单词,不要其他内容。"""),
HumanMessage(content=query)
])
category = response.content.strip().lower()
return {"category": category}
def tech_agent_node(state: CustomerServiceState) -> dict:
"""技术支持 Agent"""
query = state["user_query"]
tech_llm = llm.bind_tools([search_docs])
response = tech_llm.invoke([
SystemMessage(content="你是技术支持工程师。使用文档搜索工具查找解决方案,然后给出清晰的解答。"),
HumanMessage(content=query)
])
# 如果 LLM 调用了工具,执行工具
if response.tool_calls:
tool_result = search_docs.invoke(response.tool_calls[0]["args"])
final = llm.invoke(f"根据文档回答用户问题:{query}\n文档内容:{tool_result}")
return {"agent_response": final.content, "need_human": False}
return {"agent_response": response.content, "need_human": "人工" in response.content}
def order_agent_node(state: CustomerServiceState) -> dict:
"""订单 Agent"""
query = state["user_query"]
order_llm = llm.bind_tools([query_order])
response = order_llm.invoke([
SystemMessage(content="""你是订单客服。从用户消息中提取订单号(ORD开头),查询订单信息后回答。
如果用户没有提供订单号,要求用户提供。"""),
HumanMessage(content=query)
])
if response.tool_calls:
tool_result = query_order.invoke(response.tool_calls[0]["args"])
final = llm.invoke(f"根据订单信息回答:{query}\n订单数据:{tool_result}")
return {"agent_response": final.content, "need_human": False}
return {"agent_response": response.content, "need_human": False}
def general_agent_node(state: CustomerServiceState) -> dict:
"""通用 Agent"""
response = llm.invoke([
SystemMessage(content="你是友善的客服代表。礼貌地回答用户的一般性问题。如果无法回答,建议转人工。"),
HumanMessage(content=state["user_query"])
])
need_human = "人工" in response.content or "转接" in response.content
return {"agent_response": response.content, "need_human": need_human}
def human_review_node(state: CustomerServiceState) -> dict:
"""人工审核(实际会触发通知)"""
return {
"final_answer": f"您的问题已转给人工客服,请稍候。\n问题摘要:{state['user_query']}\nAgent初步回复:{state['agent_response']}"
}
def response_node(state: CustomerServiceState) -> dict:
"""最终回复"""
if state.get("final_answer"):
return {"final_answer": state["final_answer"]}
return {"final_answer": state["agent_response"]}
# ============ 5. 定义路由函数 ============
def route_by_category(state: CustomerServiceState) -> str:
category = state.get("category", "general")
if category == "tech":
return "tech_agent"
elif category == "order":
return "order_agent"
return "general_agent"
def check_need_human(state: CustomerServiceState) -> str:
if state.get("need_human"):
return "human_review"
return "response"
# ============ 6. 构建图 ============
graph = StateGraph(CustomerServiceState)
# 添加节点
graph.add_node("router", router_node)
graph.add_node("tech_agent", tech_agent_node)
graph.add_node("order_agent", order_agent_node)
graph.add_node("general_agent", general_agent_node)
graph.add_node("human_review", human_review_node)
graph.add_node("response", response_node)
# 入口
graph.set_entry_point("router")
# 条件路由
graph.add_conditional_edges("router", route_by_category, {
"tech_agent": "tech_agent",
"order_agent": "order_agent",
"general_agent": "general_agent"
})
# 各 Agent 完成后检查是否需要人工
graph.add_conditional_edges("tech_agent", check_need_human, {
"human_review": "human_review",
"response": "response"
})
graph.add_conditional_edges("order_agent", check_need_human, {
"human_review": "human_review",
"response": "response"
})
graph.add_conditional_edges("general_agent", check_need_human, {
"human_review": "human_review",
"response": "response"
})
# 汇总
graph.add_edge("human_review", END)
graph.add_edge("response", END)
# 编译
app = graph.compile()
# ============ 7. 测试 ============
# 技术问题
result = app.invoke({
"user_query": "我登录的时候一直报错500怎么办",
"messages": [], "category": "", "agent_response": "",
"need_human": False, "final_answer": ""
})
print("技术问题:", result["final_answer"])
# 订单问题
result = app.invoke({
"user_query": "我的订单ORD001到哪了",
"messages": [], "category": "", "agent_response": "",
"need_human": False, "final_answer": ""
})
print("订单问题:", result["final_answer"])
5.3 执行流程图
"登录报错500" "订单ORD001到哪了" "投诉你们服务差"
│ │ │
▼ ▼ ▼
Router Router Router
(tech) (order) (general)
│ │ │
▼ ▼ ▼
Tech Agent Order Agent General Agent
search_docs("登录") query_order("ORD001") 直接回答
│ │ │
▼ ▼ ▼
"清除缓存重试" "已发货SF123..." "非常抱歉..."
│ │ (需人工)
▼ ▼ ▼
Response Response Human Review
│ │ │
▼ ▼ ▼
END END END
本文为系列上篇,完整系列:
上篇:从工具链到工作流编排(上)(本文)
下篇:从工具链到工作流编排(下)
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)