在这里插入图片描述


一、前言

在当今人工智能应用蓬勃发展的时代,构建能够处理复杂、多步骤任务的智能系统成为开发者面临的重要挑战。传统的线性代码流程难以适应需要状态管理、条件分支和循环迭代的AI工作流。为此,LangChain团队推出了LangGraph——一个基于图(Graph)的编程框架,专门为创建具有复杂逻辑和状态持久化能力的大语言模型(LLM)应用而设计。

LangGraph的核心思想是将应用程序建模为一个有向图,其中节点代表执行单元(如调用LLM、执行代码),定义节点间的流转逻辑,而状态对象则随着数据在图中流动而不断演化。这种范式使得开发者能够清晰地构建包含条件判断、循环、并行执行和外部工具调用的复杂链(Chain),极大地增强了AI Agent、聊天机器人、自动化工作流等应用的表达能力和可控性。

本文将系统介绍LangGraph的基础概念与核心图形API。我们将从状态、节点、边、图等基本构建块开始,逐步深入到Send、Command、Interrupt等高级控制流操作,并通过大量可运行的代码示例展示如何利用这些API构建功能强大的应用。无论您是希望为现有应用添加智能,还是探索下一代AI工作流编排,相信LangGraph都能为您提供强大而优雅的解决方案。


二、基础知识

在深入LangGraph的具体API之前,我们必须理解其构建复杂工作流所依赖的几个核心抽象:状态、节点、边和图。这些概念共同构成了LangGraph编程模型的基础。

2.1 状态

在LangGraph中,状态是一个中心化的、可变的字典(或类似结构),它在图的整个执行生命周期中存在并演化。状态是节点之间通信和数据传递的唯一媒介。每个节点读取状态的一部分,执行计算,并将结果写回状态。这种设计使得工作流具有清晰的输入输出定义和强大的可观测性。

状态通常用一个Pydantic BaseModel 或 TypedDict 来定义,这为状态的结构提供了类型提示和验证。让我们通过一个简单的示例来理解状态的定义与使用。

# 示例 2.1: 定义和使用状态
from typing import Annotated, TypedDict
from typing_extensions import TypedDict
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI

# 1. 定义状态结构
# 我们定义一个名为MyState的类型字典,它包含两个字段:
# - question: 用户输入的问题字符串
# - answer: 工作流最终生成的答案字符串
# 使用Annotated和operator.add来指定在多个节点写入同一字段时的合并策略。
# 这里我们为`answer`字段指定“追加”策略,意味着如果多个节点都修改`answer`,它们的结果会被拼接起来。
class MyState(TypedDict):
    question: str
    answer: Annotated[str, operator.add]  # 使用`add`操作符表示追加合并

# 2. 定义节点函数
# 节点是图中执行特定任务的单元。它接收当前状态,并返回要更新到状态中的字典。
def generate_greeting(state: MyState):
    """第一个节点:生成问候语"""
    # 从状态中读取用户问题(虽然在此节点未使用)
    user_question = state.get("question", "")
    # 准备要更新到状态的内容
    # 根据合并策略,此字符串将被追加到`answer`字段的末尾
    return {"answer": "你好!我已收到您的问题。\n"}

def process_question(state: MyState):
    """第二个节点:处理问题并生成答案"""
    user_question = state.get("question", "")
    if not user_question:
        response = "您似乎没有提出具体问题。"
    else:
        # 这里模拟一个简单的处理逻辑。在实际应用中,这里可能会调用LLM或其他工具。
        response = f"您的问题是:'{user_question}'\n这是一个非常好的问题,涉及人工智能领域。"
    # 将处理结果追加到答案中
    return {"answer": response}

def add_conclusion(state: MyState):
    """第三个节点:添加结束语"""
    return {"answer": "\n感谢您的提问!"}

# 3. 构建图
# 创建状态图构建器,并指定状态的结构类型
builder = StateGraph(MyState)

# 4. 将节点函数添加到图中
# 每个节点需要一个唯一的名字
builder.add_node("greet", generate_greeting)
builder.add_node("process", process_question)
builder.add_node("conclude", add_conclusion)

# 5. 设置入口点
# 指定工作流从哪个节点开始执行
builder.set_entry_point("greet")

# 6. 添加边来定义节点间的执行顺序
# 从“greet”节点执行完后,自动跳转到“process”节点
builder.add_edge("greet", "process")
# 从“process”节点执行完后,自动跳转到“conclude”节点
builder.add_edge("process", "conclude")
# 从“conclude”节点执行完后,工作流结束
builder.add_edge("conclude", END)

# 7. 编译图
# 将构建器编译成一个可执行的计算图对象
graph = builder.compile()

# 8. 执行图
# 通过invoke方法运行图,需要传入初始状态
initial_state = {"question": "请解释LangGraph是什么?"}
final_state = graph.invoke(initial_state)

# 9. 查看结果
print("最终状态中的答案:")
print(final_state["answer"])
print("\n完整状态:")
print(final_state)

代码解析与核心概念

  1. 状态定义MyState 类定义了工作流状态的结构。Annotated[str, operator.add] 是关键,它声明了 answer 字段的合并策略。operator.add 意味着“追加”,这是LangGraph处理状态冲突的核心机制。当多个节点并行执行或一个节点多次写入同一字段时,合并策略定义了如何整合这些更新。
  2. 节点:节点是执行具体工作的函数。它接收一个状态对象(类型为定义的状态类),并返回一个字典,其中包含要更新到状态中的键值对。节点不应该直接修改传入的状态对象,而是返回增量更新。
  3. 图与边:我们将节点添加到图中,然后用边将它们连接起来,形成一个有向执行链。set_entry_point 定义了起点,END 是一个特殊的节点,表示工作流终止。

