专栏第8篇:第五篇我们讲了工具调用的四层防御,第六篇讲了 MCP 协议,第七篇讲了 LangChain 的 LCEL。但这些都有一个共同点:单次调用。真正的 Agent 需要循环——LLM 决定调用工具 → 执行 → 结果返回 → LLM 再决定…直到生成最终回答。今天我们要讲的 LangGraph,就是专门解决这个"循环工作流"问题的状态机框架。


目录


一、为什么需要 LangGraph?手动管理循环太累了

第五篇和第六篇讲的工具调用,都是"一问一答"模式:用户提问 → LLM 决定调用 → 执行工具 → 返回结果 → 结束。

但现实中的 Agent 往往不是一次就能搞定的:

用户:帮我查一下北京和上海的人口,再算一下人均 GDP

第1轮:
  LLM → 需要调用 get_population(北京)、get_population(上海)
  执行 → 返回两个结果
  
第2轮:
  LLM → 还需要调用 get_gdp(北京)、get_gdp(上海)
  执行 → 返回两个结果
  
第3轮:
  LLM → 现在有所有数据了,计算人均 GDP
  执行 → 返回计算结果
  
第4轮:
  LLM → 生成最终回答

1.1 手动管理的问题

如果不用框架,代码会写成这样:

# 第1轮
response1 = llm.invoke(messages)
if response1.tool_calls:
    results1 = execute_tools(response1.tool_calls)
    messages.extend([response1] + results1)
    
    # 第2轮
    response2 = llm.invoke(messages)
    if response2.tool_calls:
        results2 = execute_tools(response2.tool_calls)
        messages.extend([response2] + results2)
        
        # 第3轮...第N轮
        # 代码层层嵌套,无法扩展

问题很明显:

  • 代码嵌套:几轮调用就嵌套几层
  • 状态管理混乱:messages 列表手动维护,容易出错
  • 没有条件分支:无法根据结果走不同的流程
  • 无法持久化:重启后对话历史丢失

1.2 LangGraph 的解决思路

LangGraph 把上面的循环抽象成了状态机

START

LLM节点

需要工具?

工具节点

END

核心思想

  • State 保存整个工作流的数据(对话历史、中间结果)
  • Node 定义每个处理步骤(LLM 决策、工具执行)
  • Edge 定义节点之间的流转关系
  • 条件路由 让 LLM 自己决定走哪条路

二、StateGraph 三要素:State、Node、Edge

2.1 State:共享数据容器

State 是整个工作流的"内存",所有节点都能读写。用 TypedDict 定义:

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    # Annotated[list, add_messages] → 追加模式,新消息添加到末尾
    messages: Annotated[list, add_messages]
    # 普通字段 → 覆盖模式
    iteration: int

关键设计messagesAnnotated[list, add_messages] 包装,表示这条字段是追加更新的——新消息添加到列表末尾,而不是替换。这保证了对话历史的完整性。

2.2 Node:处理函数

Node 是工作流中的处理步骤。每个 Node 接收 State,返回需要更新的字段:

def llm_node(state: AgentState) -> dict:
    """LLM 决策节点"""
    llm = create_llm_instance()
    llm_with_tools = llm.bind_tools(tools)
    
    response = llm_with_tools.invoke(state["messages"])
    
    return {
        "messages": [response],           # 追加到 messages
        "iteration": state["iteration"] + 1  # 覆盖 iteration
    }

def tool_node(state: AgentState) -> dict:
    """工具执行节点"""
    last_msg = state["messages"][-1]
    
    tool_messages = []
    for tc in last_msg.tool_calls:
        result = tool_map[tc["name"]].invoke(tc["args"])
        tool_messages.append(ToolMessage(
            content=str(result),
            tool_call_id=tc["id"]
        ))
    
    return {"messages": tool_messages}  # 追加工具结果

Node 的返回值规则:只返回需要更新的字段,未返回的字段保持原值。

2.3 Edge:节点连接

Edge 定义数据怎么流:

from langgraph.graph import StateGraph, START, END

# 1. 创建图
workflow = StateGraph(AgentState)

