LangGraph 实战:如何实现 Human-in-the-Loop(人机协同)工作流
LangGraph 实战:如何实现 Human-in-the-Loop(人机协同)工作流
一、引言 (Introduction)
钩子 (The Hook)
想象一下:你在开发一款面向法律从业者的智能合同起草助手,系统用 GPT-4o 或 Claude 3.5 Sonnet 快速生成了一份看似完美的租赁合同,但当你把它丢给法务总监时,发现系统偷偷在“不可抗力条款”里加了“陨石撞击导致土地使用权灭失”这种极端但完全没必要且增加争议风险的条目;又或者,你在做一个电商客服工单分类与自动回复系统,系统把“客户投诉收到的iPhone是砖头模型”这种需要人工紧急介入的工单,误分到了“普通物流延迟建议”的队列里——这两个场景,是当前AI应用落地中遇到的90%以上的致命痛点之一:纯LLM驱动的系统,没有“边界感”、缺乏“领域深度常识”、做不出“需要道德/法律/业务规则严格约束的决策”。
那么问题来了:有没有一种技术,能让我们在AI的“思考链条”(Chain of Thought, CoT)或者“执行流程”(Workflow)中,精准地“插缝”让人介入——比如在LLM生成极端不可抗力条款前,先让法务专家“预定义”允许的条款范围?或者在LLM对工单分类置信度低于95%时,强制弹框让客服主管“复核确认”?甚至在自动回复生成前,先让资深客服“审核修改”再发送?
答案当然是肯定的,而且已经有了成熟的生产级框架——LangGraph。
定义问题/阐述背景 (The “Why”)
1.1 纯LLM应用的三大核心局限
在深入LangGraph和Human-in-the-Loop(以下简称HITL)之前,我们必须先搞清楚,为什么纯LLM应用无法直接落地到高价值、高风险场景:
- 幻觉(Hallucination)问题不可避免:即使是GPT-4o这种顶级模型,在生成非事实性内容(尤其是专业领域的细节)时,幻觉率仍然在10%-30%之间——对于医疗、法律、金融、航空航天这些“一次错误可能倾家荡产/危及生命”的场景,这个风险是绝对不可接受的。
- 置信度评估(Confidence Estimation)能力不足:LLM本质上是一个“文本预测机器”,它输出的内容,我们很难直观地知道“它到底有多大把握说对了”——比如它生成的一份财务审计报告,可能看起来头头是道,但其中的资产负债率计算其实错了一个小数点,而LLM自己根本意识不到。
- 复杂业务规则(Complex Business Rules)的硬编码与软约束难以平衡:有些场景下,业务规则是绝对刚性的(比如电商的“退款必须在7天无理由期内,且商品未拆封”),纯LLM很难100%严格遵守(它可能会忘记“未拆封”这个条件);而有些场景下,业务规则是软约束的(比如客服回复要“亲切但不越界”),硬编码又会显得太死板——如何在这两者之间找到平衡,是纯LLM应用的另一个难题。
1.2 HITL人机协同的核心价值
正是为了解决纯LLM应用的这些局限,Human-in-the-Loop(人机协同) 应运而生——它的核心思想是:把AI的“计算能力”、“速度优势”和“初步处理能力”,与人的“道德判断”、“领域常识”、“复杂决策能力”结合起来,形成一个“1+1>2”的闭环系统。
HITL人机协同在AI应用落地中,有以下几个不可替代的核心价值:
- 降低幻觉风险:在AI生成内容或做出决策的关键节点,让人介入“复核确认”或“修改优化”,可以把最终输出的幻觉率降到1%以下(甚至接近0%)。
- 提升决策可靠性:通过预定义的“置信度阈值触发规则”或“业务规则触发规则”,让人介入处理AI“拿不准”或“不符合规则”的情况,确保最终输出是100%可靠的。
- 平衡刚性与柔性规则:可以把刚性规则硬编码在流程的“条件分支”或“工具调用验证”环节,把柔性规则交给人在介入时“灵活调整”。
- 持续优化模型/系统:通过记录人在介入时的“修改记录”、“反馈信息”,可以持续训练微调LLM,或者优化流程的触发规则,让系统变得越来越“智能”,越来越“少需要人介入”。
1.3 LangGraph为什么是实现HITL的最佳选择?
现在市面上有很多实现HITL的框架和工具——比如你可以用纯Python的异步队列(asyncio + Redis Queue)手写一个,或者用LangChain的旧版SequentialChain/LLMChain + 自定义回调(Custom Callback)来实现,甚至可以用Zapier、Make这种无代码工具来搭建——但为什么LangGraph是生产级HITL应用的最佳选择呢?
我们先看一下LangGraph的定义:LangGraph是LangChain团队推出的一个基于有向无环图(DAG)和状态机(State Machine)的多Agent/LLM应用开发框架,它专门用于构建“有状态的、可循环的、支持人机协同的、生产级的”复杂LLM应用。
接下来,我们用一个简单的对比表,来看一下LangGraph和其他实现HITL的方式相比,有哪些核心优势:
| 对比维度 | 纯Python异步队列手写 | LangChain旧版SequentialChain | Zapier/Make无代码工具 | LangGraph |
|---|---|---|---|---|
| 有状态支持 | 需自己管理Redis/DB的状态持久化 | 无原生状态持久化,需自定义存储 | 状态简单,仅支持变量传递 | 原生支持图级状态(Graph State) 和节点级状态(Node State),自带状态持久化接口(支持LangSmith、SQLite、PostgreSQL、Redis等) |
| 循环支持 | 需自己写循环逻辑和退出条件 | 无原生循环支持(只能顺序执行) | 循环支持较弱,需复杂嵌套条件 | 原生支持有向循环(Directed Cycles),可以轻松实现“反复生成-审核-修改”的HITL闭环 |
| 人机协同支持 | 需自己实现“暂停-等待-恢复”机制 | 无原生暂停/恢复机制,需自定义回调 | 仅支持简单的人工审核节点,扩展性差 | 原生支持暂停机制(Interruption)、等待机制(Wait For Input)、恢复机制(Resume),可以精准控制在哪个节点、什么条件下让人介入 |
| 多Agent支持 | 需自己管理Agent之间的通信和协调 | 无原生多Agent支持,需自定义逻辑 | 仅支持单Agent或简单的多Agent串联 | 原生支持多Agent协作,可以轻松实现“Agent生成-工具验证-人审核-另一个Agent优化”的复杂流程 |
| 生产级特性 | 需自己实现日志、监控、追踪、测试 | 仅支持LangSmith的部分监控功能 | 无原生日志、监控、追踪功能,不适合高并发场景 | 原生集成LangSmith(日志、监控、追踪、调试),自带并发控制、超时控制、重试机制,适合生产级高并发场景 |
| 扩展性 | 高,但开发成本高 | 低,仅支持LangChain的组件 | 极低,仅支持内置或第三方集成 | 极高,可以轻松集成任何LangChain的组件(LLMs、Tools、Retrievers)、任何第三方API、任何自定义代码 |
从这个对比表可以看出,LangGraph几乎完美解决了其他实现HITL方式的所有痛点——它既有纯Python手写的高扩展性,又有无代码工具的低开发成本;既有LangChain旧版的组件生态,又有生产级应用所需的所有特性;更重要的是,它原生支持有状态、有循环、有人机协同的复杂流程——这正是我们实现高价值、高风险场景HITL应用的核心需求。
亮明观点/文章目标 (The “What” & “How”)
读完这篇文章,你将从零开始,掌握LangGraph实现HITL人机协同工作流的所有核心技能,包括:
- 理解LangGraph的核心概念:图级状态、节点、边、条件边、暂停机制、等待机制、恢复机制等。
- 掌握LangGraph的核心语法和API:如何定义状态、如何定义节点、如何定义边、如何编译图、如何运行图、如何暂停和恢复图、如何持久化状态等。
- 通过3个 从简单到复杂的生产级实战案例,深入理解HITL的不同应用场景和实现方式:
- 案例一:简单的文本生成-人工审核-修改循环:适合入门,理解HITL的基本流程和LangGraph的核心语法。
- 案例二:带置信度阈值触发的工单分类-人工复核-自动回复系统:适合中级,理解如何用业务规则和置信度阈值触发HITL。
- 案例三:带工具调用验证的法律合同起草-法务预审核-专家修改-LLM定稿系统:适合高级,理解如何用多Agent、工具调用、状态持久化、LangSmith监控构建生产级HITL应用。
- 掌握LangGraph实现HITL的最佳实践和常见陷阱:如何选择合适的HITL触发方式、如何优化状态管理、如何提升系统性能、如何避免常见的错误等。
- 了解LangGraph和HITL的未来发展趋势:比如LangGraph Cloud、自动置信度评估、主动式HITL等。
接下来,我们就正式进入文章的核心内容。
二、基础知识/背景铺垫 (Foundational Concepts)
在开始实战之前,我们必须先掌握LangGraph的核心概念和HITL的常见模式——这是我们后续实战的基础。
2.1 LangGraph的核心概念
LangGraph的核心概念其实非常简单,它本质上是一个**“有状态的多Agent状态机”——我们可以把它想象成一个“带记忆的智能流程图”**:
- 流程图的节点(Node):可以是LLM、可以是工具、可以是自定义代码、也可以是“等待人工输入”的占位符。
- 流程图的边(Edge):连接节点的箭头,表示流程的执行顺序——可以是“无条件边”(执行完上一个节点,直接执行下一个节点),也可以是“条件边”(根据上一个节点的输出状态,选择执行不同的下一个节点)。
- 流程图的记忆(Graph State):贯穿整个流程的“共享状态”——所有节点都可以读取和修改这个状态,这是LangGraph和LangChain旧版SequentialChain最大的区别之一。
- 状态机的暂停/恢复机制:可以在流程的任何节点、任何条件下“暂停”流程的执行,“等待”人工输入,然后“恢复”流程的执行——这是LangGraph实现HITL的核心。
接下来,我们逐一深入解释这些核心概念。
2.1.1 图级状态(Graph State)
2.1.1.1 核心概念
图级状态(Graph State) 是LangGraph中最重要、最基础的概念——它是一个贯穿整个流程执行的共享数据结构,所有节点都可以读取(Read)、**修改(Write)或追加(Append)**这个状态。
我们可以把图级状态想象成一个**“多人协作的在线文档”**:
- 每个节点(可以是LLM、工具、自定义代码、人工)都是一个“编辑者”。
- 编辑者可以随时查看文档的内容(读取状态)。
- 编辑者可以修改文档的内容(修改状态)。
- 编辑者也可以在文档的末尾追加内容(追加状态)。
- 当所有编辑者都完成工作后,文档的最终内容就是整个流程的输出。
2.1.1.2 状态的定义方式
LangGraph支持两种定义图级状态的方式:
- 使用TypedDict(推荐用于Python 3.8+):可以给状态的每个字段指定类型,提高代码的可读性和可维护性,同时也方便IDE进行类型检查。
- 使用Pydantic BaseModel(推荐用于需要复杂验证的场景):可以给状态的每个字段指定类型、默认值、验证规则(比如最小值、最大值、正则表达式等),确保状态的合法性。
接下来,我们用一个简单的例子,来看一下如何用TypedDict和Pydantic BaseModel定义图级状态:
例子:定义一个“文本生成-人工审核-修改循环”的图级状态
这个状态需要包含以下字段:
prompt:用户的初始输入(字符串,必填)。generated_text:LLM生成的文本(字符串,初始为空)。human_feedback:人工的反馈/修改意见(字符串,初始为空)。approved:人工是否批准了生成的文本(布尔值,初始为False)。iteration_count:循环的次数(整数,初始为0,最大值为5,避免无限循环)。
(1)使用TypedDict定义状态
from typing import TypedDict, Optional, Annotated
from langgraph.graph.message import add_messages # 用于追加消息的辅助函数
# 定义一个TypedDict类型的图级状态
class TextGenerationState(TypedDict):
# 用户的初始输入:必填,字符串类型
prompt: str
# LLM生成的文本:可选,字符串类型,初始为空
generated_text: Optional[str]
# 人工的反馈/修改意见:可选,字符串类型,初始为空
human_feedback: Optional[str]
# 人工是否批准了生成的文本:必填,布尔值类型,初始为False
approved: bool
# 循环的次数:必填,整数类型,初始为0
iteration_count: int
# (可选)如果需要保存LLM和人工的对话历史,可以用Annotated + add_messages
# 这里的Annotated是Python 3.9+的类型注解,用于给类型添加元数据
# add_messages是LangGraph提供的辅助函数,用于追加消息到列表中,而不是覆盖
conversation_history: Annotated[list, add_messages]
(2)使用Pydantic BaseModel定义状态
from pydantic import BaseModel, Field, validator
from typing import Optional, Annotated
from langgraph.graph.message import add_messages
# 定义一个Pydantic BaseModel类型的图级状态
class TextGenerationState(BaseModel):
# 用户的初始输入:必填,字符串类型,最小长度为1
prompt: str = Field(..., min_length=1, description="用户的初始输入")
# LLM生成的文本:可选,字符串类型,初始为空
generated_text: Optional[str] = Field("", description="LLM生成的文本")
# 人工的反馈/修改意见:可选,字符串类型,初始为空
human_feedback: Optional[str] = Field("", description="人工的反馈/修改意见")
# 人工是否批准了生成的文本:必填,布尔值类型,初始为False
approved: bool = Field(False, description="人工是否批准了生成的文本")
# 循环的次数:必填,整数类型,初始为0,最大值为5,避免无限循环
iteration_count: int = Field(0, ge=0, le=5, description="循环的次数(最大值为5)")
# (可选)如果需要保存LLM和人工的对话历史,可以用Annotated + add_messages
# 注意:Pydantic的BaseModel默认会对可变类型(比如list)进行深拷贝,所以需要用extra = "allow"或者自定义类型
# 这里我们用自定义的Annotated类型,配合LangGraph的add_messages函数
conversation_history: Annotated[list, add_messages] = Field(default_factory=list, description="对话历史")
# 自定义验证器:确保循环次数不超过最大值
@validator("iteration_count")
def check_iteration_count(cls, v):
if v > 5:
raise ValueError("循环次数不能超过5次")
return v
2.1.1.3 状态的更新方式
LangGraph支持三种更新图级状态的方式:
- 覆盖更新(Overwrite):直接用新的值覆盖状态中对应字段的旧值——适合更新“单个值”的字段(比如
approved、iteration_count)。 - 追加更新(Append):在状态中对应字段的旧值后面追加新的值——适合更新“列表类型”的字段(比如
conversation_history),这时候需要用Annotated[list, add_messages]来定义字段类型(其中add_messages是LangGraph提供的辅助函数)。 - 合并更新(Merge):用新的字典合并状态中对应字段的旧字典——适合更新“字典类型”的字段,这时候需要用
Annotated[dict, add_dict]来定义字段类型(其中add_dict是我们自定义的辅助函数)。
接下来,我们用一个简单的例子,来看一下这三种更新方式的区别:
例子:状态更新方式的演示
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
# 自定义合并字典的辅助函数
def add_dict(left: dict, right: dict) -> dict:
return {**left, **right}
# 定义一个包含三种更新方式的状态
class StateUpdateDemoState(TypedDict):
# 覆盖更新的字段:单个值
overwrite_field: int
# 追加更新的字段:列表
append_field: Annotated[list, add_messages]
# 合并更新的字段:字典
merge_field: Annotated[dict, add_dict]
# 初始化状态
initial_state: StateUpdateDemoState = {
"overwrite_field": 1,
"append_field": [{"role": "user", "content": "Hello"}],
"merge_field": {"key1": "value1"}
}
# 覆盖更新:把overwrite_field的值从1改为2
overwrite_update = {"overwrite_field": 2}
# 注意:LangGraph的状态更新是“增量更新”——只需要提供要更新的字段,不需要提供所有字段
updated_state1 = {**initial_state, **overwrite_update} # 模拟LangGraph的覆盖更新
print("覆盖更新后的状态:")
print(updated_state1)
# 输出:
# {
# "overwrite_field": 2,
# "append_field": [{"role": "user", "content": "Hello"}],
# "merge_field": {"key1": "value1"}
# }
# 追加更新:在append_field的末尾追加一条新消息
append_update = {"append_field": [{"role": "assistant", "content": "Hi there"}]}
# 模拟LangGraph的追加更新:需要用add_messages函数
updated_append_field = add_messages(initial_state["append_field"], append_update["append_field"])
updated_state2 = {**initial_state, **overwrite_update, "append_field": updated_append_field}
print("\n追加更新后的状态:")
print(updated_state2)
# 输出:
# {
# "overwrite_field": 2,
# "append_field": [
# {"role": "user", "content": "Hello"},
# {"role": "assistant", "content": "Hi there"}
# ],
# "merge_field": {"key1": "value1"}
# }
# 合并更新:在merge_field中添加一个新的key-value对
merge_update = {"merge_field": {"key2": "value2"}}
# 模拟LangGraph的合并更新:需要用add_dict函数
updated_merge_field = add_dict(initial_state["merge_field"], merge_update["merge_field"])
updated_state3 = {**initial_state, **overwrite_update, "append_field": updated_append_field, "merge_field": updated_merge_field}
print("\n合并更新后的状态:")
print(updated_state3)
# 输出:
# {
# "overwrite_field": 2,
# "append_field": [
# {"role": "user", "content": "Hello"},
# {"role": "assistant", "content": "Hi there"}
# ],
# "merge_field": {"key1": "value1", "key2": "value2"}
# }
2.1.1.4 状态的持久化
LangGraph原生支持状态的持久化(State Persistence)——我们可以把流程执行过程中的状态保存到数据库(SQLite、PostgreSQL)、缓存(Redis)、文件系统或者LangSmith中,这样即使程序崩溃了,我们也可以从上次暂停的地方恢复流程的执行。
LangGraph的状态持久化是通过**Checkpointer(检查点器)** 接口实现的——我们只需要在编译图的时候,传入一个Checkpointer对象,LangGraph就会自动在流程执行的每个节点之后,保存当前的状态(检查点)。
LangGraph官方提供了以下几种Checkpointer的实现:
MemorySaver:把状态保存到内存中——适合开发和测试,不适合生产环境(因为程序崩溃后状态会丢失)。SqliteSaver:把状态保存到SQLite数据库中——适合单机生产环境。PostgresSaver:把状态保存到PostgreSQL数据库中——适合分布式生产环境。RedisSaver:把状态保存到Redis缓存中——适合高并发生产环境。
接下来,我们用一个简单的例子,来看一下如何使用MemorySaver和SqliteSaver持久化状态:
例子:状态持久化的演示
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.sqlite import SqliteSaver
# 定义一个简单的状态
class SimpleState(TypedDict):
count: int
# 定义一个简单的节点:每次把count加1
def increment_count(state: SimpleState) -> SimpleState:
return {"count": state["count"] + 1}
# 定义一个条件边:如果count小于3,就继续执行increment_count;否则,就结束
def should_continue(state: SimpleState) -> str:
if state["count"] < 3:
return "increment_count"
else:
return END
# 构建图
graph_builder = StateGraph(SimpleState)
graph_builder.add_node("increment_count", increment_count)
graph_builder.set_entry_point("increment_count")
graph_builder.add_conditional_edges("increment_count", should_continue)
# ------------------------------
# 使用MemorySaver持久化状态(开发/测试)
# ------------------------------
print("=== 使用MemorySaver持久化状态 ===")
memory_saver = MemorySaver()
# 编译图的时候传入Checkpointer
memory_graph = graph_builder.compile(checkpointer=memory_saver)
# 初始化状态
initial_state = {"count": 0}
# 初始化配置:必须包含thread_id,用于区分不同的流程实例
config = {"configurable": {"thread_id": "thread-1"}}
# 第一次运行图
print("\n第一次运行图:")
for event in memory_graph.stream(initial_state, config=config, stream_mode="values"):
print(event)
# 输出:
# {'count': 1}
# {'count': 2}
# {'count': 3}
# 检查当前的状态
print("\n当前的状态:")
current_state = memory_graph.get_state(config=config)
print(current_state.values)
# 输出:{'count': 3}
# 修改状态:把count改为1
print("\n修改状态:把count改为1")
memory_graph.update_state(config=config, {"count": 1})
# 检查修改后的状态
print("\n修改后的状态:")
current_state = memory_graph.get_state(config=config)
print(current_state.values)
# 输出:{'count': 1}
# 第二次运行图:从修改后的状态继续执行
print("\n第二次运行图:从修改后的状态继续执行")
for event in memory_graph.stream(None, config=config, stream_mode="values"):
print(event)
# 输出:
# {'count': 2}
# {'count': 3}
# ------------------------------
# 使用SqliteSaver持久化状态(单机生产)
# ------------------------------
print("\n=== 使用SqliteSaver持久化状态 ===")
# 创建SqliteSaver对象,连接到test.db数据库
sqlite_saver = SqliteSaver.from_conn_string("test.db")
# 编译图的时候传入Checkpointer
sqlite_graph = graph_builder.compile(checkpointer=sqlite_saver)
# 初始化配置
config = {"configurable": {"thread_id": "thread-2"}}
# 第一次运行图
print("\n第一次运行图:")
for event in sqlite_graph.stream(initial_state, config=config, stream_mode="values"):
print(event)
# 输出:
# {'count': 1}
# {'count': 2}
# {'count': 3}
# 程序关闭后,重新打开,仍然可以从上次的状态恢复(这里就不演示了,大家可以自己试一下)
2.1.2 节点(Node)
2.1.2.1 核心概念
节点(Node) 是LangGraph中执行具体任务的单元——它可以是任何东西:
- LLM节点:调用LLM生成文本或做出决策。
- 工具节点:调用外部工具(比如搜索工具、计算器工具、数据库工具等)获取信息或执行操作。
- 自定义代码节点:执行自定义的Python代码(比如数据清洗、状态验证、业务逻辑处理等)。
- 等待人工输入节点:暂停流程的执行,等待人工输入信息或修改状态——这是LangGraph实现HITL的核心节点之一。
2.1.2.2 节点的定义方式
LangGraph中定义节点的方式非常简单——只需要写一个Python函数,这个函数需要满足以下两个条件:
- 输入参数:必须包含一个
state参数,类型是图级状态的类型(比如TextGenerationState)。 - 返回值:必须返回一个字典,这个字典包含要更新的图级状态的字段(增量更新,不需要返回所有字段)。
接下来,我们用一个简单的例子,来看一下如何定义LLM节点、工具节点、自定义代码节点和等待人工输入节点:
例子:定义不同类型的节点
首先,我们需要安装必要的依赖包:
pip install langgraph langchain-openai langchain-community python-dotenv
然后,我们创建一个.env文件,存放我们的API密钥:
OPENAI_API_KEY=your-openai-api-key-here
接下来,我们写代码定义不同类型的节点:
import os
from typing import TypedDict, Annotated
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langgraph.graph.message import add_messages
# 加载环境变量
load_dotenv()
# ------------------------------
# 1. 定义图级状态
# ------------------------------
class MultiNodeDemoState(TypedDict):
prompt: str
llm_response: str
tool_result: str
custom_code_result: int
human_input: str
conversation_history: Annotated[list, add_messages]
# ------------------------------
# 2. 初始化LLM
# ------------------------------
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# ------------------------------
# 3. 定义不同类型的节点
# ------------------------------
# ------------------------------
# 3.1 LLM节点:调用LLM生成文本
# ------------------------------
def llm_node(state: MultiNodeDemoState) -> MultiNodeDemoState:
print("--- 正在执行LLM节点 ---")
# 构建LLM的输入:用户的prompt + 对话历史
messages = state["conversation_history"] + [HumanMessage(content=state["prompt"])]
# 调用LLM
response = llm.invoke(messages)
# 更新状态
return {
"llm_response": response.content,
"conversation_history": [HumanMessage(content=state["prompt"]), response]
}
# ------------------------------
# 3.2 工具节点:调用外部工具(这里定义一个简单的计算器工具)
# ------------------------------
@tool
def calculator_tool(expression: str) -> str:
"""一个简单的计算器工具,用于计算数学表达式的结果。"""
print("--- 正在执行工具节点(计算器) ---")
try:
# 注意:在生产环境中,不要直接使用eval函数,因为它存在安全风险
# 这里只是为了演示,生产环境中应该使用更安全的库(比如ast.literal_eval + 简单的解析器)
result = eval(expression)
return f"计算结果:{result}"
except Exception as e:
return f"计算错误:{str(e)}"
# 定义工具节点的函数
def tool_node(state: MultiNodeDemoState) -> MultiNodeDemoState:
# 绑定工具到LLM
llm_with_tools = llm.bind_tools([calculator_tool])
# 构建LLM的输入:对话历史
messages = state["conversation_history"]
# 调用LLM,获取工具调用请求
response = llm_with_tools.invoke(messages)
# 如果LLM返回了工具调用请求,就执行工具
if response.tool_calls:
tool_call = response.tool_calls[0]
# 执行工具
tool_response = calculator_tool.invoke(tool_call)
# 更新状态
return {
"tool_result": tool_response.content,
"conversation_history": [response, ToolMessage(content=tool_response.content, tool_call_id=tool_call["id"])]
}
else:
# 如果LLM没有返回工具调用请求,就直接返回LLM的响应
return {
"llm_response": response.content,
"conversation_history": [response]
}
# ------------------------------
# 3.3 自定义代码节点:执行自定义的Python代码(这里计算一个字符串的长度)
# ------------------------------
def custom_code_node(state: MultiNodeDemoState) -> MultiNodeDemoState:
print("--- 正在执行自定义代码节点 ---")
# 计算LLM响应的长度
llm_response_length = len(state["llm_response"])
# 更新状态
return {
"custom_code_result": llm_response_length
}
# ------------------------------
# 3.4 等待人工输入节点:暂停流程的执行,等待人工输入(这里我们先留空,后面在HITL部分详细讲)
# ------------------------------
# 注意:LangGraph中实现等待人工输入的方式有两种:
# 1. 使用InterruptBefore/InterruptAfter:在某个节点之前或之后暂停流程的执行。
# 2. 使用自定义的等待节点:在节点中抛出一个特殊的异常,暂停流程的执行。
# 我们后面在HITL部分详细讲这两种方式。
2.1.3 边(Edge)
2.1.3.1 核心概念
边(Edge) 是LangGraph中连接节点的箭头,它表示流程的执行顺序——可以是“无条件边”,也可以是“条件边”。
2.1.3.2 边的类型
LangGraph支持以下三种类型的边:
- 无条件边(Unconditional Edge):执行完上一个节点后,直接执行下一个节点,不需要任何条件判断。
- 条件边(Conditional Edge):执行完上一个节点后,根据上一个节点的输出状态,选择执行不同的下一个节点。
- 入口点(Entry Point):流程开始执行的第一个节点。
- 结束点(END):流程执行结束的标志——可以是一个节点,也可以是条件边的一个返回值。
2.1.3.3 边的定义方式
LangGraph中定义边的方式非常简单——我们只需要使用StateGraph对象的以下方法:
set_entry_point(node_name: str):设置流程的入口点。add_edge(start_node: str, end_node: str):添加一条无条件边,从start_node指向end_node。add_conditional_edges(start_node: str, condition_func: Callable, path_map: Optional[Dict[str, str]] = None):添加一条条件边,从start_node出发,根据condition_func的返回值,选择执行不同的下一个节点——如果path_map不为空,就把condition_func的返回值映射到path_map中的节点名;如果path_map为空,就直接把condition_func的返回值作为节点名。
接下来,我们用一个简单的例子,来看一下如何定义不同类型的边:
例子:定义不同类型的边
我们继续使用上一个例子中的状态和节点,来构建一个包含不同类型边的图:
from langgraph.graph import StateGraph, END
# ------------------------------
# 1. 构建图
# ------------------------------
graph_builder = StateGraph(MultiNodeDemoState)
# ------------------------------
# 2. 添加节点
# ------------------------------
graph_builder.add_node("llm_node", llm_node)
graph_builder.add_node("tool_node", tool_node)
graph_builder.add_node("custom_code_node", custom_code_node)
# ------------------------------
# 3. 定义条件边的函数
# ------------------------------
def should_use_tool(state: MultiNodeDemoState) -> str:
"""判断是否需要使用工具:如果用户的prompt中包含“计算”、“math”、“calculator”等关键词,就使用工具;否则,就直接执行自定义代码节点。"""
prompt = state["prompt"].lower()
if any(keyword in prompt for keyword in ["计算", "math", "calculator", "sum", "difference", "product", "quotient"]):
return "use_tool"
else:
return "skip_tool"
# ------------------------------
# 4. 添加边
# ------------------------------
# 设置入口点为llm_node
graph_builder.set_entry_point("llm_node")
# 添加条件边:从llm_node出发,根据should_use_tool的返回值选择执行不同的节点
graph_builder.add_conditional_edges(
start_node="llm_node",
condition=should_use_tool,
path_map={
"use_tool": "tool_node",
"skip_tool": "custom_code_node"
}
)
# 添加无条件边:从tool_node指向custom_code_node
graph_builder.add_edge("tool_node", "custom_code_node")
# 添加无条件边:从custom_code_node指向END
graph_builder.add_edge("custom_code_node", END)
# ------------------------------
# 5. 编译图
# ------------------------------
graph = graph_builder.compile()
# ------------------------------
# 6. 可视化图(可选,需要安装graphviz)
# ------------------------------
try:
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
except ImportError:
print("请安装graphviz和IPython来可视化图:pip install graphviz IPython")
# 或者直接打印mermaid代码
print(graph.get_graph().draw_mermaid())
如果我们安装了graphviz和IPython,就可以看到这个图的可视化效果——它是一个典型的“先LLM,再根据条件选择是否使用工具,最后执行自定义代码”的流程。
接下来,我们可以测试一下这个图:
# ------------------------------
# 7. 测试图:不需要使用工具的情况
# ------------------------------
print("=== 测试图:不需要使用工具的情况 ===")
initial_state = {
"prompt": "请用一句话介绍LangGraph。",
"conversation_history": []
}
for event in graph.stream(initial_state, stream_mode="values"):
print("\n当前状态:")
print(event)
# ------------------------------
# 8. 测试图:需要使用工具的情况
# ------------------------------
print("\n=== 测试图:需要使用工具的情况 ===")
initial_state = {
"prompt": "请计算123 + 456 * 789的结果。",
"conversation_history": []
}
for event in graph.stream(initial_state, stream_mode="values"):
print("\n当前状态:")
print(event)
2.1.4 暂停机制(Interruption)与等待机制(Wait For Input)
2.1.4.1 核心概念
暂停机制(Interruption) 和等待机制(Wait For Input) 是LangGraph实现HITL的核心中的核心——它们允许我们在流程的任何节点、任何条件下,暂停流程的执行,等待人工输入信息或修改状态,然后恢复流程的执行。
LangGraph中实现暂停和等待的方式主要有两种:
- 使用
InterruptBefore或InterruptAfter:在编译图的时候,指定在某个节点之前或之后暂停流程的执行——这是最简单、最常用的方式。 - 使用自定义的
NodeInterrupt异常:在节点的函数中,抛出一个langgraph.errors.NodeInterrupt异常,暂停流程的执行——这是更灵活的方式,可以根据节点的输出状态,动态决定是否暂停。
接下来,我们逐一深入解释这两种方式。
2.1.4.2 使用InterruptBefore或InterruptAfter实现暂停
这是LangGraph实现HITL最简单、最常用的方式——我们只需要在编译图的时候,传入interrupt_before或interrupt_after参数,指定在哪些节点之前或之后暂停流程的执行。
interrupt_before和interrupt_after参数的取值可以是:
- 一个字符串:表示在单个节点之前或之后暂停。
- 一个字符串列表:表示在多个节点之前或之后暂停。
- 一个函数:表示根据当前的状态,动态决定是否在某个节点之前或之后暂停——函数的输入参数是当前的状态,返回值是
True(暂停)或False(不暂停)。
接下来,我们用一个简单的例子,来看一下如何使用InterruptBefore实现暂停:
例子:使用InterruptBefore实现暂停(文本生成-人工审核-修改循环的简化版)
from typing import TypedDict, Optional
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# ------------------------------
# 1. 定义图级状态
# ------------------------------
class SimpleHITLState(TypedDict):
prompt: str
generated_text: Optional[str]
approved: bool
iteration_count: int
# ------------------------------
# 2. 初始化LLM
# ------------------------------
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
# ------------------------------
# 3. 定义节点
# ------------------------------
# 定义生成文本的节点
def generate_text_node(state: SimpleHITLState) -> SimpleHITLState:
print("--- 正在执行生成文本节点 ---")
# 如果是第一次循环,就直接根据用户的prompt生成文本;否则,就根据人工的反馈修改文本
if state["iteration_count"] == 0:
prompt = state["prompt"]
else:
# 这里我们简化一下,人工的反馈直接作为新的prompt(后面在实战案例中会更完善)
prompt = f"请根据以下反馈修改之前生成的文本:\n反馈:{state.get('human_feedback', '')}\n之前的文本:{state['generated_text']}"
# 调用LLM生成文本
response = llm.invoke(prompt)
# 更新状态:把iteration_count加1
return {
"generated_text": response.content,
"iteration_count": state["iteration_count"] + 1
}
# 定义条件边的函数:如果人工批准了,就结束;否则,就继续生成
def should_continue(state: SimpleHITLState) -> str:
if state["approved"]:
return END
else:
return "generate_text_node"
# ------------------------------
# 4. 构建图
# ------------------------------
graph_builder = StateGraph(SimpleHITLState)
# 添加节点
graph_builder.add_node("generate_text_node", generate_text_node)
# 设置入口点
graph_builder.set_entry_point("generate_text_node")
# 添加条件边:注意,这里我们需要先暂停,等待人工审核,然后再判断是否继续
# 所以我们把条件边的起点设置为一个“虚拟节点”?不,我们可以在generate_text_node之后暂停,然后等待人工修改状态(approved和human_feedback),然后再继续执行条件边
# 或者更简单的方式:我们添加一个“人工审核节点”,在这个节点之前暂停
# 这里我们先添加一个“人工审核节点”,其实它什么都不做,只是作为一个暂停的标记
def human_review_node(state: SimpleHITLState) -> SimpleHITLState:
print("--- 正在执行人工审核节点(只是一个标记) ---")
return {}
graph_builder.add_node("human_review_node", human_review_node)
# 添加无条件边:从generate_text_node指向human_review_node
graph_builder.add_edge("generate_text_node", "human_review_node")
# 添加条件边:从human_review_node出发,根据should_continue的返回值选择执行不同的节点
graph_builder.add_conditional_edges("human_review_node", should_continue)
# ------------------------------
# 5. 编译图:在human_review_node之前暂停
# ------------------------------
memory_saver = MemorySaver()
# 注意:我们这里使用interrupt_before=["human_review_node"],表示在执行human_review_node之前暂停流程的执行
graph = graph_builder.compile(
checkpointer=memory_saver,
interrupt_before=["human_review_node"]
)
# ------------------------------
# 6. 可视化图
# ------------------------------
try:
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
except ImportError:
print("请安装graphviz和IPython来可视化图:pip install graphviz IPython")
print(graph.get_graph().draw_mermaid())
# ------------------------------
# 7. 测试图
# ------------------------------
print("=== 测试HITL图 ===")
# 初始化状态
initial_state = {
"prompt": "请用一句话介绍Python编程语言。",
"approved": False,
"iteration_count": 0
}
# 初始化配置
config = {"configurable": {"thread_id": "thread-hitl-1"}}
# ------------------------------
# 7.1 第一次运行图:生成文本,然后暂停在human_review_node之前
# ------------------------------
print("\n--- 第一次运行图:生成文本,然后暂停 ---")
for event in graph.stream(initial_state, config=config, stream_mode="values"):
print("\n
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)