运行上面的代码,你会看到输出结果,其中answer字段是三个节点依次追加内容的结果。这展示了状态是如何随着在图中流动而演化的。

2.2 节点

节点是LangGraph工作流中的基本计算单元。每个节点封装了一个特定的操作或任务。如示例所示,节点函数接收状态,返回更新。节点的设计应该尽量保持功能单一和可复用。除了普通函数,节点也可以是:

  • LangChain可运行对象:如 LCEL 链、工具等。
  • 其他LangGraph:这实现了图的嵌套,是构建模块化复杂应用的关键。
# 示例 2.2: 使用LangChain组件作为节点
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 定义一个新的、更复杂的状态
class AnalysisState(TypedDict):
    topic: str
    analysis: Annotated[str, operator.add]
    summary: str

# 创建一个LangChain链,它将作为图中的一个节点
llm = ChatOpenAI(model="gpt-3.5-turbo") # 请确保已设置OPENAI_API_KEY环境变量
prompt = ChatPromptTemplate.from_messages([
    ("system", "你是一位技术分析师。"),
    ("user", "请对以下主题进行简要分析:{topic}")
])
analysis_chain = prompt | llm | StrOutputParser()

def call_analysis_chain(state: AnalysisState):
    """节点函数:调用LangChain链进行分析"""
    topic = state.get("topic", "无主题")
    # 调用链,传入状态中的topic
    result = analysis_chain.invoke({"topic": topic})
    # 返回更新
    return {"analysis": f"分析结果:{result}\n"}

def human_summarize(state: AnalysisState):
    """另一个节点:人工总结(模拟)"""
    # 模拟一个基于分析结果生成总结的逻辑
    analysis_text = state.get("analysis", "")
    if "强大" in analysis_text or "优秀" in analysis_text:
        summary = "该技术前景广阔。"
    else:
        summary = "该技术需要进一步观察。"
    return {"summary": summary}

# 构建图
builder = StateGraph(AnalysisState)
builder.add_node("analyze", call_analysis_chain)
builder.add_node("summarize", human_summarize)
builder.set_entry_point("analyze")
builder.add_edge("analyze", "summarize")
builder.add_edge("summarize", END)

graph = builder.compile()

# 执行
result = graph.invoke({"topic": "大语言模型的发展"})
print("分析内容:", result.get("analysis"))
print("总结:", result.get("summary"))

这个例子展示了如何将现有的LangChain链无缝集成到LangGraph中作为节点,实现了传统链与图工作流的结合。

2.3 边

边定义了图中节点之间的流转逻辑。最基本的边是固定边,它从一个节点无条件地指向另一个节点(或END)。但LangGraph的强大之处在于条件边

条件边允许根据当前状态的值,动态决定下一个要执行的节点。这为实现分支、循环等复杂逻辑提供了可能。条件边通过一个函数(通常称为路由函数)来实现,该函数检查状态并返回下一个节点的名称。

# 示例 2.3: 使用条件边实现分支逻辑
class BranchState(TypedDict):
    query: str
    category: str
    response: Annotated[str, operator.add]

def classify_query(state: BranchState):
    """节点:对查询进行分类"""
    query = state.get("query", "").lower()
    if "价格" in query or "多少钱" in query:
        category = "price"
    elif "功能" in query or "做什么" in query:
        category = "feature"
    else:
        category = "general"
    # 将分类结果写入状态
    return {"category": category}

def handle_price(state: BranchState):
    """分支节点:处理价格查询"""
    return {"response": "价格相关处理逻辑已执行。\n"}

def handle_feature(state: BranchState):
    """分支节点:处理功能查询"""
    return {"response": "功能相关处理逻辑已执行。\n"}

def handle_general(state: BranchState):
    """分支节点:处理一般查询"""
    return {"response": "一般咨询处理逻辑已执行。\n"}

def route_after_classify(state: BranchState):
    """条件边路由函数:根据分类结果决定下一个节点"""
    category = state.get("category")
    if category == "price":
        return "price_node"
    elif category == "feature":
        return "feature_node"
    else:
        return "general_node"

# 构建图
builder = StateGraph(BranchState)
# 添加节点
builder.add_node("classify", classify_query)
builder.add_node("price_node", handle_price)
builder.add_node("feature_node", handle_feature)
builder.add_node("general_node", handle_general)

# 设置入口点
builder.set_entry_point("classify")

# 添加条件边
# 关键:将`classify`节点连接到条件路由函数,而非固定节点
builder.add_conditional_edges(
    "classify",        # 源节点
    route_after_classify, # 路由函数,它返回下一个节点的名字
    {
        "price_node": "price_node",   # 路由函数可能返回的值到实际节点的映射(可选,用于文档)
        "feature_node": "feature_node",
        "general_node": "general_node"
    }
)
# 为每个分支节点添加固定边,指向结束
builder.add_edge("price_node", END)
builder.add_edge("feature_node", END)
builder.add_edge("general_node", END)

graph = builder.compile()

# 测试不同查询
test_queries = ["这个产品多少钱?", "它有哪些主要功能?", "你们公司地址在哪?"]
for q in test_queries:
    print(f"\n输入查询: {q}")
    result = graph.invoke({"query": q})
    print(f"路由分类: {result.get('category')}")
    print(f"响应: {result.get('response')}")

