第10节:LangGraph 基础知识与图形API

文章目录
一、前言
在当今人工智能应用蓬勃发展的时代,构建能够处理复杂、多步骤任务的智能系统成为开发者面临的重要挑战。传统的线性代码流程难以适应需要状态管理、条件分支和循环迭代的AI工作流。为此,LangChain团队推出了LangGraph——一个基于图(Graph)的编程框架,专门为创建具有复杂逻辑和状态持久化能力的大语言模型(LLM)应用而设计。
LangGraph的核心思想是将应用程序建模为一个有向图,其中节点代表执行单元(如调用LLM、执行代码),边定义节点间的流转逻辑,而状态对象则随着数据在图中流动而不断演化。这种范式使得开发者能够清晰地构建包含条件判断、循环、并行执行和外部工具调用的复杂链(Chain),极大地增强了AI Agent、聊天机器人、自动化工作流等应用的表达能力和可控性。
本文将系统介绍LangGraph的基础概念与核心图形API。我们将从状态、节点、边、图等基本构建块开始,逐步深入到Send、Command、Interrupt等高级控制流操作,并通过大量可运行的代码示例展示如何利用这些API构建功能强大的应用。无论您是希望为现有应用添加智能,还是探索下一代AI工作流编排,相信LangGraph都能为您提供强大而优雅的解决方案。
二、基础知识
在深入LangGraph的具体API之前,我们必须理解其构建复杂工作流所依赖的几个核心抽象:状态、节点、边和图。这些概念共同构成了LangGraph编程模型的基础。
2.1 状态
在LangGraph中,状态是一个中心化的、可变的字典(或类似结构),它在图的整个执行生命周期中存在并演化。状态是节点之间通信和数据传递的唯一媒介。每个节点读取状态的一部分,执行计算,并将结果写回状态。这种设计使得工作流具有清晰的输入输出定义和强大的可观测性。
状态通常用一个Pydantic BaseModel 或 TypedDict 来定义,这为状态的结构提供了类型提示和验证。让我们通过一个简单的示例来理解状态的定义与使用。
# 示例 2.1: 定义和使用状态
from typing import Annotated, TypedDict
from typing_extensions import TypedDict
import operator
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
# 1. 定义状态结构
# 我们定义一个名为MyState的类型字典,它包含两个字段:
# - question: 用户输入的问题字符串
# - answer: 工作流最终生成的答案字符串
# 使用Annotated和operator.add来指定在多个节点写入同一字段时的合并策略。
# 这里我们为`answer`字段指定“追加”策略,意味着如果多个节点都修改`answer`,它们的结果会被拼接起来。
class MyState(TypedDict):
question: str
answer: Annotated[str, operator.add] # 使用`add`操作符表示追加合并
# 2. 定义节点函数
# 节点是图中执行特定任务的单元。它接收当前状态,并返回要更新到状态中的字典。
def generate_greeting(state: MyState):
"""第一个节点:生成问候语"""
# 从状态中读取用户问题(虽然在此节点未使用)
user_question = state.get("question", "")
# 准备要更新到状态的内容
# 根据合并策略,此字符串将被追加到`answer`字段的末尾
return {"answer": "你好!我已收到您的问题。\n"}
def process_question(state: MyState):
"""第二个节点:处理问题并生成答案"""
user_question = state.get("question", "")
if not user_question:
response = "您似乎没有提出具体问题。"
else:
# 这里模拟一个简单的处理逻辑。在实际应用中,这里可能会调用LLM或其他工具。
response = f"您的问题是:'{user_question}'\n这是一个非常好的问题,涉及人工智能领域。"
# 将处理结果追加到答案中
return {"answer": response}
def add_conclusion(state: MyState):
"""第三个节点:添加结束语"""
return {"answer": "\n感谢您的提问!"}
# 3. 构建图
# 创建状态图构建器,并指定状态的结构类型
builder = StateGraph(MyState)
# 4. 将节点函数添加到图中
# 每个节点需要一个唯一的名字
builder.add_node("greet", generate_greeting)
builder.add_node("process", process_question)
builder.add_node("conclude", add_conclusion)
# 5. 设置入口点
# 指定工作流从哪个节点开始执行
builder.set_entry_point("greet")
# 6. 添加边来定义节点间的执行顺序
# 从“greet”节点执行完后,自动跳转到“process”节点
builder.add_edge("greet", "process")
# 从“process”节点执行完后,自动跳转到“conclude”节点
builder.add_edge("process", "conclude")
# 从“conclude”节点执行完后,工作流结束
builder.add_edge("conclude", END)
# 7. 编译图
# 将构建器编译成一个可执行的计算图对象
graph = builder.compile()
# 8. 执行图
# 通过invoke方法运行图,需要传入初始状态
initial_state = {"question": "请解释LangGraph是什么?"}
final_state = graph.invoke(initial_state)
# 9. 查看结果
print("最终状态中的答案:")
print(final_state["answer"])
print("\n完整状态:")
print(final_state)
代码解析与核心概念:
- 状态定义:
MyState类定义了工作流状态的结构。Annotated[str, operator.add]是关键,它声明了answer字段的合并策略。operator.add意味着“追加”,这是LangGraph处理状态冲突的核心机制。当多个节点并行执行或一个节点多次写入同一字段时,合并策略定义了如何整合这些更新。 - 节点:节点是执行具体工作的函数。它接收一个状态对象(类型为定义的状态类),并返回一个字典,其中包含要更新到状态中的键值对。节点不应该直接修改传入的状态对象,而是返回增量更新。
- 图与边:我们将节点添加到图中,然后用边将它们连接起来,形成一个有向执行链。
set_entry_point定义了起点,END是一个特殊的节点,表示工作流终止。
运行上面的代码,你会看到输出结果,其中answer字段是三个节点依次追加内容的结果。这展示了状态是如何随着在图中流动而演化的。
2.2 节点
节点是LangGraph工作流中的基本计算单元。每个节点封装了一个特定的操作或任务。如示例所示,节点函数接收状态,返回更新。节点的设计应该尽量保持功能单一和可复用。除了普通函数,节点也可以是:
- LangChain可运行对象:如
LCEL链、工具等。 - 其他LangGraph:这实现了图的嵌套,是构建模块化复杂应用的关键。
# 示例 2.2: 使用LangChain组件作为节点
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 定义一个新的、更复杂的状态
class AnalysisState(TypedDict):
topic: str
analysis: Annotated[str, operator.add]
summary: str
# 创建一个LangChain链,它将作为图中的一个节点
llm = ChatOpenAI(model="gpt-3.5-turbo") # 请确保已设置OPENAI_API_KEY环境变量
prompt = ChatPromptTemplate.from_messages([
("system", "你是一位技术分析师。"),
("user", "请对以下主题进行简要分析:{topic}")
])
analysis_chain = prompt | llm | StrOutputParser()
def call_analysis_chain(state: AnalysisState):
"""节点函数:调用LangChain链进行分析"""
topic = state.get("topic", "无主题")
# 调用链,传入状态中的topic
result = analysis_chain.invoke({"topic": topic})
# 返回更新
return {"analysis": f"分析结果:{result}\n"}
def human_summarize(state: AnalysisState):
"""另一个节点:人工总结(模拟)"""
# 模拟一个基于分析结果生成总结的逻辑
analysis_text = state.get("analysis", "")
if "强大" in analysis_text or "优秀" in analysis_text:
summary = "该技术前景广阔。"
else:
summary = "该技术需要进一步观察。"
return {"summary": summary}
# 构建图
builder = StateGraph(AnalysisState)
builder.add_node("analyze", call_analysis_chain)
builder.add_node("summarize", human_summarize)
builder.set_entry_point("analyze")
builder.add_edge("analyze", "summarize")
builder.add_edge("summarize", END)
graph = builder.compile()
# 执行
result = graph.invoke({"topic": "大语言模型的发展"})
print("分析内容:", result.get("analysis"))
print("总结:", result.get("summary"))
这个例子展示了如何将现有的LangChain链无缝集成到LangGraph中作为节点,实现了传统链与图工作流的结合。
2.3 边
边定义了图中节点之间的流转逻辑。最基本的边是固定边,它从一个节点无条件地指向另一个节点(或END)。但LangGraph的强大之处在于条件边。
条件边允许根据当前状态的值,动态决定下一个要执行的节点。这为实现分支、循环等复杂逻辑提供了可能。条件边通过一个函数(通常称为路由函数)来实现,该函数检查状态并返回下一个节点的名称。
# 示例 2.3: 使用条件边实现分支逻辑
class BranchState(TypedDict):
query: str
category: str
response: Annotated[str, operator.add]
def classify_query(state: BranchState):
"""节点:对查询进行分类"""
query = state.get("query", "").lower()
if "价格" in query or "多少钱" in query:
category = "price"
elif "功能" in query or "做什么" in query:
category = "feature"
else:
category = "general"
# 将分类结果写入状态
return {"category": category}
def handle_price(state: BranchState):
"""分支节点:处理价格查询"""
return {"response": "价格相关处理逻辑已执行。\n"}
def handle_feature(state: BranchState):
"""分支节点:处理功能查询"""
return {"response": "功能相关处理逻辑已执行。\n"}
def handle_general(state: BranchState):
"""分支节点:处理一般查询"""
return {"response": "一般咨询处理逻辑已执行。\n"}
def route_after_classify(state: BranchState):
"""条件边路由函数:根据分类结果决定下一个节点"""
category = state.get("category")
if category == "price":
return "price_node"
elif category == "feature":
return "feature_node"
else:
return "general_node"
# 构建图
builder = StateGraph(BranchState)
# 添加节点
builder.add_node("classify", classify_query)
builder.add_node("price_node", handle_price)
builder.add_node("feature_node", handle_feature)
builder.add_node("general_node", handle_general)
# 设置入口点
builder.set_entry_point("classify")
# 添加条件边
# 关键:将`classify`节点连接到条件路由函数,而非固定节点
builder.add_conditional_edges(
"classify", # 源节点
route_after_classify, # 路由函数,它返回下一个节点的名字
{
"price_node": "price_node", # 路由函数可能返回的值到实际节点的映射(可选,用于文档)
"feature_node": "feature_node",
"general_node": "general_node"
}
)
# 为每个分支节点添加固定边,指向结束
builder.add_edge("price_node", END)
builder.add_edge("feature_node", END)
builder.add_edge("general_node", END)
graph = builder.compile()
# 测试不同查询
test_queries = ["这个产品多少钱?", "它有哪些主要功能?", "你们公司地址在哪?"]
for q in test_queries:
print(f"\n输入查询: {q}")
result = graph.invoke({"query": q})
print(f"路由分类: {result.get('category')}")
print(f"响应: {result.get('response')}")
代码解析:
add_conditional_edges是创建条件边的核心API。它接收一个源节点、一个路由函数和一个可选的映射字典。- 路由函数
route_after_classify接收当前状态,并必须返回一个字符串,该字符串是下一个要执行的节点的名称。 - 运行此代码,你会发现对于不同的输入查询,工作流会走不同的分支路径,实现了动态路由。
2.4 图
图是节点和边的集合,它定义了工作流的整体拓扑结构。LangGraph中的图是有状态的,意味着它维护着一个状态对象贯穿执行始终。图通过 builder.compile() 编译后,得到一个可调用对象,其核心方法是:
invoke(initial_state): 以初始状态同步执行整个图,返回最终状态。astream(initial_state): 以初始状态异步流式执行图,返回一个异步迭代器,可以逐步产出状态更新。这对于实时UI更新或长时间运行任务非常有用。
图本身也可以作为一个节点,嵌入到另一个更大的图中,这称为子图,是构建分层、模块化应用的核心模式,我们将在API章节详细讨论。
三、图形API
在掌握基础概念后,我们来探索LangGraph提供的一系列强大API,它们用于精细控制工作流的执行流、交互和结构。
3.1 Send 发送
Send API 用于动态地向图中添加任务或消息,通常在需要从某个节点“触发”另一个节点的执行时使用。它是实现事件驱动或特定工作流模式(如循环直到满足条件)的关键。然而,在标准 StateGraph 中,执行流主要由边定义。Send 的概念在更底层的 MessageGraph 或通过 tools 调用时更常见。
在 StateGraph 范式下,更常见的模式是使用条件边来实现循环,模拟“发送”任务给自己或他人的效果。例如,构建一个“循环直到满足条件”的节点:
# 示例 3.1: 使用条件边实现循环(模拟Send行为)
class LoopState(TypedDict):
count: int
target: int
log: Annotated[str, operator.add]
def increment_and_check(state: LoopState):
"""循环节点:递增计数器并检查条件"""
current = state.get("count", 0)
target = state.get("target", 5)
new_count = current + 1
update_log = f"计数从 {current} 增加到 {new_count}.\n"
# 准备更新
updates = {"count": new_count, "log": update_log}
return updates
def should_continue(state: LoopState) -> str:
"""条件路由函数:决定继续循环还是结束"""
current = state.get("count", 0)
target = state.get("target", 5)
if current < target:
# 返回当前节点的名字,实现循环
return "increment_node"
else:
return "__end__" # 特殊值,表示结束
# 构建图
builder = StateGraph(LoopState)
builder.add_node("increment_node", increment_and_check)
builder.set_entry_point("increment_node")
# 关键:节点通过条件边连接到自己,形成循环潜力
builder.add_conditional_edges(
"increment_node",
should_continue
)
graph = builder.compile()
# 执行循环
print("执行循环示例:")
result = graph.invoke({"count": 0, "target": 3})
print(f"最终计数: {result.get('count')}")
print("执行日志:")
print(result.get("log"))
这个例子展示了如何通过条件边让一个节点重复执行自身,直到状态满足退出条件,这本质上是将任务“发送”给自己。
3.2 Command 命令
Command 并不是LangGraph API中的一个独立函数,而是一个概念,通常指在状态中设置特定的指令或标志,并由某个节点(通常是一个路由节点或专门的命令处理节点)来解析和执行该指令,从而改变工作流的走向。这类似于在聊天机器人中,用户输入“/help”会触发帮助流程。
我们可以利用状态和条件边来实现命令模式。
# 示例 3.2: 实现简单的命令模式
class CommandState(TypedDict):
user_input: str
command: str
output: Annotated[str, operator.add]
def parse_command(state: CommandState):
"""节点:解析用户输入,提取命令"""
user_input = state.get("user_input", "").strip()
if user_input.startswith("/"):
# 简单解析命令,例如 "/help" -> "help"
cmd = user_input[1:].split()[0] # 获取‘/’后的第一个词
else:
cmd = "chat" # 非命令输入视为普通聊天
return {"command": cmd}
def route_by_command(state: CommandState) -> str:
"""路由函数:根据解析出的命令路由到不同节点"""
cmd = state.get("command", "chat")
if cmd == "help":
return "help_node"
elif cmd == "info":
return "info_node"
elif cmd == "chat":
return "chat_node"
else:
return "unknown_cmd_node"
def handle_help(state: CommandState):
return {"output": "可用命令:/help, /info, /chat\n"}
def handle_info(state: CommandState):
return {"output": "这是基于LangGraph构建的示例Bot。\n"}
def handle_chat(state: CommandState):
user_input = state.get("user_input", "")
return {"output": f"您说:{user_input}\n(这是普通聊天回复)\n"}
def handle_unknown(state: CommandState):
cmd = state.get("command", "")
return {"output": f"未知命令 '{cmd}'。\n"}
# 构建图
builder = StateGraph(CommandState)
builder.add_node("parse", parse_command)
builder.add_node("help_node", handle_help)
builder.add_node("info_node", handle_info)
builder.add_node("chat_node", handle_chat)
builder.add_node("unknown_cmd_node", handle_unknown)
builder.set_entry_point("parse")
# 根据命令路由到不同处理节点
builder.add_conditional_edges("parse", route_by_command)
# 所有处理节点都直接结束
for node in ["help_node", "info_node", "chat_node", "unknown_cmd_node"]:
builder.add_edge(node, END)
graph = builder.compile()
# 测试不同输入
test_inputs = ["你好", "/help", "/info 详细资料", "/invalid"]
for inp in test_inputs:
print(f"\n输入: {inp}")
result = graph.invoke({"user_input": inp})
print(f"输出: {result.get('output')}")
在这个实现中,user_input 中的“/help”等字符串就是命令。parse_command节点充当了解析器,将命令提取到状态中,随后的条件边根据这个命令值决定工作流的走向。
3.3 interrupt 中断
interrupt 机制允许工作流在特定点暂停,等待外部输入或干预,然后再继续执行。这是构建交互式Agent(如需要用户确认、提供额外信息)的基石。在LangGraph中,这通常通过在状态中设置一个特殊标志,并由条件边检查该标志来实现,或者更正式地,通过预定义的中断点。
下面是一个模拟中断的示例:工作流执行一个任务,然后在“审批”节点中断,等待(模拟的)外部批准信号,再决定是继续执行成功流程还是失败流程。
# 示例 3.3: 实现中断机制
from typing import Literal
class InterruptState(TypedDict):
task: str
# 用于控制流程的字段
status: Literal["processing", "awaiting_approval", "approved", "rejected", "completed"]
result: Annotated[str, operator.add]
# 模拟外部输入(在实际应用中,这可能来自用户或API)
external_approval: bool
def process_task(state: InterruptState):
"""节点1:处理任务"""
task = state.get("task", "N/A")
return {
"result": f"开始处理任务:'{task}'...\n",
"status": "processing"
}
def request_approval(state: InterruptState):
"""节点2:请求批准(中断点)"""
# 在实际场景中,这里可能会:
# 1. 向用户发送消息
# 2. 将状态持久化到数据库
# 3. 暂停图的执行,返回一个包含“interrupt”信息的响应
# 本例中,我们通过状态模拟“已发出请求,正在等待”
print("[系统] 任务需要审批。当前状态已暂停,等待 external_approval 信号。")
# 将状态设为等待审批,这将导致路由函数决定下一步
return {
"result": "任务已提交,等待审批中...\n",
"status": "awaiting_approval"
}
def check_approval(state: InterruptState) -> str:
"""条件路由函数:检查状态,决定流程"""
current_status = state.get("status", "processing")
if current_status == "processing":
# 刚处理完任务,去请求审批
return "approval_node"
elif current_status == "awaiting_approval":
# 关键:检查模拟的外部输入
is_approved = state.get("external_approval", False)
if is_approved:
return "execute_node"
else:
return "reject_node"
elif current_status in ["approved", "rejected"]:
# 执行或拒绝后,工作流结束
return "__end__"
else:
return "__end__"
def execute_approved_task(state: InterruptState):
"""节点3:执行已批准的任务"""
return {
"result": "审批通过!任务执行成功。\n",
"status": "completed"
}
def handle_rejection(state: InterruptState):
"""节点4:处理被拒绝的任务"""
return {
"result": "审批被拒绝。任务已取消。\n",
"status": "completed"
}
# 构建图
builder = StateGraph(InterruptState)
builder.add_node("process_node", process_task)
builder.add_node("approval_node", request_approval)
builder.add_node("execute_node", execute_approved_task)
builder.add_node("reject_node", handle_rejection)
builder.set_entry_point("process_node")
# 整个流程由 check_approval 函数控制
builder.add_conditional_edges("process_node", check_approval)
builder.add_conditional_edges("approval_node", check_approval)
builder.add_conditional_edges("execute_node", check_approval)
builder.add_conditional_edges("reject_node", check_approval)
graph = builder.compile()
# 模拟执行 - 第一次调用,进入等待审批状态
print("=== 第一次调用(任务开始,等待审批)===")
state_after_pause = graph.invoke({
"task": "部署新服务",
"status": "processing",
"external_approval": False # 初始没有批准
})
print("当前结果:", state_after_pause.get("result"))
print("当前状态:", state_after_pause.get("status"))
print("\n... 模拟外部系统或用户进行审批 ...\n")
# 模拟第二次调用,传入“批准”信号,从暂停点继续
print("=== 第二次调用(传入批准信号)===")
# 关键:我们传入更新后的状态,其中 external_approval 为 True
state_after_pause["external_approval"] = True
final_state = graph.invoke(state_after_pause) # 从之前的状态继续执行
print("最终结果:", final_state.get("result"))
print("最终状态:", final_state.get("status"))
核心机制解析:
- 状态作为暂停点:
request_approval节点将状态设置为awaiting_approval并返回。此时,图的一次invoke调用结束。 - 外部干预:应用程序(可能是Web服务器)将当前状态保存起来(例如在数据库或会话中)。然后等待真实世界的交互(如用户点击“批准”按钮)。
- 继续执行:当外部事件发生时(如收到批准),应用程序加载保存的状态,更新相关字段(如将
external_approval设为True),然后再次调用graph.invoke(),并传入这个更新后的状态。 - 路由决策:图从上次中断的节点(通过状态中的
status标识)继续执行。条件路由函数check_approval检查到状态为awaiting_approval且external_approval为True,于是路由到execute_node,完成后续流程。
这种模式完美地模拟了“中断-继续”的交互式工作流,是构建复杂Agent的核心。
3.4 Subgraphs 子图
子图是LangGraph中实现模块化和复用的关键特性。你可以将一个编译好的图作为节点,添加到另一个图中。这有助于管理复杂性,将大型工作流分解为可重用的、逻辑独立的子组件。
# 示例 3.4: 创建和使用子图
class SubgraphState(TypedDict):
input_data: str
# 子图A的输出
processed_by_a: Annotated[str, operator.add]
# 子图B的输出
processed_by_b: Annotated[str, operator.add]
final_output: str
# --- 创建子图A:字符串处理流水线 ---
class StateA(TypedDict):
value: str
upper: str
length: int
def to_upper(state: StateA):
return {"upper": state.get("value", "").upper()}
def calc_length(state: StateA):
val = state.get("value", "")
return {"length": len(val)}
builder_a = StateGraph(StateA)
builder_a.add_node("upper_node", to_upper)
builder_a.add_node("length_node", calc_length)
builder_a.set_entry_point("upper_node")
builder_a.add_edge("upper_node", "length_node")
builder_a.add_edge("length_node", END)
subgraph_a = builder_a.compile() # 编译子图A
# --- 创建子图B:数字处理流水线(模拟)---
class StateB(TypedDict):
number_str: str
squared: int
def square_number(state: StateB):
num = int(state.get("number_str", "0"))
return {"squared": num * num}
builder_b = StateGraph(StateB)
builder_b.add_node("square_node", square_number)
builder_b.set_entry_point("square_node")
builder_b.add_edge("square_node", END)
subgraph_b = builder_b.compile() # 编译子图B
# --- 主图:协调子图A和子图B ---
def preprocess(state: SubgraphState):
"""主图节点1:预处理,为子图准备数据"""
data = state.get("input_data", "")
return {"processed_by_a": f"原始数据: {data}\n"}
def call_subgraph_a(state: SubgraphState):
"""主图节点2:调用子图A"""
data = state.get("input_data", "")
# 调用子图A,需要传入符合StateA结构的初始状态
result_a = subgraph_a.invoke({"value": data})
# 从子图A的结果中提取我们需要的信息
extracted = f"大写: {result_a['upper']}, 长度: {result_a['length']}\n"
return {"processed_by_a": extracted}
def call_subgraph_b(state: SubgraphState):
"""主图节点3:调用子图B"""
data = state.get("input_data", "")
# 假设我们想处理字符串的长度
length = len(data)
# 调用子图B,传入数字字符串
result_b = subgraph_b.invoke({"number_str": str(length)})
extracted = f"长度平方: {result_b['squared']}\n"
return {"processed_by_b": extracted}
def aggregate_results(state: SubgraphState):
"""主图节点4:聚合结果"""
from_a = state.get("processed_by_a", "")
from_b = state.get("processed_by_b", "")
final = f"汇总:\n子图A结果:{from_a}子图B结果:{from_b}"
return {"final_output": final}
# 构建主图
main_builder = StateGraph(SubgraphState)
main_builder.add_node("preprocess", preprocess)
# 注意:call_subgraph_a 和 call_subgraph_b 是主图的节点函数,
# 它们在函数内部调用子图的invoke方法。这不是将子图直接作为节点添加。
# 更高级的用法可以使用 `add_node("subgraph_a", subgraph_a)` 如果子图与主图状态兼容。
main_builder.add_node("run_a", call_subgraph_a)
main_builder.add_node("run_b", call_subgraph_b)
main_builder.add_node("aggregate", aggregate_results)
main_builder.set_entry_point("preprocess")
main_builder.add_edge("preprocess", "run_a")
main_builder.add_edge("run_a", "run_b")
main_builder.add_edge("run_b", "aggregate")
main_builder.add_edge("aggregate", END)
main_graph = main_builder.compile()
# 执行主图
print("执行包含子图的工作流:")
final_result = main_graph.invoke({"input_data": "HelloWorld"})
print("最终输出:", final_result["final_output"])
print("\n完整状态:")
for k, v in final_result.items():
print(f" {k}: {v}")
说明:
- 子图是独立的、可编译和可测试的单元。它们可以有自己的状态结构。
- 在主图中调用子图,通常需要在主图的节点函数中手动调用
subgraph.invoke(...),并处理好状态映射(将主图状态转换为子图需要的状态,以及将子图结果写回主图状态)。 - LangGraph也支持更紧密的集成(例如,如果状态结构设计得当,可以用
add_node(“subgraph_name”, compiled_subgraph)直接添加子图作为节点),但手动调用提供了最大的灵活性,尤其是在状态结构不同时。
3.5 Streaming 流式处理机制
流式处理对于提供实时反馈的用户体验至关重要,例如在聊天应用中逐词显示AI的回复。LangGraph原生支持流式输出,不仅支持最终结果的流式输出,还支持整个状态变化过程的流式输出,这为调试和构建交互式界面提供了极大便利。
流式输出通过 astream() 或 astream_events() 方法实现。
# 示例 3.5: 使用流式处理
import asyncio
class StreamingState(TypedDict):
query: str
# 使用一个列表来收集流式生成的片段
streamed_response: Annotated[list, operator.add]
final_answer: str
async def stream_generator(state: StreamingState):
"""模拟一个流式生成内容的节点"""
query = state.get("query", "")
# 模拟一个需要长时间处理、分段产出结果的任务
words = [f"“{query}”", "这个问题", "可以", "分为", "几个", "步骤", "来", "理解", "。"]
for i, word in enumerate(words):
# 模拟每次生成一点延迟
await asyncio.sleep(0.2)
# 关键:每次生成一个片段,就yield一个更新
# 这会通过astream发送给客户端
yield {"streamed_response": [word]} # 注意:合并策略是operator.add,所以是追加到列表
# 节点最终也返回一个完整的更新(可选)
full_sentence = "".join(words)
return {"final_answer": full_sentence}
async def simple_formatter(state: StreamingState):
"""一个简单的格式化节点,处理最终结果"""
response_list = state.get("streamed_response", [])
formatted = " ".join(response_list)
return {"final_answer": f"[格式化后的回答] {formatted}"}
async def main_async():
# 构建一个简单的图
builder = StateGraph(StreamingState)
builder.add_node("stream_node", stream_generator) # 注意:stream_generator是一个异步生成器函数
builder.add_node("format_node", simple_formatter)
builder.set_entry_point("stream_node")
builder.add_edge("stream_node", "format_node")
builder.add_edge("format_node", END)
graph = builder.compile()
print("开始流式输出演示(每行是一个流式更新):")
print("-" * 40)
# 关键:使用 astream 进行异步流式调用
async for step_state in graph.astream({"query": "什么是机器学习"}, {"stream_mode": "values"}):
# step_state 是图在每个步骤产生更新后的完整状态快照
# 我们可以检查我们感兴趣的字段的变化
if 'streamed_response' in step_state:
# 获取最新的流片段
latest_chunk_list = step_state['streamed_response']
if latest_chunk_list:
# 因为是追加,最新片段是列表最后一个元素
latest_word = latest_chunk_list[-1]
print(f"[流式片段] {latest_word}", end=" ", flush=True)
if 'final_answer' in step_state and step_state['final_answer']:
# 最终答案更新了
print(f"\n\n[最终答案] {step_state['final_answer']}")
print("-" * 40)
print("流式处理结束。")
# 运行异步主函数
if __name__ == "__main__":
asyncio.run(main_async())
流式处理核心:
- 异步生成器节点:
stream_generator是一个async函数,并且使用yield关键字。它可以在生成完整结果前,多次yield部分更新。这些更新会立即通过流发送出去。 astream方法:graph.astream()返回一个异步迭代器。每次迭代会得到一个状态更新。{"stream_mode": "values"}配置确保我们获得的是状态值,而不是更底层的事件。- 实时反馈:客户端(如一个WebSocket连接)可以订阅这个流,并在每个状态更新到达时立即做出反应(例如,将新的文本片段显示在UI上)。这创造了流畅的交互体验。
- 状态合并:注意
streamed_response字段的合并策略是operator.add作用于列表。每次yield {"streamed_response": [word]}都会将[word]这个列表追加到streamed_response字段的末尾。因此,在最终状态中,streamed_response是一个包含所有流式片段的列表。
astream_events() 提供了更细粒度的事件流,包括节点开始/结束、工具调用等,非常适合构建复杂的监控和调试界面。
四、本章练习题及其答案
4.1 选择题
-
在LangGraph中,节点之间传递数据的唯一媒介是什么?
A) 全局变量
B) 函数参数
C) 状态(State)对象
D) 消息队列 -
以下哪个合并策略表示“用新值完全覆盖旧值”?
A)operator.add
B)operator.sub
C)None(默认)
D)lambda old, new: new -
条件边(Conditional Edge)的核心是什么?
A) 一个固定指向END的链接
B) 一个根据状态动态返回下一个节点名称的函数
C) 一个并行执行多个节点的机制
D) 一个用于记录日志的装饰器 -
子图(Subgraph)的主要优点不包括以下哪项?
A) 代码复用
B) 隐藏实现复杂度
C) 强制所有节点同步执行
D) 模块化设计 -
要实现“循环直到条件满足”,最常用的模式是?
A) 使用多个独立的invoke调用
B) 在一个节点内写while循环
C) 通过条件边使节点指向自身
D) 使用SendAPI直接发送消息
答案:
- C。状态是LangGraph中节点间通信的核心数据结构。
- D。
lambda old, new: new会直接用新值覆盖旧值。默认策略(C)在某些情况下可能引发冲突。- B。条件边通过一个路由函数来实现,该函数读取状态并返回下一个要执行的节点名。
- C。子图并不强制同步执行,它主要是为了组织和复用代码逻辑。
- C。通过条件边,让节点的出口指向自己,并在路由函数中判断循环条件,是实现循环的标准模式。
4.2 填空题
- 在定义状态时,使用
Annotated[str, operator.add]声明一个字段,表示对该字段的更新将采用______策略。 - 标志工作流结束的特殊节点名是______。
- 要编译一个
StateGraph,需要调用______方法。 - 用于异步流式执行图并获取状态更新迭代器的方法是______。
- 实现“中断-继续”模式时,通常需要将中间状态______,并在获得外部输入后,用更新后的状态再次调用
invoke。
答案:
- 追加
END.compile().astream()或.astream_events()- 持久化存储(或保存到数据库/内存/会话中)
4.3 简答题
-
简述状态(State)在LangGraph中的作用和重要性。
答:状态是LangGraph工作流的“记忆”和通信总线。它是一个中心化的、类型化的数据结构,贯穿于图的整个执行过程。每个节点读取状态的特定部分,进行计算,并将结果以增量更新的方式写回状态。其重要性体现在:1) 数据传递:是节点间共享信息的唯一方式;2) 流程控制:状态中的值常被用于条件边的路由判断,决定工作流走向;3) 可观测性:完整的状态历史便于调试和追踪应用行为;4) 持久化与恢复:状态可以被保存,使得长时间运行或中断的工作流能够从断点恢复。
-
解释条件边(Conditional Edge)和固定边(Fixed Edge)的区别及应用。
答:固定边直接从节点A连接到节点B(或
END),执行完A后无条件执行B,用于定义确定的、线性的执行顺序。条件边则连接节点A到一个路由函数,该函数在运行时检查当前状态,并动态返回下一个要执行的节点名称。区别在于:固定边是静态的、预先定义好的;条件边是动态的、依赖于运行时状态。应用上,固定边用于构建顺序流程;条件边用于实现分支(如if/else)和循环(如while)逻辑,是使工作流具备“智能”和灵活性的关键。 -
描述使用子图(Subgraph)构建复杂应用的两种好处。
答:1) 模块化与复用:可以将通用的、功能独立的逻辑(如“用户身份验证”、“数据清洗”、“报告生成”)封装成子图。这些子图可以在多个主图中被调用,避免代码重复,提高可维护性。2) 抽象与复杂度管理:将大型复杂工作流分解为多个逻辑清晰的子图,降低了单个图的规模和认知负荷。开发者可以专注于子图内部的逻辑,并通过定义清晰的输入/输出接口来组合它们,这使得架构更清晰,易于团队协作和测试。
4.4 实操题
题目:构建一个简单的“旅行规划助手”工作流。要求:
- 定义状态,包含
destination(目的地),days(天数),interests(兴趣列表,如[“美食”, “古迹”]),plan(计划,字符串),cost_estimate(预算估算)。 - 实现三个节点:
generate_plan: 根据目的地、天数和兴趣,生成一个简单的文字计划(可模拟,例如:f”在{destination}进行{days}日游,重点:{‘, ‘.join(interests)}。“)。estimate_cost: 根据天数和兴趣数量粗略估算预算(可模拟,例如:days * 1000 + len(interests) * 500)。format_output: 将计划和预算格式化为一个友好的字符串。
- 使用条件边实现:如果用户兴趣列表为空,则跳过
generate_plan节点,直接进入estimate_cost节点(生成一个默认计划),否则正常执行generate_plan->estimate_cost。 - 最后总是执行
format_output节点。 - 编写代码并测试两种场景:
- 场景A:
{“destination”: “北京”, “days”: 3, “interests”: [“故宫”, “长城”]} - 场景B:
{“destination”: “上海”, “days”: 2, “interests”: []}
- 场景A:
参考实现代码:
# 练习题 4.4 实操题答案
from typing import Annotated, TypedDict
import operator
from langgraph.graph import StateGraph, END
# 1. 定义状态
class TravelPlanState(TypedDict):
destination: str
days: int
interests: Annotated[list, operator.add] # 注意:列表的合并策略也是add
plan: Annotated[str, operator.add]
cost_estimate: int
final_output: str
# 2. 实现节点函数
def check_interests_and_route(state: TravelPlanState) -> str:
"""路由节点:检查兴趣列表是否为空"""
interests = state.get("interests", [])
if not interests: # 如果兴趣列表为空
return "estimate_cost_node" # 跳过生成计划,直接去估算成本
else:
return "generate_plan_node" # 正常执行生成计划
def generate_plan(state: TravelPlanState):
"""生成计划节点"""
dest = state.get("destination", "某地")
days = state.get("days", 1)
interests = state.get("interests", [])
plan_text = f"为您规划{dest}{days}日游。您的兴趣点包括:{', '.join(interests)}。我们将据此安排行程。\n"
return {"plan": plan_text}
def estimate_cost(state: TravelPlanState):
"""估算成本节点"""
days = state.get("days", 1)
interests = state.get("interests", [])
# 模拟一个简单的成本计算逻辑
base_cost = days * 1000
interest_surcharge = len(interests) * 500
total_cost = base_cost + interest_surcharge
# 如果计划为空(即跳过了generate_plan),则生成一个默认计划
current_plan = state.get("plan", "")
if not current_plan:
dest = state.get("destination", "某地")
default_plan = f"为您规划{dest}{days}日自由行,行程较为灵活。\n"
return {"plan": default_plan, "cost_estimate": total_cost}
else:
return {"cost_estimate": total_cost}
def format_output(state: TravelPlanState):
"""格式化输出节点"""
dest = state.get("destination", "N/A")
plan = state.get("plan", "暂无计划")
cost = state.get("cost_estimate", 0)
formatted = f"旅行目的地:{dest}\n"
formatted += "-" * 20 + "\n"
formatted += f"行程计划:\n{plan}\n"
formatted += f"初步预算估算:¥{cost} 元\n"
formatted += "-" * 20
return {"final_output": formatted}
# 3. 构建图
builder = StateGraph(TravelPlanState)
# 添加节点
builder.add_node("route_by_interests", check_interests_and_route) # 注意:这是一个路由函数,但作为节点添加
builder.add_node("generate_plan_node", generate_plan)
builder.add_node("estimate_cost_node", estimate_cost)
builder.add_node("format_output_node", format_output)
# 设置入口点为路由节点
builder.set_entry_point("route_by_interests")
# 添加条件边:路由节点根据兴趣列表决定下一步
builder.add_conditional_edges(
"route_by_interests",
# 这个函数返回下一个节点名,但我们已将路由逻辑放在节点函数自身中
# 这里需要一个调用节点函数的路由函数,但更简单的方法是将路由逻辑直接写在这里
# 我们需要重构:将check_interests_and_route的逻辑移到lambda中
lambda s: "estimate_cost_node" if not s.get("interests", []) else "generate_plan_node"
)
# 添加固定边
builder.add_edge("generate_plan_node", "estimate_cost_node")
builder.add_edge("estimate_cost_node", "format_output_node")
builder.add_edge("format_output_node", END)
# 编译图
travel_planner = builder.compile()
# 4. 测试场景
print("=== 场景A:有具体兴趣 ===")
input_state_a = {
"destination": "北京",
"days": 3,
"interests": ["故宫", "长城", "美食"]
}
result_a = travel_planner.invoke(input_state_a)
print(result_a["final_output"])
print("\n\n=== 场景B:无具体兴趣(兴趣列表为空)===")
input_state_b = {
"destination": "上海",
"days": 2,
"interests": [] # 空列表
}
result_b = travel_planner.invoke(input_state_b)
print(result_b["final_output"])
# 可选:打印最终状态以查看所有字段
print("\n场景A完整状态:")
for k, v in result_a.items():
if k != 'final_output':
print(f" {k}: {v}")
代码要点解析:
- 状态合并:
interests字段使用Annotated[list, operator.add],这意味着如果多个节点返回{"interests": [“新兴趣"]},这些列表会被合并。但在本题中,兴趣列表是输入,不会被修改,所以此策略影响不大。 - 条件路由:我们没有创建一个单独的节点来存放
check_interests_and_route函数,而是直接在add_conditional_edges中使用了一个lambda函数来实现路由逻辑。这使得图更简洁。路由逻辑是:如果兴趣列表为空,则直接跳转到estimate_cost_node,否则去generate_plan_node。 - 默认计划:在
estimate_cost节点中,我们检查state.get(“plan”, “”)是否为空。如果为空,说明跳过了计划生成节点,因此我们在该节点内生成一个默认的计划文本。这确保了无论走哪条路径,plan字段最终都有值。 - 执行流程:
- 场景A(有兴趣):入口 -> 路由节点(条件边:有兴趣)->
generate_plan_node->estimate_cost_node->format_output_node-> END。 - 场景B(无兴趣):入口 -> 路由节点(条件边:无兴趣)->
estimate_cost_node(在此生成默认计划)->format_output_node-> END。
- 场景A(有兴趣):入口 -> 路由节点(条件边:有兴趣)->
五、总结
LangGraph通过引入“图”这一核心抽象,为构建复杂、有状态、可控制的AI应用工作流提供了一套强大而优雅的范式。本文系统性地介绍了其基础知识与核心图形API。
我们从状态、节点、边、图这四个基本概念入手,理解了LangGraph如何将应用程序建模为一个数据流图。状态是流动的数据载体,节点是处理单元,边是控制流路径,而图则是它们的有机整体。通过合并策略,LangGraph优雅地解决了并行或冲突写入状态的问题。
在图形API部分,我们探讨了更高级的控制流模式。条件边是实现智能路由和循环的基石。中断机制通过状态持久化和再次调用,为构建需要与人或其他系统交互的Agent提供了可能。子图是管理复杂性和促进复用的最佳实践,允许我们将大问题分解为小模块。最后,流式处理不仅是提升用户体验的关键,其astream和astream_events API也为实时监控和调试提供了强大工具。
LangGraph不是要替代LangChain的核心链(LCEL),而是对其的补充和增强。当你的应用逻辑从简单的线性链发展为包含分支、循环、工具调用和状态管理时,LangGraph便成为了自然的选择。它特别适合于构建多轮对话Agent、复杂决策系统、审批工作流和任何需要维护长期上下文与执行历史的自动化任务。
掌握LangGraph,意味着你掌握了设计和实现下一代智能、健壮且可维护的AI应用架构的关键技能。从本章介绍的基础和API出发,你可以继续探索其与LangChain工具的更深度集成、多Agent协作、持久化状态存储等高级主题,从而将你的AI想法高效地转化为现实。
🌟 感谢您耐心阅读到这里!
🚀 技术成长没有捷径,但每一次的阅读、思考和实践,都在默默缩短您与成功的距离。
💡 如果本文对您有所启发,欢迎点赞👍、收藏📌、分享📤给更多需要的伙伴!
🗣️ 期待在评论区看到您的想法、疑问或建议,我会认真回复,让我们共同探讨、一起进步~
🔔 关注我,持续获取更多干货内容!
🤗 我们下篇文章见!
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)