LangGraph 思维模式教程(三):识别节点类型
·
本系列教程将带你深入理解 LangGraph 的思维方式,学会用 LangGraph 的方式构建智能体。
本章主题:Step 2 - 识别四种节点类型及其特点 阅读时间:约 20 分钟
开篇:为什么需要分类节点?
不同节点有不同特点
在上一章,我们把邮件处理流程拆解成了 7 个节点。但这些节点并不都是一样的:
节点差异:
read_email:
- 从邮箱读取邮件
- 只读操作,没有副作用
- 可以缓存
classify_intent:
- 使用 LLM 分析邮件
- 调用外部 API
- 可能需要重试
search_documentation:
- 搜索知识库
- 只读操作
- 可以缓存
bug_tracking:
- 创建 Bug 工单
- 有副作用(创建记录)
- 不能缓存
draft_response:
- 使用 LLM 生成回复
- 调用外部 API
- 可能需要重试
human_review:
- 等待人工审核
- 可能等待很长时间
- 需要检查点
send_reply:
- 发送邮件
- 有副作用(发送邮件)
- 不能缓存
分类的好处
理解这些差异,是构建高质量智能体的关键:
好处一:正确的错误处理
- LLM 节点:处理 API 限流、解析错误
- 数据节点:处理网络超时、数据不存在
- 动作节点:处理执行失败、需要补偿
- 用户节点:处理超时、取消
好处二:正确的优化策略
- LLM 节点:可以缓存结果
- 数据节点:可以缓存、并行查询
- 动作节点:不能缓存,需要幂等性
- 用户节点:需要检查点
好处三:正确的测试策略
- LLM 节点:Mock LLM 响应
- 数据节点:Mock 数据源
- 动作节点:Mock 外部服务
- 用户节点:模拟用户输入
好处四:正确的监控策略
- LLM 节点:监控 token 使用、响应时间
- 数据节点:监控查询时间、缓存命中率
- 动作节点:监控成功率、失败原因
- 用户节点:监控等待时间、响应率
一、四种节点类型概览
LangGraph 中的节点可以分为四种类型:
┌─────────────────────────────────────────────────────────────┐
│ 四种节点类型 │
├─────────────────────────────────────────────────────────────┤
│ │
│ 1. LLM 节点 │
│ 调用大语言模型 │
│ 示例:分类意图、起草回复、生成内容 │
│ 特点:可能失败、可缓存、需要重试 │
│ │
│ 2. 数据节点 │
│ 查询外部数据源 │
│ 示例:搜索文档、查询数据库、调用 API │
│ 特点:只读、可缓存、需要重试 │
│ │
│ 3. 动作节点 │
│ 执行外部动作 │
│ 示例:发送邮件、创建工单、写入数据库 │
│ 特点:有副作用、不缓存、需要幂等性 │
│ │
│ 4. 用户节点 │
│ 等待人工输入 │
│ 示例:人工审核、确认操作、提供信息 │
│ 特点:可长时间等待、需要检查点 │
│ │
└─────────────────────────────────────────────────────────────┘
1.1 类型对比
| 类型 | 主要操作 | 是否有副作用 | 是否需要重试 | 是否可缓存 | 是否需要检查点 |
|---|---|---|---|---|---|
| LLM 节点 | 调用模型 | 否 | 是 | 可缓存结果 | 否 |
| 数据节点 | 查询数据 | 否 | 是 | 可缓存结果 | 否 |
| 动作节点 | 执行动作 | 是 | 是 | 不缓存 | 否 |
| 用户节点 | 等待输入 | 否 | 否 | 不缓存 | 是 |
1.2 类型选择指南
如何选择节点类型?
问题 1:这个节点是否调用大模型?
├─ 是 → LLM 节点
└─ 否 → 继续问题 2
问题 2:这个节点是否等待用户输入?
├─ 是 → 用户节点
└─ 否 → 继续问题 3
问题 3:这个节点是否修改外部状态?
├─ 是 → 动作节点
└─ 否 → 数据节点
二、LLM 节点
2.1 什么是 LLM 节点?
LLM 节点是调用大语言模型的节点,用于:
用途一:理解
- 分析文本内容
- 提取关键信息
- 理解用户意图
用途二:生成
- 生成回复内容
- 撰写邮件、文档
- 翻译、改写
用途三:推理
- 做决策判断
- 分类归类
- 问题诊断
2.2 LLM 节点的特点
LLM 节点特点:
输入:
├── 静态上下文:提示词模板、指令、示例
└── 动态上下文:从状态获取的数据
处理:
├── 格式化提示词
├── 调用 LLM
└── 解析响应
输出:
└── 结构化的结果(存入状态)
错误处理:
├── 重试:网络问题、限流
├── 回退:解析失败时重新调用
└── 降级:使用默认值
优化:
├── 缓存:相同提示词可以缓存
├── 流式:支持流式输出
└── 批量:多个请求可以批量处理
2.3 为什么 LLM 节点需要特殊处理?
原因一:可能失败
- 网络问题
- API 限流
- 模型过载
→ 需要重试策略
原因二:输出不确定
- 相同输入可能不同输出
- 可能不符合预期格式
→ 需要结构化输出
原因三:成本较高
- Token 消耗
- 延迟较高
→ 需要缓存和优化
原因四:可以做决策
- LLM 可以根据内容做判断
- 可以决定下一步去哪
→ 可以使用 Command
2.4 实例:分类意图节点
from langchain_openai import ChatOpenAI
from langgraph.types import Command
from pydantic import BaseModel
llm = ChatOpenAI(model="gpt-4")
def classify_intent(state: EmailState) -> Command:
"""
LLM 节点:分类邮件意图
静态上下文:分类类别定义
动态上下文:邮件内容
输出:分类结果 + 跳转决策
"""
# 定义输出结构(使用 Pydantic)
class Classification(BaseModel):
intent: str # question, bug, billing, feature, complex
urgency: str # low, medium, high, critical
topic: str # 主题
summary: str # 摘要
# 使用结构化输出(确保格式正确)
structured_llm = llm.with_structured_output(Classification)
# 格式化提示词(动态 + 静态)
prompt = f"""
分析这封客户邮件并分类。
邮件内容:
{state['email_content']}
发送者:
{state['sender_email']}
分类类别:
- question:产品使用问题
- bug:Bug 报告
- billing:账单相关问题
- feature:功能请求
- complex:复杂技术问题
紧急程度:
- critical:需要立即处理
- high:24小时内处理
- medium:3天内处理
- low:一周内处理
"""
# 调用 LLM
classification = structured_llm.invoke(prompt)
# 决定下一步(LLM 节点可以做决策)
if classification.intent == 'billing' or classification.urgency == 'critical':
next_node = "human_review"
elif classification.intent in ['question', 'feature']:
next_node = "search_documentation"
elif classification.intent == 'bug':
next_node = "bug_tracking"
else:
next_node = "draft_response"
# 返回更新和跳转
return Command(
update={"classification": classification.model_dump()},
goto=next_node
)
代码解析:
1. 定义输出结构:
- 使用 Pydantic BaseModel
- 明确字段类型
- LLM 会按这个格式输出
2. 使用结构化输出:
- with_structured_output() 方法
- 确保输出格式正确
- 避免解析错误
3. 格式化提示词:
- 静态部分:分类类别、紧急程度定义
- 动态部分:邮件内容、发送者
4. 做决策:
- 根据分类结果决定下一步
- 使用 Command 指定跳转
- 同时更新状态
2.5 实例:起草回复节点
def draft_response(state: EmailState) -> Command:
"""
LLM 节点:起草邮件回复
静态上下文:回复指南、语气要求
动态上下文:邮件内容、分类结果、搜索结果
输出:回复文本 + 是否需要审核
"""
# 构建上下文(根据可用数据)
context_parts = []
if state.get('search_results'):
docs = "\n".join([f"- {doc}" for doc in state['search_results']])
context_parts.append(f"相关文档:\n{docs}")
if state.get('ticket_id'):
context_parts.append(f"Bug 工单:{state['ticket_id']}")
# 格式化提示词
prompt = f"""
为这封客户邮件起草回复。
原始邮件:
{state['email_content']}
分类信息:
- 意图:{state['classification']['intent']}
- 紧急程度:{state['classification']['urgency']}
- 主题:{state['classification']['topic']}
{chr(10).join(context_parts)}
回复指南:
- 专业、友好、有帮助
- 直接回答客户问题
- 如有相关文档,引用具体内容
- 如是 Bug,告知工单号
"""
# 调用 LLM
response = llm.invoke(prompt)
# 决定是否需要人工审核
needs_review = (
state['classification']['urgency'] in ['high', 'critical'] or
state['classification']['intent'] == 'complex'
)
next_node = "human_review" if needs_review else "send_reply"
return Command(
update={"response_text": response.content},
goto=next_node
)
2.6 LLM 节点最佳实践
1. 使用结构化输出
好的做法:
class Classification(BaseModel):
intent: str
urgency: str
structured_llm = llm.with_structured_output(Classification)
result = structured_llm.invoke(prompt)
# result.intent 和 result.urgency 一定存在
不好的做法:
result = llm.invoke(prompt)
# 需要手动解析,可能失败
intent = parse_intent(result.content) # 可能抛出异常
2. 分离静态和动态上下文
好的做法:
STATIC_PROMPT = """
分类类别:
- question:产品问题
- bug:Bug 报告
"""
def classify(state):
prompt = f"{STATIC_PROMPT}\n邮件:{state['email']}"
...
不好的做法:
def classify(state):
prompt = """
分类类别:
- question:产品问题
- bug:Bug 报告
邮件:{state['email']}
"""
# 静态和动态混在一起,难以维护
3. 在节点内做决策
好的做法:
def classify(state):
result = llm.invoke(prompt)
if result.intent == 'bug':
return Command(goto='bug_tracking')
else:
return Command(goto='draft')
不好的做法:
def classify(state):
result = llm.invoke(prompt)
return {"intent": result.intent}
# 跳转逻辑在图定义中,不清晰
4. 处理解析错误
好的做法:
def classify(state):
try:
result = structured_llm.invoke(prompt)
except OutputParserException:
# 解析失败,重试或使用默认值
result = llm.invoke(prompt + "\n请严格按照格式输出")
return {"classification": result}
不好的做法:
def classify(state):
result = structured_llm.invoke(prompt) # 可能失败
return {"classification": result}
三、数据节点
3.1 什么是数据节点?
数据节点是从外部数据源获取信息的节点,用于:
用途一:查询
- 搜索文档
- 查询数据库
- 检索知识
用途二:调用
- 调用外部 API
- 获取配置
- 读取文件
用途三:聚合
- 合并多个数据源
- 过滤和排序
- 格式转换
3.2 数据节点的特点
数据节点特点:
输入:
└── 查询参数(从状态构建)
处理:
├── 构建查询
├── 调用数据源
└── 处理响应
输出:
└── 原始数据(存入状态)
错误处理:
├── 重试:网络问题、超时
├── 回退:使用缓存或默认值
└── 降级:返回部分结果
优化:
├── 缓存:缓存常见查询
├── 并行:多个查询并行执行
└── 预取:预测并预取数据
注意:
├── 只读:不修改外部状态
└── 原始数据:存储原始数据,不格式化
3.3 为什么数据节点需要特殊处理?
原因一:可能失败
- 网络问题
- 数据源不可用
- 查询超时
→ 需要重试策略
原因二:可能很慢
- 数据库查询慢
- API 响应慢
- 数据量大
→ 需要缓存和优化
原因三:数据可能不存在
- 查询结果为空
- 数据已删除
- 权限不足
→ 需要优雅处理
原因四:可以并行
- 多个独立查询
- 可以同时执行
→ 可以使用异步
3.4 实例:文档搜索节点
from langgraph.types import Command, RetryPolicy
def search_documentation(state: EmailState) -> Command:
"""
数据节点:搜索知识库
输入:分类结果中的主题
输出:相关文档列表
"""
# 构建查询
classification = state['classification']
query = f"{classification['intent']} {classification['topic']}"
try:
# 调用搜索 API
results = knowledge_base.search(
query=query,
top_k=5,
min_score=0.7
)
# 存储原始结果(不格式化)
search_results = [doc.content for doc in results]
except NetworkError as e:
# 网络错误:记录并继续
logger.warning(f"搜索网络错误: {e}")
search_results = ["搜索暂时不可用,请稍后重试"]
except TimeoutError as e:
# 超时:使用缓存
logger.warning(f"搜索超时: {e}")
cached = cache.get(query)
search_results = cached or ["搜索超时,请稍后重试"]
except SearchError as e:
# 其他错误:降级处理
logger.error(f"搜索失败: {e}")
search_results = []
return Command(
update={"search_results": search_results},
goto="draft_response"
)
# 配置重试策略(在图定义时)
graph.add_node(
"search_documentation",
search_documentation,
retry_policy=RetryPolicy(
max_attempts=3, # 最多重试 3 次
initial_interval=1.0, # 初始等待 1 秒
max_interval=10.0, # 最大等待 10 秒
backoff_factor=2.0 # 指数退避因子
)
)
代码解析:
1. 构建查询:
- 从状态获取分类结果
- 组合意图和主题作为查询
2. 调用搜索:
- 设置参数(top_k, min_score)
- 获取搜索结果
3. 存储原始数据:
- 只存储文档内容
- 不格式化(在起草节点格式化)
4. 错误处理:
- NetworkError:记录日志,返回提示
- TimeoutError:尝试使用缓存
- SearchError:降级处理
5. 重试策略:
- 在图定义时配置
- LangGraph 自动重试
3.5 实例:客户历史查询节点
def lookup_customer_history(state: EmailState) -> Command:
"""
数据节点:查询客户历史
输入:客户邮箱
输出:客户历史数据
"""
customer_email = state['sender_email']
try:
# 查询 CRM 系统
customer_data = crm.get_customer(customer_email)
# 存储原始数据
return Command(
update={"customer_history": customer_data},
goto="draft_response"
)
except CustomerNotFoundError:
# 客户不存在:使用默认值
logger.info(f"新客户: {customer_email}")
return Command(
update={"customer_history": {"tier": "standard", "is_new": True}},
goto="draft_response"
)
except CRMAPIError as e:
# API 错误:降级处理
logger.error(f"CRM API 错误: {e}")
return Command(
update={"customer_history": {"tier": "standard", "error": True}},
goto="draft_response"
)
3.6 数据节点最佳实践
1. 存储原始数据
好的做法:
def search_docs(state):
results = search(query)
return {"search_results": results} # 原始数据
def draft(state):
# 在使用时格式化
docs = "\n".join(state['search_results'])
...
不好的做法:
def search_docs(state):
results = search(query)
formatted = "\n".join(results) # 格式化
return {"formatted_docs": formatted}
# 其他节点可能需要不同格式
2. 设置重试策略
好的做法:
graph.add_node(
"search",
search_docs,
retry_policy=RetryPolicy(max_attempts=3)
)
不好的做法:
def search_docs(state):
for i in range(3): # 手动重试
try:
return search(query)
except:
continue
# 重试逻辑应该在图配置中
3. 优雅降级
好的做法:
def search_docs(state):
try:
return search(query)
except:
return [] # 返回空列表,流程继续
不好的做法:
def search_docs(state):
return search(query) # 可能抛出异常
# 异常会中断整个流程
4. 考虑缓存
好的做法:
@cache.memoize(timeout=300) # 缓存 5 分钟
def search_docs(state):
return search(query)
不好的做法:
def search_docs(state):
return search(query) # 每次都查询
# 相同查询重复执行
四、动作节点
4.1 什么是动作节点?
动作节点是执行外部动作的节点,用于:
用途一:发送
- 发送邮件
- 推送通知
- 发送消息
用途二:创建
- 创建工单
- 创建记录
- 创建订单
用途三:更新
- 更新数据库
- 修改状态
- 更新配置
用途四:删除
- 删除记录
- 取消订单
- 清理数据
4.2 动作节点的特点
动作节点特点:
输入:
└── 动作参数(从状态获取)
处理:
├── 执行动作
└── 确认成功
输出:
└── 动作结果(如工单 ID)
错误处理:
├── 重试:网络问题
├── 补偿:失败时执行补偿动作
└── 告警:关键动作失败时告警
注意:
├── 有副作用:会改变外部状态
├── 不缓存:每次动作都是唯一的
├── 幂等性:相同参数多次执行结果相同
└── 需要确认:确保动作成功
4.3 为什么动作节点需要特殊处理?
原因一:有副作用
- 会改变外部状态
- 不能随意重试
→ 需要幂等性
原因二:不能缓存
- 每次动作都是唯一的
- 缓存会导致重复执行
→ 不应该缓存
原因三:失败影响大
- 发送失败:客户收不到回复
- 创建失败:数据丢失
→ 需要补偿机制
原因四:需要确认
- 不能假设成功
- 需要确认动作完成
→ 需要检查结果
4.4 什么是幂等性?
幂等性:相同参数多次执行,结果相同
示例一:发送邮件(非幂等)
send_email(to="a@example.com", body="Hello")
第一次:发送成功
第二次:又发送一封(重复了!)
→ 不是幂等的
示例二:发送邮件(幂等)
send_email_with_id(id="msg-001", to="a@example.com", body="Hello")
第一次:发送成功
第二次:发现 id 已存在,跳过
→ 是幂等的
示例三:创建工单(非幂等)
create_bug(title="Bug")
第一次:创建工单 #123
第二次:创建工单 #124(重复了!)
→ 不是幂等的
示例四:创建工单(幂等)
create_bug_if_not_exists(email_id="email-001", title="Bug")
第一次:创建工单 #123
第二次:发现已存在,返回 #123
→ 是幂等的
4.5 实例:发送邮件节点
def send_reply(state: EmailState) -> dict:
"""
动作节点:发送邮件回复
输入:收件人、回复内容
输出:发送结果
"""
try:
# 发送邮件(幂等操作)
result = email_service.send(
message_id=state['email_id'], # 使用邮件 ID 作为幂等键
to=state['sender_email'],
subject=f"Re: {state.get('email_subject', '您的咨询')}",
body=state['response_text']
)
# 记录发送成功
logger.info(f"邮件已发送: {state['email_id']} -> {state['sender_email']}")
return {"sent": True, "message_id": result.id}
except EmailSendError as e:
# 发送失败:记录并抛出
logger.error(f"邮件发送失败: {state['email_id']}, 错误: {e}")
# 发送失败通知
alert_service.notify(
level="error",
message=f"邮件发送失败: {state['email_id']}",
details=str(e)
)
raise # 让 LangGraph 处理错误(可能重试)
# 配置重试策略
graph.add_node(
"send_reply",
send_reply,
retry_policy=RetryPolicy(max_attempts=3)
)
4.6 实例:Bug 跟踪节点
def bug_tracking(state: EmailState) -> Command:
"""
动作节点:创建 Bug 工单
输入:邮件内容、分类结果
输出:工单 ID
"""
try:
# 创建 Bug 工单(幂等操作)
ticket = bug_tracker.create_if_not_exists(
external_id=state['email_id'], # 幂等键
title=f"Bug: {state['classification']['topic']}",
description=state['email_content'],
priority=map_priority(state['classification']['urgency']),
reporter=state['sender_email'],
labels=['auto-created', 'from-email']
)
logger.info(f"Bug 工单已创建: {ticket.id}")
return Command(
update={"ticket_id": ticket.id},
goto="draft_response"
)
except BugTrackerError as e:
# 创建失败:记录并继续
logger.error(f"Bug 工单创建失败: {e}")
# 返回失败标记,流程继续
return Command(
update={"ticket_id": "FAILED", "bug_error": str(e)},
goto="draft_response"
)
4.7 动作节点最佳实践
1. 确保幂等性
好的做法:
def send_email(state):
return email_service.send(
message_id=state['email_id'], # 幂等键
...
)
不好的做法:
def send_email(state):
return email_service.send(...) # 没有幂等键
# 重试时会重复发送
2. 不缓存结果
好的做法:
def send_email(state):
return email_service.send(...) # 每次都执行
不好的做法:
@cache.memoize()
def send_email(state):
return email_service.send(...)
# 缓存会导致不执行
3. 记录日志
好的做法:
def send_email(state):
logger.info(f"发送邮件: {state['email_id']}")
result = email_service.send(...)
logger.info(f"发送成功: {result.id}")
return result
不好的做法:
def send_email(state):
return email_service.send(...)
# 没有日志,难以调试
4. 考虑补偿
好的做法:
def send_email(state):
try:
return email_service.send(...)
except:
# 补偿:发送失败通知
alert_service.notify("邮件发送失败")
raise
不好的做法:
def send_email(state):
return email_service.send(...) # 失败时没有补偿
五、用户节点
5.1 什么是用户节点?
用户节点是等待人工输入的节点,用于:
用途一:审核
- 审核 AI 生成的结果
- 确认是否正确
- 修改或拒绝
用途二:确认
- 确认是否执行某个动作
- 批准或拒绝
- 选择选项
用途三:提供信息
- 提供 AI 缺少的信息
- 回答问题
- 做出选择
用途四:干预
- 修正 AI 的错误
- 提供额外指导
- 接管处理
5.2 用户节点的特点
用户节点特点:
输入:
└── 展示给用户的信息
处理:
├── 使用 interrupt() 暂停
├── 等待用户输入
└── 恢复执行
输出:
└── 用户输入的数据
注意:
├── 可无限期等待
├── 需要配置检查点
├── 恢复时传入用户输入
└── 可能需要超时处理
5.3 为什么用户节点需要特殊处理?
原因一:可能等待很长时间
- 用户可能不在线
- 可能几小时后才响应
→ 需要检查点持久化
原因二:需要恢复执行
- 用户响应后继续执行
- 需要记住之前的状态
→ 需要检查点
原因三:可能取消
- 用户可能取消操作
- 需要处理取消逻辑
→ 需要处理取消
原因四:需要上下文
- 用户需要看到相关信息
- 才能做出正确决策
→ 需要提供足够信息
5.4 什么是 interrupt()?
interrupt() 是 LangGraph 提供的函数,用于暂停执行:
工作流程:
1. 调用 interrupt(data)
2. LangGraph 暂停执行
3. 保存当前状态到检查点
4. 返回 data 给调用者
5. 等待用户输入
6. 用户输入后,恢复执行
7. interrupt() 返回用户输入
示例:
def human_review(state):
# 暂停,等待用户审核
user_input = interrupt({
"draft": state['draft'],
"message": "请审核"
})
# 用户输入后继续
if user_input['approved']:
return {"approved": True}
else:
return {"approved": False}
5.5 实例:人工审核节点
from langgraph.types import interrupt, Command
def human_review(state: EmailState) -> Command:
"""
用户节点:等待人工审核
展示:原始邮件、草稿回复、分类信息
等待:审核结果 + 可选修改
"""
# 暂停执行,等待用户输入
user_input = interrupt({
"email": state['email_content'],
"sender": state['sender_email'],
"draft": state['response_text'],
"classification": state['classification'],
"message": "请审核回复内容",
"actions": ["approve", "edit", "reject"]
})
# 用户输入后继续执行
action = user_input.get('action', 'approve')
if action == 'approve':
# 审核通过
return Command(
update={"approved": True},
goto="send_reply"
)
elif action == 'edit':
# 修改后通过
edited_response = user_input.get('edited_response', state['response_text'])
return Command(
update={
"response_text": edited_response,
"approved": True
},
goto="send_reply"
)
else: # reject
# 审核不通过,重新起草
feedback = user_input.get('feedback', '')
return Command(
update={
"approved": False,
"feedback": feedback
},
goto="draft_response"
)
使用示例:
# 编译图(需要检查点)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# 第一次调用(会暂停)
result = graph.invoke(
{"email_id": "email-001"},
config={"configurable": {"thread_id": "thread-001"}}
)
# 返回:{"__interrupt__": {...}}
# 用户审核后,恢复执行
result = graph.invoke(
{"action": "approve"}, # 用户输入
config={"configurable": {"thread_id": "thread-001"}}
)
# 继续执行,发送邮件
5.6 实例:请求缺失信息节点
def request_missing_info(state: EmailState) -> Command:
"""
用户节点:请求缺失的信息
场景:需要客户 ID 但邮件中没有
"""
# 检查是否有客户 ID
if not state.get('customer_id'):
# 暂停,请求客户 ID
user_input = interrupt({
"message": "需要客户 ID 才能查询历史",
"request": "请提供客户的账号 ID",
"email": state['sender_email']
})
# 用户输入后继续
return Command(
update={"customer_id": user_input['customer_id']},
goto="lookup_customer_history" # 重新查询
)
# 已有客户 ID,继续
return Command(goto="lookup_customer_history")
5.7 用户节点最佳实践
1. 提供足够上下文
好的做法:
user_input = interrupt({
"email": state['email'],
"draft": state['draft'],
"classification": state['classification'],
"message": "请审核"
})
不好的做法:
user_input = interrupt({
"draft": state['draft']
})
# 用户看不到原始邮件,难以判断
2. 支持多种操作
好的做法:
if user_input['action'] == 'approve':
...
elif user_input['action'] == 'edit':
...
elif user_input['action'] == 'reject':
...
不好的做法:
if user_input['approved']:
...
else:
...
# 只支持通过/拒绝,不支持修改
3. 配置检查点
好的做法:
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
不好的做法:
graph = builder.compile() # 没有检查点
# interrupt() 会失败
4. 设置超时提醒
好的做法:
def human_review(state):
# 设置超时
user_input = interrupt({
"message": "请审核",
"timeout": 3600 # 1 小时超时
})
if user_input.get('timeout'):
# 超时处理
return Command(goto="auto_approve")
...
不好的做法:
def human_review(state):
user_input = interrupt({"message": "请审核"})
# 没有超时处理,可能永远等待
六、节点类型决策树
如何判断一个节点属于哪种类型?
这个节点做什么?
├─ 是否调用大模型?
│ ├─ 是 → LLM 节点
│ └─ 否 → 继续
│
├─ 是否等待用户输入?
│ ├─ 是 → 用户节点
│ └─ 否 → 继续
│
├─ 是否修改外部状态?
│ ├─ 是 → 动作节点
│ └─ 否 → 继续
│
├─ 是否查询外部数据?
│ ├─ 是 → 数据节点
│ └─ 否 → 纯计算节点
│
└─ 纯计算节点
- 不需要外部交互
- 纯数据处理
- 示例:格式转换、计算
邮件智能体节点分类
| 节点 | 类型 | 判断依据 |
|---|---|---|
| read_email | 数据节点 | 从邮箱读取,只读操作 |
| classify_intent | LLM 节点 | 使用 LLM 分类 |
| search_documentation | 数据节点 | 搜索知识库,只读操作 |
| bug_tracking | 动作节点 | 创建工单,有副作用 |
| draft_response | LLM 节点 | 使用 LLM 生成 |
| human_review | 用户节点 | 等待人工审核 |
| send_reply | 动作节点 | 发送邮件,有副作用 |
七、本章小结
7.1 四种节点类型
| 类型 | 用途 | 特点 | 最佳实践 |
|---|---|---|---|
| LLM 节点 | 理解、生成、推理 | 使用结构化输出 | 分离静态/动态上下文 |
| 数据节点 | 查询、检索 | 可缓存、需重试 | 存储原始数据 |
| 动作节点 | 发送、创建、更新 | 有副作用、不缓存 | 确保幂等性 |
| 用户节点 | 审核、确认、提供信息 | 使用 interrupt() | 提供足够上下文 |
7.2 类型选择指南
问题 1:是否调用大模型?
├─ 是 → LLM 节点
└─ 否 → 问题 2
问题 2:是否等待用户输入?
├─ 是 → 用户节点
└─ 否 → 问题 3
问题 3:是否修改外部状态?
├─ 是 → 动作节点
└─ 否 → 数据节点
7.3 常见错误
错误一:LLM 节点不使用结构化输出
→ 结果:解析失败,流程中断
错误二:数据节点格式化数据
→ 结果:其他节点无法使用原始数据
错误三:动作节点不保证幂等性
→ 结果:重试时重复执行
错误四:用户节点不配置检查点
→ 结果:interrupt() 失败
7.4 下一章预告
下一章,我们将深入讲解 Step 3:设计状态。
我们会学习:
- 状态应该包含什么数据
- 状态设计原则
- 如何存储原始数据
- 状态更新机制
八、思考题
思考题一:翻译节点
如果要增加一个"翻译回复"节点(将回复翻译成客户语言),应该是什么类型?
提示:
- 是否需要调用大模型?
- 是否需要查询数据?
- 是否有副作用?
思考题二:节点类型区别
数据节点和动作节点有什么本质区别?
提示:
- 是否有副作用?
- 是否可以缓存?
- 是否需要幂等性?
思考题三:检查点的作用
用户节点为什么需要检查点?如果没有检查点会怎样?
提示:
- 用户可能很久才响应
- 如何记住之前的状态?
- 如何恢复执行?
系列导航
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)