代码解析

  • add_conditional_edges 是创建条件边的核心API。它接收一个源节点、一个路由函数和一个可选的映射字典。
  • 路由函数 route_after_classify 接收当前状态,并必须返回一个字符串,该字符串是下一个要执行的节点的名称
  • 运行此代码,你会发现对于不同的输入查询,工作流会走不同的分支路径,实现了动态路由。

2.4 图

图是节点和边的集合,它定义了工作流的整体拓扑结构。LangGraph中的图是有状态的,意味着它维护着一个状态对象贯穿执行始终。图通过 builder.compile() 编译后,得到一个可调用对象,其核心方法是:

  • invoke(initial_state): 以初始状态同步执行整个图,返回最终状态。
  • astream(initial_state): 以初始状态异步流式执行图,返回一个异步迭代器,可以逐步产出状态更新。这对于实时UI更新或长时间运行任务非常有用。

图本身也可以作为一个节点,嵌入到另一个更大的图中,这称为子图,是构建分层、模块化应用的核心模式,我们将在API章节详细讨论。

三、图形API

在掌握基础概念后,我们来探索LangGraph提供的一系列强大API,它们用于精细控制工作流的执行流、交互和结构。

3.1 Send 发送

Send API 用于动态地向图中添加任务或消息,通常在需要从某个节点“触发”另一个节点的执行时使用。它是实现事件驱动或特定工作流模式(如循环直到满足条件)的关键。然而,在标准 StateGraph 中,执行流主要由边定义。Send 的概念在更底层的 MessageGraph 或通过 tools 调用时更常见。

StateGraph 范式下,更常见的模式是使用条件边来实现循环,模拟“发送”任务给自己或他人的效果。例如,构建一个“循环直到满足条件”的节点:

# 示例 3.1: 使用条件边实现循环(模拟Send行为)
class LoopState(TypedDict):
    count: int
    target: int
    log: Annotated[str, operator.add]

def increment_and_check(state: LoopState):
    """循环节点:递增计数器并检查条件"""
    current = state.get("count", 0)
    target = state.get("target", 5)
    new_count = current + 1
    update_log = f"计数从 {current} 增加到 {new_count}.\n"
    
    # 准备更新
    updates = {"count": new_count, "log": update_log}
    return updates

def should_continue(state: LoopState) -> str:
    """条件路由函数:决定继续循环还是结束"""
    current = state.get("count", 0)
    target = state.get("target", 5)
    if current < target:
        # 返回当前节点的名字,实现循环
        return "increment_node"
    else:
        return "__end__"  # 特殊值,表示结束

# 构建图
builder = StateGraph(LoopState)
builder.add_node("increment_node", increment_and_check)
builder.set_entry_point("increment_node")
# 关键:节点通过条件边连接到自己,形成循环潜力
builder.add_conditional_edges(
    "increment_node",
    should_continue
)
graph = builder.compile()

# 执行循环
print("执行循环示例:")
result = graph.invoke({"count": 0, "target": 3})
print(f"最终计数: {result.get('count')}")
print("执行日志:")
print(result.get("log"))

这个例子展示了如何通过条件边让一个节点重复执行自身,直到状态满足退出条件,这本质上是将任务“发送”给自己。

3.2 Command 命令

Command 并不是LangGraph API中的一个独立函数,而是一个概念,通常指在状态中设置特定的指令或标志,并由某个节点(通常是一个路由节点或专门的命令处理节点)来解析和执行该指令,从而改变工作流的走向。这类似于在聊天机器人中,用户输入“/help”会触发帮助流程。

我们可以利用状态和条件边来实现命令模式。

# 示例 3.2: 实现简单的命令模式
class CommandState(TypedDict):
    user_input: str
    command: str
    output: Annotated[str, operator.add]

def parse_command(state: CommandState):
    """节点:解析用户输入,提取命令"""
    user_input = state.get("user_input", "").strip()
    if user_input.startswith("/"):
        # 简单解析命令,例如 "/help" -> "help"
        cmd = user_input[1:].split()[0]  # 获取‘/’后的第一个词
    else:
        cmd = "chat"  # 非命令输入视为普通聊天
    return {"command": cmd}

def route_by_command(state: CommandState) -> str:
    """路由函数:根据解析出的命令路由到不同节点"""
    cmd = state.get("command", "chat")
    if cmd == "help":
        return "help_node"
    elif cmd == "info":
        return "info_node"
    elif cmd == "chat":
        return "chat_node"
    else:
        return "unknown_cmd_node"

def handle_help(state: CommandState):
    return {"output": "可用命令:/help, /info, /chat\n"}

def handle_info(state: CommandState):
    return {"output": "这是基于LangGraph构建的示例Bot。\n"}

def handle_chat(state: CommandState):
    user_input = state.get("user_input", "")
    return {"output": f"您说:{user_input}\n(这是普通聊天回复)\n"}

def handle_unknown(state: CommandState):
    cmd = state.get("command", "")
    return {"output": f"未知命令 '{cmd}'。\n"}

# 构建图
builder = StateGraph(CommandState)
builder.add_node("parse", parse_command)
builder.add_node("help_node", handle_help)
builder.add_node("info_node", handle_info)
builder.add_node("chat_node", handle_chat)
builder.add_node("unknown_cmd_node", handle_unknown)

