👨 作者简介:大家好,我是唐璜Taro,全栈 领域创作者
✒️ 个人主页 :唐璜Taro
🚀 支持我:点赞👍+📝 评论 + ⭐️收藏


LangChain + LangGraph 多 Agent 实战:从工具链到工作流编排的万字精华指南

单个 Agent 能回答问题,多个 Agent 协作能解决复杂任务。本文从工具链集成讲起,深入 LangGraph 的状态机设计,最后用三个经典案例带你从零构建多 Agent 系统。


在这里插入图片描述

目录


第一章:为什么需要多 Agent

1.1 单 Agent 的天花板

一个 Agent 能调用工具、能推理,但当任务变复杂时,它会遇到三个瓶颈:

上下文窗口溢出。 GPT-4 有 128K 上下文,Claude 有 200K,但一个复杂任务的中间结果、工具返回值、历史对话加在一起,很容易撑满。单 Agent 不得不在上下文和信息量之间取舍。

推理链过长导致退化。 研究表明,当 ReAct 循环超过 10 步时,LLM 的推理准确率会显著下降。就像人做太长的数学题容易出错一样。

职责不清晰。 让一个 Agent 同时负责"搜索资料、分析数据、写报告、审校质量",就像让一个人同时干四个岗位,哪个都干不好。

1.2 多 Agent 的解法

把一个大任务拆成多个子任务,每个子任务由一个专门的 Agent 负责:

用户: "帮我写一份竞品分析报告"

                    ┌──────────────┐
                    │  协调者 Agent │
                    │  (Orchestrator)│
                    └──────┬───────┘
                           │
              ┌────────────┼────────────┐
              ▼            ▼            ▼
        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ 搜索 Agent │ │ 分析 Agent │ │ 写作 Agent │
        │ 负责搜集   │ │ 负责对比   │ │ 负责成文   │
        │ 竞品信息   │ │ 优劣势    │ │ 输出报告   │
        └──────────┘ └──────────┘ └──────────┘
              │            │            │
              └────────────┼────────────┘
                           ▼
                    ┌──────────────┐
                    │  审校 Agent   │
                    │  检查质量     │
                    └──────────────┘

1.3 LangGraph 的定位

LangChain 负责"单个 Agent 内部"的工具调用和 LLM 交互。LangGraph 负责"多个 Agent 之间"的协调和工作流。

LangChain:  LLM ↔ Tools(一个 Agent 的内部能力)
LangGraph:  Agent ↔ Agent(多个 Agent 之间的编排)

第二章:LangChain 工具链集成

在编排多 Agent 之前,先把每个 Agent 的"工具箱"装好。

2.1 工具的三种创建方式

方式一:用 @tool 装饰器(最简单)

from langchain_core.tools import tool

@tool
def search_web(query: str) -> str:
    """搜索互联网获取最新信息"""
    # 这里接入搜索 API
    return f"搜索结果: {query}的相关信息..."

@tool
def read_file(file_path: str) -> str:
    """读取本地文件内容"""
    with open(file_path, "r", encoding="utf-8") as f:
        return f.read()

@tool
def write_file(file_path: str, content: str) -> str:
    """将内容写入文件"""
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)
    return f"文件已写入: {file_path}"

函数的 docstring 非常重要——LLM 靠它来理解工具的用途。写得越清晰,Agent 调用工具的准确率越高。

方式二:用 StructuredTool 定义复杂参数

from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

class SearchInput(BaseModel):
    query: str = Field(description="搜索关键词")
    max_results: int = Field(default=5, description="最大返回结果数")
    language: str = Field(default="zh", description="搜索语言")

search_tool = StructuredTool.from_function(
    func=search_web,
    name="web_search",
    description="搜索互联网",
    args_schema=SearchInput
)

方式三:从 LangChain Hub 加载

from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

wiki_tool = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
wiki_tool.invoke("量子计算")

2.2 工具包(Toolkit)组织

把相关的工具打包在一起,方便分配给不同的 Agent:

from langchain_core.tools import tool

class ResearchTools:
    """研究类工具包"""
    
    @staticmethod
    def get_tools():
        return [search_web, wiki_tool, read_file]

class WritingTools:
    """写作类工具包"""
    
    @staticmethod
    def get_tools():
        return [write_file, format_markdown, count_words]

class CodeTools:
    """代码类工具包"""
    
    @staticmethod
    def get_tools():
        return [run_python, lint_code, run_tests]

2.3 工具调用的底层原理

当 LLM 决定调用工具时,它输出的不是自然语言,而是一段结构化的 JSON:

{
  "tool_calls": [
    {
      "name": "search_web",
      "args": {
        "query": "LangGraph 多Agent 编排"
      },
      "id": "call_abc123"
    }
  ]
}