# 2. 添加节点
workflow.add_node("llm", llm_node)
workflow.add_node("tools", tool_node)

# 3. 添加边(固定流转)
workflow.add_edge(START, "llm")      # 入口
workflow.add_edge("tools", "llm")    # 工具执行后回到 LLM

# 4. 编译
app = workflow.compile()

# 5. 运行
result = app.invoke({
    "messages": [HumanMessage(content="搜索Python信息")],
    "iteration": 0
})

三、条件路由:Agent 自己决定下一步走哪

前面的例子中,工具执行后总是回到 LLM。但如果 LLM 已经不需要工具了呢?我们需要条件路由

3.1 条件边的定义

def should_continue(state: AgentState) -> str:
    """判断是继续调用工具还是结束"""
    last_msg = state["messages"][-1]
    iteration = state["iteration"]
    
    # 安全阀:最多5轮,防止死循环
    if iteration >= 5:
        return END
    
    # 如果 LLM 返回了 tool_calls,继续执行工具
    if last_msg.tool_calls:
        return "tools"
    
    # 否则结束
    return END

# 添加条件边
workflow.add_conditional_edges(
    "llm",                    # 从哪个节点出发
    should_continue,          # 路由函数
    {                         # 路由映射
        "tools": "tools",     # 返回"tools" → 走到 tools 节点
        END: END               # 返回 END → 结束
    }
)

3.2 完整工作流图

有tool_calls

无tool_calls

START

LLM节点

should_continue

工具节点

END

3.3 条件路由的执行流程

第1轮:
  用户提问 → LLM 节点 → 返回 tool_calls → 条件路由 → 走 tools 节点

第2轮:
  tools 节点 → 工具结果追加到 messages → 回到 LLM 节点
  LLM 再次判断 → 如果还需要工具 → 继续循环
  LLM 再次判断 → 如果不需要了 → 走 END

第N轮:
  达到最大迭代次数 → 强制 END(防止死循环)

四、状态持久化:对话记忆与多用户隔离

客服机器人需要记住之前的对话。StateGraph 通过 Checkpointer 实现状态持久化。

4.1 MemorySaver:内存级持久化

from langgraph.checkpoint.memory import MemorySaver

# 编译时加入 Checkpointer
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# 用 thread_id 区分不同会话
config = {"configurable": {"thread_id": "user-001"}}

# 第1轮
r1 = app.invoke(
    {"messages": [HumanMessage(content="我叫张三")]},
    config=config
)

# 第2轮(同一会话,自动携带历史)
r2 = app.invoke(
    {"messages": [HumanMessage(content="我叫什么名字?")]},
    config=config
)
# AI 会回答:你叫张三

4.2 多会话隔离

# 用户A的会话
config_a = {"configurable": {"thread_id": "user-A"}}

# 用户B的会话
config_b = {"configurable": {"thread_id": "user-B"}}

# 两个会话互不干扰,同一个 StateGraph 实例服务多个用户

4.3 生产环境的持久化方案

方案 特点 适用场景
MemorySaver 内存存储,重启丢失 开发调试
SQLite 轻量级文件数据库 小规模部署
PostgreSQL 关系型数据库 生产环境
Redis 内存数据库 高并发场景

五、流式执行:实时查看每个节点的输出

除了 invoke()(等待全部完成),StateGraph 还支持 stream()(逐步返回每个节点的输出):

# 流式执行
for event in app.stream({
    "messages": [HumanMessage(content="什么是AI Agent?")],
    "iteration": 0
}):
    for node_name, node_output in event.items():
        print(f"节点 {node_name} 输出:{node_output}")

# 输出:
# 节点 llm 输出:{messages: [AIMessage], iteration: 1}
# 节点 tools 输出:{messages: [ToolMessage]}
# 节点 llm 输出:{messages: [AIMessage], iteration: 2}

stream 的价值

  • 实时展示 Agent 的思考过程(“正在搜索…”、“正在计算…”)
  • 便于调试,能看到每个节点的中间状态
  • 生产环境中配合 SSE 实现流式响应

六、实战:80 行代码的客服机器人

把前面的知识串起来,实现一个完整的客服机器人:

from typing import TypedDict, Annotated
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.memory import MemorySaver