builder.set_entry_point("parse")
# 根据命令路由到不同处理节点
builder.add_conditional_edges("parse", route_by_command)
# 所有处理节点都直接结束
for node in ["help_node", "info_node", "chat_node", "unknown_cmd_node"]:
    builder.add_edge(node, END)

graph = builder.compile()

# 测试不同输入
test_inputs = ["你好", "/help", "/info 详细资料", "/invalid"]
for inp in test_inputs:
    print(f"\n输入: {inp}")
    result = graph.invoke({"user_input": inp})
    print(f"输出: {result.get('output')}")

在这个实现中,user_input 中的“/help”等字符串就是命令parse_command节点充当了解析器,将命令提取到状态中,随后的条件边根据这个命令值决定工作流的走向。

3.3 interrupt 中断

interrupt 机制允许工作流在特定点暂停,等待外部输入或干预,然后再继续执行。这是构建交互式Agent(如需要用户确认、提供额外信息)的基石。在LangGraph中,这通常通过在状态中设置一个特殊标志,并由条件边检查该标志来实现,或者更正式地,通过预定义的中断点。

下面是一个模拟中断的示例:工作流执行一个任务,然后在“审批”节点中断,等待(模拟的)外部批准信号,再决定是继续执行成功流程还是失败流程。

# 示例 3.3: 实现中断机制
from typing import Literal

class InterruptState(TypedDict):
    task: str
    # 用于控制流程的字段
    status: Literal["processing", "awaiting_approval", "approved", "rejected", "completed"]
    result: Annotated[str, operator.add]
    # 模拟外部输入(在实际应用中,这可能来自用户或API)
    external_approval: bool

def process_task(state: InterruptState):
    """节点1:处理任务"""
    task = state.get("task", "N/A")
    return {
        "result": f"开始处理任务:'{task}'...\n",
        "status": "processing"
    }

def request_approval(state: InterruptState):
    """节点2:请求批准(中断点)"""
    # 在实际场景中,这里可能会:
    # 1. 向用户发送消息
    # 2. 将状态持久化到数据库
    # 3. 暂停图的执行,返回一个包含“interrupt”信息的响应
    # 本例中,我们通过状态模拟“已发出请求,正在等待”
    print("[系统] 任务需要审批。当前状态已暂停,等待 external_approval 信号。")
    # 将状态设为等待审批,这将导致路由函数决定下一步
    return {
        "result": "任务已提交,等待审批中...\n",
        "status": "awaiting_approval"
    }

def check_approval(state: InterruptState) -> str:
    """条件路由函数:检查状态,决定流程"""
    current_status = state.get("status", "processing")
    if current_status == "processing":
        # 刚处理完任务,去请求审批
        return "approval_node"
    elif current_status == "awaiting_approval":
        # 关键:检查模拟的外部输入
        is_approved = state.get("external_approval", False)
        if is_approved:
            return "execute_node"
        else:
            return "reject_node"
    elif current_status in ["approved", "rejected"]:
        # 执行或拒绝后,工作流结束
        return "__end__"
    else:
        return "__end__"

def execute_approved_task(state: InterruptState):
    """节点3:执行已批准的任务"""
    return {
        "result": "审批通过!任务执行成功。\n",
        "status": "completed"
    }

def handle_rejection(state: InterruptState):
    """节点4:处理被拒绝的任务"""
    return {
        "result": "审批被拒绝。任务已取消。\n",
        "status": "completed"
    }

# 构建图
builder = StateGraph(InterruptState)
builder.add_node("process_node", process_task)
builder.add_node("approval_node", request_approval)
builder.add_node("execute_node", execute_approved_task)
builder.add_node("reject_node", handle_rejection)

builder.set_entry_point("process_node")
# 整个流程由 check_approval 函数控制
builder.add_conditional_edges("process_node", check_approval)
builder.add_conditional_edges("approval_node", check_approval)
builder.add_conditional_edges("execute_node", check_approval)
builder.add_conditional_edges("reject_node", check_approval)

graph = builder.compile()

# 模拟执行 - 第一次调用,进入等待审批状态
print("=== 第一次调用(任务开始,等待审批)===")
state_after_pause = graph.invoke({
    "task": "部署新服务",
    "status": "processing",
    "external_approval": False  # 初始没有批准
})
print("当前结果:", state_after_pause.get("result"))
print("当前状态:", state_after_pause.get("status"))
print("\n... 模拟外部系统或用户进行审批 ...\n")

# 模拟第二次调用,传入“批准”信号,从暂停点继续
print("=== 第二次调用(传入批准信号)===")
# 关键:我们传入更新后的状态,其中 external_approval 为 True
state_after_pause["external_approval"] = True
final_state = graph.invoke(state_after_pause)  # 从之前的状态继续执行
print("最终结果:", final_state.get("result"))
print("最终状态:", final_state.get("status"))

核心机制解析

  1. 状态作为暂停点request_approval 节点将状态设置为 awaiting_approval 并返回。此时,图的一次 invoke 调用结束。
  2. 外部干预:应用程序(可能是Web服务器)将当前状态保存起来(例如在数据库或会话中)。然后等待真实世界的交互(如用户点击“批准”按钮)。
  3. 继续执行:当外部事件发生时(如收到批准),应用程序加载保存的状态,更新相关字段(如将 external_approval 设为 True),然后再次调用 graph.invoke(),并传入这个更新后的状态。
  4. 路由决策:图从上次中断的节点(通过状态中的status标识)继续执行。条件路由函数 check_approval 检查到状态为 awaiting_approvalexternal_approvalTrue,于是路由到 execute_node,完成后续流程。