LangChain 拿到这段 JSON 后:

  1. 解析出工具名 search_web
  2. 从注册的工具列表中找到对应工具
  3. 执行 search_web(query="LangGraph 多Agent 编排")
  4. 把返回值包装成 ToolMessage 放回对话历史
  5. LLM 根据返回值继续推理
# 完整的工具调用链
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(model="gpt-4")
llm_with_tools = llm.bind_tools([search_web, read_file])

# LLM 自动决定是否调用工具
response = llm_with_tools.invoke("搜索一下最新的AI新闻")
print(response.tool_calls)
# [{'name': 'search_web', 'args': {'query': '最新AI新闻'}, ...}]

2.4 自定义工具调用的错误处理

工具执行可能失败。好的错误处理能让 Agent 优雅地恢复:

from langchain_core.tools import tool

@tool
def call_api(url: str) -> str:
    """调用外部 API"""
    try:
        import requests
        resp = requests.get(url, timeout=10)
        resp.raise_for_status()
        return resp.text
    except requests.Timeout:
        return "错误: API 请求超时,请稍后重试"
    except requests.HTTPError as e:
        return f"错误: HTTP {e.response.status_code},请检查 URL"
    except Exception as e:
        return f"错误: {str(e)}"

关键原则:永远不要让工具抛出异常中断 Agent 循环。 把错误信息返回给 LLM,让它决定下一步怎么做。


第三章:LangGraph 核心概念

3.1 安装

pip install langgraph langchain langchain-openai

3.2 从 Chain 到 Graph

LangChain 的 Chain 是线性的:A → B → C

LangGraph 的 Graph 是图:可以分支、循环、并行。

LangChain Chain:
    input → prompt → llm → output

LangGraph Graph:
              ┌→ search → analyze ─┐
    input → router                → merge → output
              └→ code → test ──────┘

3.3 三个核心概念

State(状态): 在 Agent 之间传递的数据。

from typing import TypedDict, Annotated
from operator import add

class AgentState(TypedDict):
    # messages 用 add 操作符,表示追加而非覆盖
    messages: Annotated[list, add]
    # 当前任务
    task: str
    # 中间结果
    search_results: str
    analysis: str
    draft: str
    # 控制流
    next_agent: str

Node(节点): 每个节点是一个函数,接收 State,返回更新后的 State。

def search_agent(state: AgentState) -> dict:
    """搜索 Agent:负责搜集信息"""
    task = state["task"]
    # 调用 LLM + 工具
    result = search_llm.invoke(f"搜索: {task}")
    return {"search_results": result.content}

def analysis_agent(state: AgentState) -> dict:
    """分析 Agent:负责分析数据"""
    data = state["search_results"]
    result = analysis_llm.invoke(f"分析: {data}")
    return {"analysis": result.content}

Edge(边): 连接节点的线,决定执行流向。

from langgraph.graph import StateGraph, END

# 普通边:A 完成后执行 B
graph.add_edge("search", "analyze")

# 条件边:根据状态决定走哪条路
def router(state: AgentState) -> str:
    if "代码" in state["task"]:
        return "code_agent"
    return "search_agent"

graph.add_conditional_edges("router", router, {
    "code_agent": "code",
    "search_agent": "search"
})

3.4 一个最简单的 Graph

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from operator import add

# 1. 定义状态
class State(TypedDict):
    messages: Annotated[list, add]
    result: str

# 2. 定义节点
def node_a(state: State) -> dict:
    return {"result": state["result"] + "A→", "messages": ["A执行完成"]}

def node_b(state: State) -> dict:
    return {"result": state["result"] + "B→", "messages": ["B执行完成"]}

def node_c(state: State) -> dict:
    return {"result": state["result"] + "C", "messages": ["C执行完成"]}

# 3. 构建图
graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_node("c", node_c)

graph.set_entry_point("a")
graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", END)

# 4. 编译并执行
app = graph.compile()
result = app.invoke({"result": "", "messages": []})
print(result["result"])  # "A→B→C"

第四章:状态机与工作流设计

4.1 四种经典工作流模式

模式一:串行(Sequential)

A → B → C → D

最简单的模式,适合有明确先后顺序的任务。

graph.add_edge("a", "b")
graph.add_edge("b", "c")
graph.add_edge("c", "d")
graph.add_edge("d", END)

模式二:并行(Parallel)

    ┌→ B →┐
A →├→ C →├→ D
    └→ E →┘

多个 Agent 同时工作,最后汇总。

graph.add_edge("a", "b")
graph.add_edge("a", "c")  # a 同时指向 b 和 c
graph.add_edge("a", "e")
graph.add_edge("b", "d")
graph.add_edge("c", "d")  # b、c、e 都汇入 d
graph.add_edge("e", "d")

模式三:条件分支(Conditional)

           ┌→ B(技术问题)
A → Router ├→ C(订单问题)
           └→ D(其他问题)

