LangGraph 思维模式教程(五):实现节点
本系列教程将带你深入理解 LangGraph 的思维方式,学会用 LangGraph 的方式构建智能体。
本章主题:Step 4 - 为每个节点编写函数 阅读时间:约 25 分钟
开篇:节点是什么?
在 LangGraph 中,节点就是一个普通的 Python 函数。
def my_node(state: MyState) -> dict:
# 做一些工作
result = do_something(state)
# 返回状态更新
return {"result": result}
就这么简单! 但有几个要点:
- 接收状态:函数的第一个参数是当前状态
- 返回更新:返回一个字典,LangGraph 自动合并到状态
- 单一职责:每个节点只做一件事
为什么节点设计得这么简单?
传统框架:
- 需要继承特定类
- 需要实现特定接口
- 学习成本高
- 代码耦合度高
LangGraph:
- 就是普通函数
- 不需要继承任何类
- 学习成本低
- 代码可复用性高
小贴士:你可以把节点函数当作普通的 Python 函数来测试,不需要启动整个图。
一、节点的基本结构
1.1 简单节点
最简单的节点:只读取状态,返回更新
def read_email(state: EmailState) -> dict:
"""读取邮件节点"""
# 第一步:从状态获取输入
email_id = state['email_id']
# 第二步:执行工作
email = email_service.get_email(email_id)
# 第三步:返回更新
return {
'email_content': email.content,
'sender_email': email.sender
}
执行流程:
1. LangGraph 调用 read_email(state)
2. 函数执行,返回 {"email_content": ..., "sender_email": ...}
3. LangGraph 自动合并到状态:
state["email_content"] = ...
state["sender_email"] = ...
4. 其他字段保持不变
1.2 带条件跳转的节点
有时候,节点需要决定下一步去哪。这时使用 Command:
from langgraph.types import Command
def classify_intent(state: EmailState) -> Command:
"""分类意图节点"""
# 执行工作
classification = llm.classify(state['email_content'])
# 决定下一步
if classification['urgency'] == 'critical':
next_node = 'human_review' # 紧急,需要人工审核
else:
next_node = 'draft_response' # 不紧急,直接起草
# 返回更新 + 跳转
return Command(
update={'classification': classification},
goto=next_node
)
Command 是什么?
Command 是 LangGraph 的特殊返回类型,包含两部分:
1. update:状态更新(字典)
2. goto:下一个节点(字符串)
等价于:
- 先更新状态
- 然后跳转到指定节点
什么时候用 Command?
需要根据结果决定下一步时:
场景一:分类后不同处理
if classification == 'bug':
goto = 'bug_tracking'
else:
goto = 'draft_response'
场景二:检查后不同处理
if not state.get('email_content'):
goto = 'read_email' # 回去读取
else:
goto = 'classify'
场景三:错误后不同处理
if error:
goto = 'error_handler'
else:
goto = 'next_step'
1.3 异步节点
如果节点需要调用异步 API(如异步 HTTP 请求),使用 async def:
async def search_documentation(state: EmailState) -> dict:
"""异步搜索文档"""
# 异步调用
results = await knowledge_base.async_search(
state['classification']['topic']
)
return {'search_results': results}
什么时候用异步?
需要异步的场景:
- 调用异步 HTTP API
- 访问异步数据库
- 并行执行多个 I/O 操作
不需要异步的场景:
- 纯计算
- 调用同步 API
- 简单的状态更新
混合使用:
# 可以在同一个图中混合使用同步和异步节点
builder.add_node("sync_node", sync_node) # 同步
builder.add_node("async_node", async_node) # 异步
# LangGraph 会自动处理
二、实现邮件智能体的节点
现在,让我们逐个实现邮件智能体的七个节点。
2.1 节点一:读取邮件
def read_email(state: EmailState) -> dict:
"""
读取邮件节点
类型:数据节点
输入:email_id
输出:email_content, sender_email
"""
email_id = state['email_id']
try:
# 调用邮件服务
email = email_service.get_email(email_id)
# 返回邮件内容
return {
'email_content': email.content,
'sender_email': email.sender,
'email_subject': email.subject
}
except EmailNotFoundError:
# 邮件不存在,记录错误
logger.error(f"邮件 {email_id} 不存在")
return {
'error': f'邮件 {email_id} 不存在'
}
设计要点:
1. 输入明确:只需要 email_id
2. 输出完整:返回邮件的所有必要信息
3. 错误处理:邮件不存在时优雅处理
4. 日志记录:出错时记录日志
2.2 节点二:分类意图
这是第一个 LLM 节点,也是最重要的节点之一。
from langchain_openai import ChatOpenAI
from langgraph.types import Command
from pydantic import BaseModel
# 初始化 LLM
llm = ChatOpenAI(model='gpt-4')
# 定义输出结构
class Classification(BaseModel):
intent: str # 意图类型
urgency: str # 紧急程度
topic: str # 主题
summary: str # 摘要
def classify_intent(state: EmailState) -> Command:
"""
分类意图节点
类型:LLM 节点
输入:email_content
输出:classification
"""
# 使用结构化输出,确保 LLM 返回正确格式
structured_llm = llm.with_structured_output(Classification)
# 格式化提示词
prompt = f"""
分析这封客户邮件并分类。
邮件内容:
{state['email_content']}
发送者:
{state['sender_email']}
分类类别:
- question:产品使用问题
- bug:Bug 报告
- billing:账单相关问题
- feature:功能请求
- complex:复杂技术问题
紧急程度:
- critical:需要立即处理
- high:24小时内处理
- medium:3天内处理
- low:一周内处理
"""
# 调用 LLM
classification = structured_llm.invoke(prompt)
# 根据分类决定下一步
if classification.intent == 'billing' or classification.urgency == 'critical':
# 账单问题或紧急问题,需要人工审核
next_node = 'human_review'
elif classification.intent in ['question', 'feature']:
# 问题或功能请求,搜索文档
next_node = 'search_documentation'
elif classification.intent == 'bug':
# Bug 报告,创建工单
next_node = 'bug_tracking'
else:
# 复杂问题,直接起草
next_node = 'draft_response'
return Command(
update={'classification': classification.model_dump()},
goto=next_node
)
设计要点:
1. 结构化输出:
- 使用 Pydantic 定义输出结构
- 确保 LLM 返回正确格式
- 避免解析错误
2. 提示词设计:
- 清晰的任务描述
- 明确的分类标准
- 具体的例子
3. 条件跳转:
- 根据分类结果决定下一步
- 不同类型走不同路径
- 紧急问题需要人工审核
为什么使用结构化输出?
不使用结构化输出:
response = llm.invoke(prompt)
# 返回字符串,需要手动解析
# 可能格式不对
# 解析可能失败
使用结构化输出:
structured_llm = llm.with_structured_output(Classification)
response = structured_llm.invoke(prompt)
# 返回 Pydantic 对象
# 格式保证正确
# 类型安全
2.3 节点三:搜索文档
def search_documentation(state: EmailState) -> Command:
"""
搜索文档节点
类型:数据节点
输入:classification
输出:search_results
"""
classification = state['classification']
# 构建查询
query = f"{classification['intent']} {classification['topic']}"
try:
# 搜索知识库
results = knowledge_base.search(
query=query,
top_k=5, # 返回前 5 个结果
min_score=0.6 # 最低相似度 0.6
)
# 提取文档内容
search_results = [doc.content for doc in results]
logger.info(f"搜索 '{query}' 找到 {len(search_results)} 个结果")
except SearchError as e:
# 搜索失败,记录错误并继续
logger.warning(f"搜索失败: {e}")
search_results = ['搜索暂时不可用,请稍后重试']
return Command(
update={'search_results': search_results},
goto='draft_response'
)
设计要点:
1. 查询构建:
- 使用分类结果构建查询
- 意图 + 主题 = 更准确的查询
2. 参数调优:
- top_k:返回结果数量
- min_score:最低相似度
- 根据实际效果调整
3. 错误处理:
- 搜索失败不影响流程
- 提供友好的错误信息
- 继续执行下一步
2.4 节点四:Bug 跟踪
def bug_tracking(state: EmailState) -> Command:
"""
Bug 跟踪节点
类型:动作节点
输入:email_content, classification
输出:ticket_id
"""
try:
# 创建 Bug 工单
ticket = bug_tracker.create(
title=f"Bug: {state['classification']['topic']}",
description=state['email_content'],
priority=state['classification']['urgency'],
reporter=state['sender_email']
)
logger.info(f"创建 Bug 工单: {ticket.id}")
return Command(
update={'ticket_id': ticket.id},
goto='draft_response'
)
except BugTrackerError as e:
logger.error(f"创建 Bug 工单失败: {e}")
# 即使失败也继续,只是没有工单号
return Command(
update={'ticket_id': 'FAILED'},
goto='draft_response'
)
设计要点:
1. 动作节点特点:
- 有副作用(创建工单)
- 需要错误处理
- 失败后如何处理
2. 错误处理策略:
- 记录错误日志
- 设置失败标记
- 继续执行(不中断流程)
3. 为什么不中断?
- 工单创建失败不影响回复
- 可以在回复中说明
- 人工可以后续处理
2.5 节点五:起草回复
这是第二个 LLM 节点,负责生成回复内容。
def draft_response(state: EmailState) -> Command:
"""
起草回复节点
类型:LLM 节点
输入:email_content, classification, search_results
输出:response_text
"""
classification = state['classification']
# 构建上下文
context_parts = []
# 添加搜索结果
if state.get('search_results'):
docs = "\n".join([f"- {doc}" for doc in state['search_results']])
context_parts.append(f"相关文档:\n{docs}")
# 添加工单号
if state.get('ticket_id'):
context_parts.append(f"Bug 工单号:{state['ticket_id']}")
# 添加审核反馈(如果有)
if state.get('feedback'):
context_parts.append(f"审核反馈:{state['feedback']}")
# 格式化提示词
prompt = f"""
为这封客户邮件起草回复。
原始邮件:
{state['email_content']}
分类信息:
- 意图:{classification['intent']}
- 紧急程度:{classification['urgency']}
- 主题:{classification['topic']}
{chr(10).join(context_parts)}
回复指南:
- 专业、友好、有帮助
- 直接回答客户问题
- 如有相关文档,引用具体内容
- 如是 Bug,告知工单号和后续跟进
- 语言简洁,避免过于冗长
"""
# 调用 LLM
response = llm.invoke(prompt)
# 决定是否需要人工审核
needs_review = (
classification['urgency'] in ['high', 'critical'] or
classification['intent'] == 'complex'
)
next_node = 'human_review' if needs_review else 'send_reply'
return Command(
update={'response_text': response.content},
goto=next_node
)
设计要点:
1. 上下文构建:
- 检查可选字段是否存在
- 按需添加上下文
- 格式化清晰
2. 提示词设计:
- 包含所有相关信息
- 明确回复要求
- 提供回复指南
3. 审核决策:
- 紧急问题需要审核
- 复杂问题需要审核
- 其他直接发送
2.6 节点六:人工审核
这是用户节点,使用 interrupt() 暂停执行。
from langgraph.types import interrupt
def human_review(state: EmailState) -> Command:
"""
人工审核节点
类型:用户节点
输入:email_content, response_text, classification
输出:approved, edited_response, feedback
"""
# 暂停执行,等待用户输入
user_input = interrupt({
'email': state['email_content'],
'draft': state['response_text'],
'classification': state['classification'],
'message': '请审核回复内容'
})
# 用户响应后继续
if user_input.get('approved'):
# 审核通过
final_response = user_input.get('edited_response', state['response_text'])
return Command(
update={
'response_text': final_response,
'approved': True
},
goto='send_reply'
)
else:
# 审核不通过,重新起草
return Command(
update={
'approved': False,
'feedback': user_input.get('feedback', '')
},
goto='draft_response'
)
interrupt() 是什么?
interrupt() 是 LangGraph 的特殊函数:
1. 调用时:
- 图执行暂停
- 返回传入的数据给调用者
- 等待用户输入
2. 用户响应后:
- 图从暂停点继续
- user_input 接收用户数据
- 继续执行后续代码
3. 使用场景:
- 需要人工确认
- 需要人工输入
- 需要人工选择
工作流程:
1. 图执行到 human_review 节点
2. interrupt() 被调用
3. 图暂停,返回数据给用户
4. 用户审核,提供输入
5. 图恢复,user_input 接收数据
6. 根据用户输入决定下一步
2.7 节点七:发送回复
def send_reply(state: EmailState) -> dict:
"""
发送回复节点
类型:动作节点
输入:sender_email, response_text
输出:无
"""
try:
# 发送邮件
result = email_service.send(
to=state['sender_email'],
subject=f"Re: {state.get('email_subject', '您的咨询')}",
body=state['response_text']
)
logger.info(f"邮件已发送给 {state['sender_email']}")
return {'sent': True, 'message_id': result.id}
except EmailSendError as e:
logger.error(f"邮件发送失败: {e}")
raise # 让 LangGraph 处理错误
设计要点:
1. 动作节点:
- 发送邮件(有副作用)
- 失败时抛出异常
2. 错误处理:
- 记录错误日志
- 抛出异常让 LangGraph 处理
- 可以配置重试策略
3. 为什么这里抛出异常?
- 发送失败是严重错误
- 需要重试或人工介入
- 不应该静默失败
三、节点实现最佳实践
3.1 添加文档字符串
每个节点都应该有清晰的文档字符串:
def search_documentation(state: EmailState) -> Command:
"""
搜索文档节点
类型:数据节点
输入:state["classification"]["topic"]
输出:state["search_results"]
从知识库搜索相关文档,为起草回复提供上下文。
错误处理:
- 搜索失败时返回错误提示
- 不中断流程
"""
...
文档字符串应该包含:
- 节点类型
- 输入字段
- 输出字段
- 功能描述
- 错误处理策略
3.2 处理边界情况
节点应该处理各种边界情况:
def draft_response(state: EmailState) -> Command:
# 检查必需字段
if not state.get('classification'):
logger.warning("缺少分类信息,返回分类节点")
return Command(goto='classify_intent')
if not state.get('email_content'):
logger.warning("缺少邮件内容,返回读取节点")
return Command(goto='read_email')
# 处理可选字段
search_results = state.get('search_results', [])
ticket_id = state.get('ticket_id')
feedback = state.get('feedback')
...
常见的边界情况:
- 必需字段缺失
- 可选字段缺失
- 数据格式不对
- 空值处理
3.3 记录日志
良好的日志记录有助于调试和监控:
import logging
logger = logging.getLogger(__name__)
def bug_tracking(state: EmailState) -> Command:
logger.info(f"开始创建 Bug 工单")
logger.debug(f"邮件主题: {state['classification']['topic']}")
try:
ticket = bug_tracker.create(...)
logger.info(f"Bug 工单创建成功: {ticket.id}")
...
except Exception as e:
logger.error(f"Bug 工单创建失败: {e}")
...
日志级别:
DEBUG:详细调试信息INFO:正常操作信息WARNING:警告信息ERROR:错误信息
3.4 使用类型提示
类型提示让代码更清晰,IDE 支持更好:
from typing import Optional, TypedDict
def search_documentation(
state: EmailState
) -> Command[TypedDict, str]:
"""
类型提示说明:
- state: EmailState - 输入状态类型
- 返回 Command[dict, str] - 更新类型和跳转目标类型
"""
...
四、测试节点
4.1 单元测试
每个节点可以单独测试,这是 LangGraph 的一大优势:
def test_classify_intent():
"""测试分类节点"""
# 构造测试状态
state = {
'email_content': '如何重置密码?',
'sender_email': 'test@example.com'
}
# 调用节点
result = classify_intent(state)
# 验证结果
assert result.update['classification']['intent'] == 'question'
assert result.goto in ['search_documentation', 'human_review', 'draft_response']
def test_search_documentation():
"""测试搜索节点"""
state = {
'classification': {
'intent': 'question',
'topic': '密码重置'
}
}
result = search_documentation(state)
assert 'search_results' in result.update
assert result.goto == 'draft_response'
4.2 Mock 外部依赖
测试时需要 Mock 外部依赖:
from unittest.mock import patch, Mock
def test_search_documentation_with_mock():
state = {
'classification': {
'intent': 'question',
'topic': '密码重置'
}
}
# Mock 知识库搜索
with patch('knowledge_base.search') as mock_search:
# 设置返回值
mock_search.return_value = [
Mock(content='重置密码步骤...'),
Mock(content='密码安全要求...')
]
# 调用节点
result = search_documentation(state)
# 验证
assert len(result.update['search_results']) == 2
assert result.goto == 'draft_response'
4.3 测试 LLM 节点
测试 LLM 节点需要 Mock LLM:
def test_classify_intent_with_mock_llm():
state = {
'email_content': '我发现了 Bug',
'sender_email': 'test@example.com'
}
# Mock LLM
with patch('llm.with_structured_output') as mock_structured:
mock_llm = Mock()
mock_llm.invoke.return_value = Classification(
intent='bug',
urgency='high',
topic='Bug 报告',
summary='用户报告了 Bug'
)
mock_structured.return_value = mock_llm
# 调用节点
result = classify_intent(state)
# 验证
assert result.update['classification']['intent'] == 'bug'
assert result.goto == 'bug_tracking'
五、本章小结
5.1 节点实现要点
-
基本结构:
- 接收状态作为参数
- 执行工作
- 返回更新(字典或 Command)
-
使用 Command:
- 返回更新 + 跳转目标
- 控制流程走向
- 实现条件分支
-
错误处理:
- 捕获预期错误
- 记录日志
- 优雅降级或抛出异常
-
最佳实践:
- 添加文档字符串
- 处理边界情况
- 使用类型提示
- 编写单元测试
5.2 七个节点总结
| 节点 | 类型 | 核心操作 | 关键技术 |
|---|---|---|---|
| read_email | 数据节点 | 读取邮件 | 错误处理 |
| classify_intent | LLM 节点 | LLM 分类 | 结构化输出、Command |
| search_documentation | 数据节点 | 搜索知识库 | 错误处理 |
| bug_tracking | 动作节点 | 创建工单 | 副作用处理 |
| draft_response | LLM 节点 | LLM 生成回复 | 上下文构建 |
| human_review | 用户节点 | 等待审核 | interrupt |
| send_reply | 动作节点 | 发送邮件 | 异常抛出 |
5.3 下一章预告
下一章,我们将深入讲解 Step 5:连接节点与错误处理。
我们会学习:
- 如何组装图
- 如何定义边和条件跳转
- 错误处理策略
六、思考题
-
如果要增加一个"翻译回复"节点,应该怎么实现?提示:考虑放在哪个位置,需要什么输入输出。
-
人工审核节点为什么使用
interrupt()?如果不使用会怎样?提示:考虑执行流程和用户体验。 -
如何测试一个包含 LLM 调用的节点?提示:考虑 Mock LLM 的返回值。
系列导航
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)