这种模式完美地模拟了“中断-继续”的交互式工作流,是构建复杂Agent的核心。

3.4 Subgraphs 子图

子图是LangGraph中实现模块化和复用的关键特性。你可以将一个编译好的图作为节点,添加到另一个图中。这有助于管理复杂性,将大型工作流分解为可重用的、逻辑独立的子组件。

# 示例 3.4: 创建和使用子图
class SubgraphState(TypedDict):
    input_data: str
    # 子图A的输出
    processed_by_a: Annotated[str, operator.add]
    # 子图B的输出
    processed_by_b: Annotated[str, operator.add]
    final_output: str

# --- 创建子图A:字符串处理流水线 ---
class StateA(TypedDict):
    value: str
    upper: str
    length: int

def to_upper(state: StateA):
    return {"upper": state.get("value", "").upper()}

def calc_length(state: StateA):
    val = state.get("value", "")
    return {"length": len(val)}

builder_a = StateGraph(StateA)
builder_a.add_node("upper_node", to_upper)
builder_a.add_node("length_node", calc_length)
builder_a.set_entry_point("upper_node")
builder_a.add_edge("upper_node", "length_node")
builder_a.add_edge("length_node", END)
subgraph_a = builder_a.compile()  # 编译子图A

# --- 创建子图B:数字处理流水线(模拟)---
class StateB(TypedDict):
    number_str: str
    squared: int

def square_number(state: StateB):
    num = int(state.get("number_str", "0"))
    return {"squared": num * num}

builder_b = StateGraph(StateB)
builder_b.add_node("square_node", square_number)
builder_b.set_entry_point("square_node")
builder_b.add_edge("square_node", END)
subgraph_b = builder_b.compile()  # 编译子图B

# --- 主图:协调子图A和子图B ---
def preprocess(state: SubgraphState):
    """主图节点1:预处理,为子图准备数据"""
    data = state.get("input_data", "")
    return {"processed_by_a": f"原始数据: {data}\n"}

def call_subgraph_a(state: SubgraphState):
    """主图节点2:调用子图A"""
    data = state.get("input_data", "")
    # 调用子图A,需要传入符合StateA结构的初始状态
    result_a = subgraph_a.invoke({"value": data})
    # 从子图A的结果中提取我们需要的信息
    extracted = f"大写: {result_a['upper']}, 长度: {result_a['length']}\n"
    return {"processed_by_a": extracted}

def call_subgraph_b(state: SubgraphState):
    """主图节点3:调用子图B"""
    data = state.get("input_data", "")
    # 假设我们想处理字符串的长度
    length = len(data)
    # 调用子图B,传入数字字符串
    result_b = subgraph_b.invoke({"number_str": str(length)})
    extracted = f"长度平方: {result_b['squared']}\n"
    return {"processed_by_b": extracted}

def aggregate_results(state: SubgraphState):
    """主图节点4:聚合结果"""
    from_a = state.get("processed_by_a", "")
    from_b = state.get("processed_by_b", "")
    final = f"汇总:\n子图A结果:{from_a}子图B结果:{from_b}"
    return {"final_output": final}

# 构建主图
main_builder = StateGraph(SubgraphState)
main_builder.add_node("preprocess", preprocess)
# 注意:call_subgraph_a 和 call_subgraph_b 是主图的节点函数,
# 它们在函数内部调用子图的invoke方法。这不是将子图直接作为节点添加。
# 更高级的用法可以使用 `add_node("subgraph_a", subgraph_a)` 如果子图与主图状态兼容。
main_builder.add_node("run_a", call_subgraph_a)
main_builder.add_node("run_b", call_subgraph_b)
main_builder.add_node("aggregate", aggregate_results)

main_builder.set_entry_point("preprocess")
main_builder.add_edge("preprocess", "run_a")
main_builder.add_edge("run_a", "run_b")
main_builder.add_edge("run_b", "aggregate")
main_builder.add_edge("aggregate", END)

main_graph = main_builder.compile()

# 执行主图
print("执行包含子图的工作流:")
final_result = main_graph.invoke({"input_data": "HelloWorld"})
print("最终输出:", final_result["final_output"])
print("\n完整状态:")
for k, v in final_result.items():
    print(f"  {k}: {v}")

说明

  • 子图是独立的、可编译和可测试的单元。它们可以有自己的状态结构。
  • 在主图中调用子图,通常需要在主图的节点函数中手动调用 subgraph.invoke(...),并处理好状态映射(将主图状态转换为子图需要的状态,以及将子图结果写回主图状态)。
  • LangGraph也支持更紧密的集成(例如,如果状态结构设计得当,可以用 add_node(“subgraph_name”, compiled_subgraph) 直接添加子图作为节点),但手动调用提供了最大的灵活性,尤其是在状态结构不同时。

3.5 Streaming 流式处理机制

流式处理对于提供实时反馈的用户体验至关重要,例如在聊天应用中逐词显示AI的回复。LangGraph原生支持流式输出,不仅支持最终结果的流式输出,还支持整个状态变化过程的流式输出,这为调试和构建交互式界面提供了极大便利。

流式输出通过 astream()astream_events() 方法实现。

# 示例 3.5: 使用流式处理
import asyncio