根据输入内容走不同分支。

def route(state):
    if "技术" in state["task"]: return "b"
    if "订单" in state["task"]: return "c"
    return "d"

graph.add_conditional_edges("router", route, {
    "b": "tech_agent",
    "c": "order_agent", 
    "d": "general_agent"
})

模式四:循环(Loop)

A → B → 检查 → (不满意) → 回到B
                → (满意) → C

Agent 可以反复修改,直到结果达标。

def should_continue(state):
    if state.get("approved"):
        return "finish"
    return "revise"  # 回到修改节点

graph.add_conditional_edges("reviewer", should_continue, {
    "finish": END,
    "revise": "writer"  # 循环回去
})

4.2 人机协作节点(Human-in-the-Loop)

有些关键决策需要人确认:

from langgraph.checkpoint.memory import MemorySaver

# 在关键节点前设置中断
graph.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["deploy"]  # 部署前暂停,等人确认
)

# 执行到中断点会暂停
config = {"configurable": {"thread_id": "1"}}
app.invoke({"task": "部署新版本"}, config)
# 暂停,等待人工确认

# 人工确认后继续
app.invoke(None, config)  # 继续执行 deploy 节点

4.3 状态持久化与恢复

LangGraph 支持把状态保存到数据库,实现断点续跑:

from langgraph.checkpoint.postgres import PostgresSaver

# 使用 PostgreSQL 持久化
checkpointer = PostgresSaver.from_conn_string(
    "postgresql://user:pass@localhost/db"
)

app = graph.compile(checkpointer=checkpointer)

# 执行任务
config = {"configurable": {"thread_id": "task-001"}}
app.invoke({"task": "写报告"}, config)

# 即使进程崩溃,也能恢复状态继续执行
# 重新加载 app,用相同的 thread_id 即可

第五章:经典案例一——客服系统(路由+协作)

5.1 需求

构建一个智能客服系统:

  • 用户输入问题后,由路由 Agent 判断问题类型
  • 技术问题 → 技术支持 Agent(带文档检索工具)
  • 订单问题 → 订单 Agent(带数据库查询工具)
  • 通用问题 → 通用 Agent(直接回答)
  • 如果 Agent 不确定,转给人工审核

5.2 完整代码

from typing import TypedDict, Annotated, Literal
from operator import add
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.tools import tool

# ============ 1. 定义工具 ============

@tool
def search_docs(query: str) -> str:
    """搜索技术文档"""
    docs = {
        "登录": "登录问题:请清除浏览器缓存后重试,或使用无痕模式。",
        "API": "API 报错:请检查 API Key 是否过期,接口地址是否正确。",
        "部署": "部署失败:请检查 Docker 版本和端口占用情况。"
    }
    for key, val in docs.items():
        if key in query:
            return val
    return "未找到相关文档,请升级到人工客服。"

@tool
def query_order(order_id: str) -> str:
    """查询订单信息"""
    orders = {
        "ORD001": "订单ORD001:已发货,快递单号SF1234567890",
        "ORD002": "订单ORD002:待付款,请在24小时内完成支付",
        "ORD003": "订单ORD003:已签收,签收时间2025-01-15 14:30"
    }
    return orders.get(order_id, f"未找到订单{order_id},请核实单号。")

# ============ 2. 定义状态 ============

class CustomerServiceState(TypedDict):
    messages: Annotated[list, add]
    user_query: str
    category: str          # 问题分类
    agent_response: str    # Agent 回答
    need_human: bool       # 是否需要人工
    final_answer: str

# ============ 3. 定义 LLM ============

llm = ChatOpenAI(model="gpt-4", temperature=0)

# ============ 4. 定义节点 ============

def router_node(state: CustomerServiceState) -> dict:
    """路由 Agent:判断问题类型"""
    query = state["user_query"]
    response = llm.invoke([
        SystemMessage(content="""你是客服路由系统。根据用户问题判断类别:
- 技术问题(登录、API、报错、部署)→ 返回 "tech"
- 订单问题(订单、物流、退款、付款)→ 返回 "order"
- 其他问题(咨询、投诉、建议)→ 返回 "general"
只返回类别单词,不要其他内容。"""),
        HumanMessage(content=query)
    ])
    category = response.content.strip().lower()
    return {"category": category}

def tech_agent_node(state: CustomerServiceState) -> dict:
    """技术支持 Agent"""
    query = state["user_query"]
    tech_llm = llm.bind_tools([search_docs])
    response = tech_llm.invoke([
        SystemMessage(content="你是技术支持工程师。使用文档搜索工具查找解决方案,然后给出清晰的解答。"),
        HumanMessage(content=query)
    ])
    
    # 如果 LLM 调用了工具,执行工具
    if response.tool_calls:
        tool_result = search_docs.invoke(response.tool_calls[0]["args"])
        final = llm.invoke(f"根据文档回答用户问题:{query}\n文档内容:{tool_result}")
        return {"agent_response": final.content, "need_human": False}
    
    return {"agent_response": response.content, "need_human": "人工" in response.content}