# ========== 1. 定义工具 ==========
@tool
def search_knowledge(query: str) -> str:
    """搜索知识库,输入搜索关键词"""
    mock_data = {
        "Python": "安装步骤:1.访问官网下载 2.运行安装程序 3.勾选Add to PATH 4.点击Install",
        "Git": "使用步骤:1.git init初始化 2.git add添加文件 3.git commit提交 4.git push推送到远程",
    }
    for key, value in mock_data.items():
        if key in query:
            return value
    return f"未找到与 {query} 相关的信息"

@tool
def calculate(expression: str) -> str:
    """计算数学表达式"""
    try:
        return f"计算结果:{eval(expression)}"
    except:
        return "计算错误"

tools = [search_knowledge, calculate]
tool_map = {t.name: t for t in tools}

# ========== 2. 定义状态 ==========
class ChatState(TypedDict):
    messages: Annotated[list, add_messages]
    iteration: int

# ========== 3. 定义节点 ==========
def llm_node(state: ChatState) -> dict:
    llm = create_llm_instance(temperature=0)
    llm_with_tools = llm.bind_tools(tools)
    response = llm_with_tools.invoke(state["messages"])
    return {
        "messages": [response],
        "iteration": state.get("iteration", 0) + 1
    }

def tool_node(state: ChatState) -> dict:
    last_msg = state["messages"][-1]
    tool_messages = []
    for tc in last_msg.tool_calls:
        result = tool_map[tc["name"]].invoke(tc["args"])
        tool_messages.append(ToolMessage(
            content=str(result),
            tool_call_id=tc["id"]
        ))
    return {"messages": tool_messages}

# ========== 4. 条件路由 ==========
def should_continue(state: ChatState) -> str:
    last_msg = state["messages"][-1]
    iteration = state.get("iteration", 0)
    if iteration >= 5:
        return END
    if isinstance(last_msg, AIMessage) and last_msg.tool_calls:
        return "tools"
    return END

# ========== 5. 构建工作流 ==========
workflow = StateGraph(ChatState)
workflow.add_node("llm", llm_node)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "llm")
workflow.add_conditional_edges("llm", should_continue, {"tools": "tools", END: END})
workflow.add_edge("tools", "llm")

# ========== 6. 编译(带记忆) ==========
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# ========== 7. 运行 ==========
config = {"configurable": {"thread_id": "user-001"}}

result = app.invoke(
    {"messages": [HumanMessage(content="Python怎么安装?然后帮我算一下1250*8")]},
    config=config
)

# 提取最终回答
for msg in reversed(result["messages"]):
    if isinstance(msg, AIMessage) and msg.content:
        print(f"AI:{msg.content}")
        break

执行流程

用户:Python怎么安装?然后帮我算一下1250*8

LLM 节点 → 返回 2 个 tool_calls:
  → search_knowledge(query="Python安装")
  → calculate(expression="1250*8")

工具节点 → 并行执行 → 返回 2 个 ToolMessage

LLM 节点 → 基于工具结果 → 生成最终回答

AI:Python安装步骤是... 1250*8 的计算结果是 10000

七、总结

本文从"手动管理循环的痛点"出发,梳理了 LangGraph 的核心机制:

  1. 为什么需要 LangGraph:手动管理循环会导致代码嵌套、状态混乱、无法扩展
  2. StateGraph 三要素:State(共享数据)、Node(处理函数)、Edge(流转方向)
  3. 条件路由add_conditional_edges 让 Agent 动态决定下一步走向,防止死循环
  4. 状态持久化:Checkpointer + thread_id 实现对话记忆和多会话隔离
  5. 流式执行stream() 逐步获取每个节点输出,适合实时展示
  6. 实战代码:80 行客服机器人,支持工具调用、循环决策、状态记忆

💡 LangGraph 的定位:它不是替代 LangChain,而是在 LangChain 之上增加状态管理能力。LCEL 负责"组件组合",StateGraph 负责"流程编排"。


参考资源

  • LangGraph Documentation: StateGraph API
  • LangChain Documentation: Function Calling
  • LangGraph Cookbook: Building Agents
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