class StreamingState(TypedDict):
    query: str
    # 使用一个列表来收集流式生成的片段
    streamed_response: Annotated[list, operator.add]
    final_answer: str

async def stream_generator(state: StreamingState):
    """模拟一个流式生成内容的节点"""
    query = state.get("query", "")
    # 模拟一个需要长时间处理、分段产出结果的任务
    words = [f"“{query}”", "这个问题", "可以", "分为", "几个", "步骤", "来", "理解", "。"]
    for i, word in enumerate(words):
        # 模拟每次生成一点延迟
        await asyncio.sleep(0.2)
        # 关键:每次生成一个片段,就yield一个更新
        # 这会通过astream发送给客户端
        yield {"streamed_response": [word]}  # 注意:合并策略是operator.add,所以是追加到列表
    # 节点最终也返回一个完整的更新(可选)
    full_sentence = "".join(words)
    return {"final_answer": full_sentence}

async def simple_formatter(state: StreamingState):
    """一个简单的格式化节点,处理最终结果"""
    response_list = state.get("streamed_response", [])
    formatted = " ".join(response_list)
    return {"final_answer": f"[格式化后的回答] {formatted}"}

async def main_async():
    # 构建一个简单的图
    builder = StateGraph(StreamingState)
    builder.add_node("stream_node", stream_generator)  # 注意:stream_generator是一个异步生成器函数
    builder.add_node("format_node", simple_formatter)
    builder.set_entry_point("stream_node")
    builder.add_edge("stream_node", "format_node")
    builder.add_edge("format_node", END)
    graph = builder.compile()

    print("开始流式输出演示(每行是一个流式更新):")
    print("-" * 40)
    
    # 关键:使用 astream 进行异步流式调用
    async for step_state in graph.astream({"query": "什么是机器学习"}, {"stream_mode": "values"}):
        # step_state 是图在每个步骤产生更新后的完整状态快照
        # 我们可以检查我们感兴趣的字段的变化
        if 'streamed_response' in step_state:
            # 获取最新的流片段
            latest_chunk_list = step_state['streamed_response']
            if latest_chunk_list:
                # 因为是追加,最新片段是列表最后一个元素
                latest_word = latest_chunk_list[-1]
                print(f"[流式片段] {latest_word}", end=" ", flush=True)
        if 'final_answer' in step_state and step_state['final_answer']:
            # 最终答案更新了
            print(f"\n\n[最终答案] {step_state['final_answer']}")

    print("-" * 40)
    print("流式处理结束。")

# 运行异步主函数
if __name__ == "__main__":
    asyncio.run(main_async())

流式处理核心

  1. 异步生成器节点stream_generator 是一个 async 函数,并且使用 yield 关键字。它可以在生成完整结果前,多次 yield 部分更新。这些更新会立即通过流发送出去。
  2. astream 方法graph.astream() 返回一个异步迭代器。每次迭代会得到一个状态更新。{"stream_mode": "values"} 配置确保我们获得的是状态值,而不是更底层的事件。
  3. 实时反馈:客户端(如一个WebSocket连接)可以订阅这个流,并在每个状态更新到达时立即做出反应(例如,将新的文本片段显示在UI上)。这创造了流畅的交互体验。
  4. 状态合并:注意 streamed_response 字段的合并策略是 operator.add 作用于列表。每次 yield {"streamed_response": [word]} 都会将 [word] 这个列表追加到 streamed_response 字段的末尾。因此,在最终状态中,streamed_response 是一个包含所有流式片段的列表。

astream_events() 提供了更细粒度的事件流,包括节点开始/结束、工具调用等,非常适合构建复杂的监控和调试界面。

四、本章练习题及其答案

4.1 选择题

  1. 在LangGraph中,节点之间传递数据的唯一媒介是什么?
    A) 全局变量
    B) 函数参数
    C) 状态(State)对象
    D) 消息队列

  2. 以下哪个合并策略表示“用新值完全覆盖旧值”?
    A) operator.add
    B) operator.sub
    C) None (默认)
    D) lambda old, new: new

  3. 条件边(Conditional Edge)的核心是什么?
    A) 一个固定指向END的链接
    B) 一个根据状态动态返回下一个节点名称的函数
    C) 一个并行执行多个节点的机制
    D) 一个用于记录日志的装饰器

  4. 子图(Subgraph)的主要优点不包括以下哪项?
    A) 代码复用
    B) 隐藏实现复杂度
    C) 强制所有节点同步执行
    D) 模块化设计

  5. 要实现“循环直到条件满足”,最常用的模式是?
    A) 使用多个独立的invoke调用
    B) 在一个节点内写while循环
    C) 通过条件边使节点指向自身
    D) 使用Send API直接发送消息

答案

  1. C。状态是LangGraph中节点间通信的核心数据结构。
  2. D。lambda old, new: new 会直接用新值覆盖旧值。默认策略(C)在某些情况下可能引发冲突。
  3. B。条件边通过一个路由函数来实现,该函数读取状态并返回下一个要执行的节点名。
  4. C。子图并不强制同步执行,它主要是为了组织和复用代码逻辑。
  5. C。通过条件边,让节点的出口指向自己,并在路由函数中判断循环条件,是实现循环的标准模式。

4.2 填空题

  1. 在定义状态时,使用 Annotated[str, operator.add] 声明一个字段,表示对该字段的更新将采用______策略。
  2. 标志工作流结束的特殊节点名是______。
  3. 要编译一个StateGraph,需要调用______方法。
  4. 用于异步流式执行图并获取状态更新迭代器的方法是______。
  5. 实现“中断-继续”模式时,通常需要将中间状态______,并在获得外部输入后,用更新后的状态再次调用invoke