def order_agent_node(state: CustomerServiceState) -> dict:
    """订单 Agent"""
    query = state["user_query"]
    order_llm = llm.bind_tools([query_order])
    response = order_llm.invoke([
        SystemMessage(content="""你是订单客服。从用户消息中提取订单号(ORD开头),查询订单信息后回答。
如果用户没有提供订单号,要求用户提供。"""),
        HumanMessage(content=query)
    ])
    
    if response.tool_calls:
        tool_result = query_order.invoke(response.tool_calls[0]["args"])
        final = llm.invoke(f"根据订单信息回答:{query}\n订单数据:{tool_result}")
        return {"agent_response": final.content, "need_human": False}
    
    return {"agent_response": response.content, "need_human": False}

def general_agent_node(state: CustomerServiceState) -> dict:
    """通用 Agent"""
    response = llm.invoke([
        SystemMessage(content="你是友善的客服代表。礼貌地回答用户的一般性问题。如果无法回答,建议转人工。"),
        HumanMessage(content=state["user_query"])
    ])
    need_human = "人工" in response.content or "转接" in response.content
    return {"agent_response": response.content, "need_human": need_human}

def human_review_node(state: CustomerServiceState) -> dict:
    """人工审核(实际会触发通知)"""
    return {
        "final_answer": f"您的问题已转给人工客服,请稍候。\n问题摘要:{state['user_query']}\nAgent初步回复:{state['agent_response']}"
    }

def response_node(state: CustomerServiceState) -> dict:
    """最终回复"""
    if state.get("final_answer"):
        return {"final_answer": state["final_answer"]}
    return {"final_answer": state["agent_response"]}

# ============ 5. 定义路由函数 ============

def route_by_category(state: CustomerServiceState) -> str:
    category = state.get("category", "general")
    if category == "tech":
        return "tech_agent"
    elif category == "order":
        return "order_agent"
    return "general_agent"

def check_need_human(state: CustomerServiceState) -> str:
    if state.get("need_human"):
        return "human_review"
    return "response"

# ============ 6. 构建图 ============

graph = StateGraph(CustomerServiceState)

# 添加节点
graph.add_node("router", router_node)
graph.add_node("tech_agent", tech_agent_node)
graph.add_node("order_agent", order_agent_node)
graph.add_node("general_agent", general_agent_node)
graph.add_node("human_review", human_review_node)
graph.add_node("response", response_node)

# 入口
graph.set_entry_point("router")

# 条件路由
graph.add_conditional_edges("router", route_by_category, {
    "tech_agent": "tech_agent",
    "order_agent": "order_agent",
    "general_agent": "general_agent"
})

# 各 Agent 完成后检查是否需要人工
graph.add_conditional_edges("tech_agent", check_need_human, {
    "human_review": "human_review",
    "response": "response"
})
graph.add_conditional_edges("order_agent", check_need_human, {
    "human_review": "human_review",
    "response": "response"
})
graph.add_conditional_edges("general_agent", check_need_human, {
    "human_review": "human_review",
    "response": "response"
})

# 汇总
graph.add_edge("human_review", END)
graph.add_edge("response", END)

# 编译
app = graph.compile()

# ============ 7. 测试 ============

# 技术问题
result = app.invoke({
    "user_query": "我登录的时候一直报错500怎么办",
    "messages": [], "category": "", "agent_response": "",
    "need_human": False, "final_answer": ""
})
print("技术问题:", result["final_answer"])

# 订单问题
result = app.invoke({
    "user_query": "我的订单ORD001到哪了",
    "messages": [], "category": "", "agent_response": "",
    "need_human": False, "final_answer": ""
})
print("订单问题:", result["final_answer"])

5.3 执行流程图

"登录报错500"           "订单ORD001到哪了"        "投诉你们服务差"
      │                        │                        │
      ▼                        ▼                        ▼
   Router                   Router                   Router
   (tech)                   (order)                  (general)
      │                        │                        │
      ▼                        ▼                        ▼
 Tech Agent              Order Agent             General Agent
 search_docs("登录")    query_order("ORD001")    直接回答
      │                        │                        │
      ▼                        ▼                        ▼
 "清除缓存重试"         "已发货SF123..."         "非常抱歉..."
      │                        │                   (需人工)
      ▼                        ▼                        ▼
   Response               Response              Human Review
      │                        │                        │
      ▼                        ▼                        ▼
    END                      END                      END

本文为系列上篇,完整系列:

上篇:从工具链到工作流编排(上)(本文)
下篇:从工具链到工作流编排(下)

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