答案

  1. 追加
  2. END
  3. .compile()
  4. .astream().astream_events()
  5. 持久化存储(或保存到数据库/内存/会话中)

4.3 简答题

  1. 简述状态(State)在LangGraph中的作用和重要性。

    :状态是LangGraph工作流的“记忆”和通信总线。它是一个中心化的、类型化的数据结构,贯穿于图的整个执行过程。每个节点读取状态的特定部分,进行计算,并将结果以增量更新的方式写回状态。其重要性体现在:1) 数据传递:是节点间共享信息的唯一方式;2) 流程控制:状态中的值常被用于条件边的路由判断,决定工作流走向;3) 可观测性:完整的状态历史便于调试和追踪应用行为;4) 持久化与恢复:状态可以被保存,使得长时间运行或中断的工作流能够从断点恢复。

  2. 解释条件边(Conditional Edge)和固定边(Fixed Edge)的区别及应用。

    固定边直接从节点A连接到节点B(或END),执行完A后无条件执行B,用于定义确定的、线性的执行顺序。条件边则连接节点A到一个路由函数,该函数在运行时检查当前状态,并动态返回下一个要执行的节点名称。区别在于:固定边是静态的、预先定义好的;条件边是动态的、依赖于运行时状态。应用上,固定边用于构建顺序流程;条件边用于实现分支(如if/else)和循环(如while)逻辑,是使工作流具备“智能”和灵活性的关键。

  3. 描述使用子图(Subgraph)构建复杂应用的两种好处。

    :1) 模块化与复用:可以将通用的、功能独立的逻辑(如“用户身份验证”、“数据清洗”、“报告生成”)封装成子图。这些子图可以在多个主图中被调用,避免代码重复,提高可维护性。2) 抽象与复杂度管理:将大型复杂工作流分解为多个逻辑清晰的子图,降低了单个图的规模和认知负荷。开发者可以专注于子图内部的逻辑,并通过定义清晰的输入/输出接口来组合它们,这使得架构更清晰,易于团队协作和测试。

4.4 实操题

题目:构建一个简单的“旅行规划助手”工作流。要求:

  1. 定义状态,包含destination(目的地),days(天数),interests(兴趣列表,如[“美食”, “古迹”]),plan(计划,字符串),cost_estimate(预算估算)。
  2. 实现三个节点:
    • generate_plan: 根据目的地、天数和兴趣,生成一个简单的文字计划(可模拟,例如:f”在{destination}进行{days}日游,重点:{‘, ‘.join(interests)}。“)。
    • estimate_cost: 根据天数和兴趣数量粗略估算预算(可模拟,例如:days * 1000 + len(interests) * 500)。
    • format_output: 将计划和预算格式化为一个友好的字符串。
  3. 使用条件边实现:如果用户兴趣列表为空,则跳过generate_plan节点,直接进入estimate_cost节点(生成一个默认计划),否则正常执行generate_plan -> estimate_cost
  4. 最后总是执行format_output节点。
  5. 编写代码并测试两种场景:
    • 场景A: {“destination”: “北京”, “days”: 3, “interests”: [“故宫”, “长城”]}
    • 场景B: {“destination”: “上海”, “days”: 2, “interests”: []}

参考实现代码

# 练习题 4.4 实操题答案
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, END

# 1. 定义状态
class TravelPlanState(TypedDict):
    destination: str
    days: int
    interests: Annotated[list, operator.add]  # 注意:列表的合并策略也是add
    plan: Annotated[str, operator.add]
    cost_estimate: int
    final_output: str

# 2. 实现节点函数
def check_interests_and_route(state: TravelPlanState) -> str:
    """路由节点:检查兴趣列表是否为空"""
    interests = state.get("interests", [])
    if not interests:  # 如果兴趣列表为空
        return "estimate_cost_node"  # 跳过生成计划,直接去估算成本
    else:
        return "generate_plan_node"  # 正常执行生成计划

def generate_plan(state: TravelPlanState):
    """生成计划节点"""
    dest = state.get("destination", "某地")
    days = state.get("days", 1)
    interests = state.get("interests", [])
    plan_text = f"为您规划{dest}{days}日游。您的兴趣点包括:{', '.join(interests)}。我们将据此安排行程。\n"
    return {"plan": plan_text}

def estimate_cost(state: TravelPlanState):
    """估算成本节点"""
    days = state.get("days", 1)
    interests = state.get("interests", [])
    # 模拟一个简单的成本计算逻辑
    base_cost = days * 1000
    interest_surcharge = len(interests) * 500
    total_cost = base_cost + interest_surcharge
    # 如果计划为空(即跳过了generate_plan),则生成一个默认计划
    current_plan = state.get("plan", "")
    if not current_plan:
        dest = state.get("destination", "某地")
        default_plan = f"为您规划{dest}{days}日自由行,行程较为灵活。\n"
        return {"plan": default_plan, "cost_estimate": total_cost}
    else:
        return {"cost_estimate": total_cost}

def format_output(state: TravelPlanState):
    """格式化输出节点"""
    dest = state.get("destination", "N/A")
    plan = state.get("plan", "暂无计划")
    cost = state.get("cost_estimate", 0)
    formatted = f"旅行目的地:{dest}\n"
    formatted += "-" * 20 + "\n"
    formatted += f"行程计划:\n{plan}\n"
    formatted += f"初步预算估算:¥{cost} 元\n"
    formatted += "-" * 20
    return {"final_output": formatted}

# 3. 构建图
builder = StateGraph(TravelPlanState)

# 添加节点
builder.add_node("route_by_interests", check_interests_and_route)  # 注意:这是一个路由函数,但作为节点添加
builder.add_node("generate_plan_node", generate_plan)
builder.add_node("estimate_cost_node", estimate_cost)
builder.add_node("format_output_node", format_output)

# 设置入口点为路由节点
builder.set_entry_point("route_by_interests")

# 添加条件边:路由节点根据兴趣列表决定下一步
builder.add_conditional_edges(
    "route_by_interests",
    # 这个函数返回下一个节点名,但我们已将路由逻辑放在节点函数自身中
    # 这里需要一个调用节点函数的路由函数,但更简单的方法是将路由逻辑直接写在这里
    # 我们需要重构:将check_interests_and_route的逻辑移到lambda中
    lambda s: "estimate_cost_node" if not s.get("interests", []) else "generate_plan_node"
)
# 添加固定边
builder.add_edge("generate_plan_node", "estimate_cost_node")
builder.add_edge("estimate_cost_node", "format_output_node")
builder.add_edge("format_output_node", END)

# 编译图
travel_planner = builder.compile()

# 4. 测试场景
print("=== 场景A:有具体兴趣 ===")
input_state_a = {
    "destination": "北京",
    "days": 3,
    "interests": ["故宫", "长城", "美食"]
}
result_a = travel_planner.invoke(input_state_a)
print(result_a["final_output"])

print("\n\n=== 场景B:无具体兴趣(兴趣列表为空)===")
input_state_b = {
    "destination": "上海",
    "days": 2,
    "interests": []  # 空列表
}
result_b = travel_planner.invoke(input_state_b)
print(result_b["final_output"])

# 可选:打印最终状态以查看所有字段
print("\n场景A完整状态:")
for k, v in result_a.items():
    if k != 'final_output':
        print(f"  {k}: {v}")

代码要点解析

  1. 状态合并interests 字段使用 Annotated[list, operator.add],这意味着如果多个节点返回 {"interests": [“新兴趣"]},这些列表会被合并。但在本题中,兴趣列表是输入,不会被修改,所以此策略影响不大。
  2. 条件路由:我们没有创建一个单独的节点来存放 check_interests_and_route 函数,而是直接在 add_conditional_edges 中使用了一个 lambda 函数来实现路由逻辑。这使得图更简洁。路由逻辑是:如果兴趣列表为空,则直接跳转到 estimate_cost_node,否则去 generate_plan_node
  3. 默认计划:在 estimate_cost 节点中,我们检查 state.get(“plan”, “”) 是否为空。如果为空,说明跳过了计划生成节点,因此我们在该节点内生成一个默认的计划文本。这确保了无论走哪条路径,plan 字段最终都有值。
  4. 执行流程
    • 场景A(有兴趣):入口 -> 路由节点(条件边:有兴趣)-> generate_plan_node -> estimate_cost_node -> format_output_node -> END。
    • 场景B(无兴趣):入口 -> 路由节点(条件边:无兴趣)-> estimate_cost_node(在此生成默认计划)-> format_output_node -> END。

五、总结

LangGraph通过引入“图”这一核心抽象,为构建复杂、有状态、可控制的AI应用工作流提供了一套强大而优雅的范式。本文系统性地介绍了其基础知识与核心图形API。

我们从状态、节点、边、图这四个基本概念入手,理解了LangGraph如何将应用程序建模为一个数据流图。状态是流动的数据载体,节点是处理单元,边是控制流路径,而图则是它们的有机整体。通过合并策略,LangGraph优雅地解决了并行或冲突写入状态的问题。

在图形API部分,我们探讨了更高级的控制流模式。条件边是实现智能路由和循环的基石。中断机制通过状态持久化和再次调用,为构建需要与人或其他系统交互的Agent提供了可能。子图是管理复杂性和促进复用的最佳实践,允许我们将大问题分解为小模块。最后,流式处理不仅是提升用户体验的关键,其astreamastream_events API也为实时监控和调试提供了强大工具。

LangGraph不是要替代LangChain的核心链(LCEL),而是对其的补充和增强。当你的应用逻辑从简单的线性链发展为包含分支、循环、工具调用和状态管理时,LangGraph便成为了自然的选择。它特别适合于构建多轮对话Agent、复杂决策系统、审批工作流和任何需要维护长期上下文与执行历史的自动化任务。

掌握LangGraph,意味着你掌握了设计和实现下一代智能、健壮且可维护的AI应用架构的关键技能。从本章介绍的基础和API出发,你可以继续探索其与LangChain工具的更深度集成、多Agent协作、持久化状态存储等高级主题,从而将你的AI想法高效地转化为现实。


🌟 感谢您耐心阅读到这里!
🚀 技术成长没有捷径,但每一次的阅读、思考和实践,都在默默缩短您与成功的距离。
💡 如果本文对您有所启发,欢迎点赞👍、收藏📌、分享📤给更多需要的伙伴!
🗣️ 期待在评论区看到您的想法、疑问或建议,我会认真回复,让我们共同探讨、一起进步~
🔔 关注我,持续获取更多干货内容!
🤗 我们下篇文章见!

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