本系列教程将带你深入理解 LangGraph 的思维方式,学会用 LangGraph 的方式构建智能体。

本章主题:Step 2 - 识别四种节点类型及其特点 阅读时间:约 20 分钟


开篇:为什么需要分类节点?

不同节点有不同特点

在上一章,我们把邮件处理流程拆解成了 7 个节点。但这些节点并不都是一样的:

节点差异:

read_email:
- 从邮箱读取邮件
- 只读操作,没有副作用
- 可以缓存

classify_intent:
- 使用 LLM 分析邮件
- 调用外部 API
- 可能需要重试

search_documentation:
- 搜索知识库
- 只读操作
- 可以缓存

bug_tracking:
- 创建 Bug 工单
- 有副作用(创建记录)
- 不能缓存

draft_response:
- 使用 LLM 生成回复
- 调用外部 API
- 可能需要重试

human_review:
- 等待人工审核
- 可能等待很长时间
- 需要检查点

send_reply:
- 发送邮件
- 有副作用(发送邮件)
- 不能缓存

分类的好处

理解这些差异,是构建高质量智能体的关键:

好处一:正确的错误处理
- LLM 节点:处理 API 限流、解析错误
- 数据节点:处理网络超时、数据不存在
- 动作节点:处理执行失败、需要补偿
- 用户节点:处理超时、取消

好处二:正确的优化策略
- LLM 节点:可以缓存结果
- 数据节点:可以缓存、并行查询
- 动作节点:不能缓存,需要幂等性
- 用户节点:需要检查点

好处三:正确的测试策略
- LLM 节点:Mock LLM 响应
- 数据节点:Mock 数据源
- 动作节点:Mock 外部服务
- 用户节点:模拟用户输入

好处四:正确的监控策略
- LLM 节点:监控 token 使用、响应时间
- 数据节点:监控查询时间、缓存命中率
- 动作节点:监控成功率、失败原因
- 用户节点:监控等待时间、响应率

一、四种节点类型概览

LangGraph 中的节点可以分为四种类型:

┌─────────────────────────────────────────────────────────────┐
│                    四种节点类型                              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. LLM 节点                                                 │
│     调用大语言模型                                           │
│     示例:分类意图、起草回复、生成内容                        │
│     特点:可能失败、可缓存、需要重试                          │
│                                                             │
│  2. 数据节点                                                 │
│     查询外部数据源                                           │
│     示例:搜索文档、查询数据库、调用 API                      │
│     特点:只读、可缓存、需要重试                              │
│                                                             │
│  3. 动作节点                                                 │
│     执行外部动作                                             │
│     示例:发送邮件、创建工单、写入数据库                      │
│     特点:有副作用、不缓存、需要幂等性                        │
│                                                             │
│  4. 用户节点                                                 │
│     等待人工输入                                             │
│     示例:人工审核、确认操作、提供信息                        │
│     特点:可长时间等待、需要检查点                            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

1.1 类型对比

类型 主要操作 是否有副作用 是否需要重试 是否可缓存 是否需要检查点
LLM 节点 调用模型 可缓存结果
数据节点 查询数据 可缓存结果
动作节点 执行动作 不缓存
用户节点 等待输入 不缓存

1.2 类型选择指南

如何选择节点类型?

问题 1:这个节点是否调用大模型?
├─ 是 → LLM 节点
└─ 否 → 继续问题 2

问题 2:这个节点是否等待用户输入?
├─ 是 → 用户节点
└─ 否 → 继续问题 3

问题 3:这个节点是否修改外部状态?
├─ 是 → 动作节点
└─ 否 → 数据节点

二、LLM 节点

2.1 什么是 LLM 节点?

LLM 节点是调用大语言模型的节点,用于:

用途一:理解
- 分析文本内容
- 提取关键信息
- 理解用户意图

用途二:生成
- 生成回复内容
- 撰写邮件、文档
- 翻译、改写

用途三:推理
- 做决策判断
- 分类归类
- 问题诊断

2.2 LLM 节点的特点

LLM 节点特点:

输入:
├── 静态上下文:提示词模板、指令、示例
└── 动态上下文:从状态获取的数据

处理:
├── 格式化提示词
├── 调用 LLM
└── 解析响应

输出:
└── 结构化的结果(存入状态)

错误处理:
├── 重试:网络问题、限流
├── 回退:解析失败时重新调用
└── 降级:使用默认值

优化:
├── 缓存:相同提示词可以缓存
├── 流式:支持流式输出
└── 批量:多个请求可以批量处理

2.3 为什么 LLM 节点需要特殊处理?

原因一:可能失败
- 网络问题
- API 限流
- 模型过载
→ 需要重试策略

原因二:输出不确定
- 相同输入可能不同输出
- 可能不符合预期格式
→ 需要结构化输出

原因三:成本较高
- Token 消耗
- 延迟较高
→ 需要缓存和优化

原因四:可以做决策
- LLM 可以根据内容做判断
- 可以决定下一步去哪
→ 可以使用 Command

2.4 实例:分类意图节点

from langchain_openai import ChatOpenAI
from langgraph.types import Command
from pydantic import BaseModel

llm = ChatOpenAI(model="gpt-4")

def classify_intent(state: EmailState) -> Command:
    """
    LLM 节点:分类邮件意图

    静态上下文:分类类别定义
    动态上下文:邮件内容
    输出:分类结果 + 跳转决策
    """

    # 定义输出结构(使用 Pydantic)
    class Classification(BaseModel):
        intent: str      # question, bug, billing, feature, complex
        urgency: str     # low, medium, high, critical
        topic: str       # 主题
        summary: str     # 摘要

    # 使用结构化输出(确保格式正确)
    structured_llm = llm.with_structured_output(Classification)

    # 格式化提示词(动态 + 静态)
    prompt = f"""
    分析这封客户邮件并分类。

    邮件内容:
    {state['email_content']}

    发送者:
    {state['sender_email']}

    分类类别:
    - question:产品使用问题
    - bug:Bug 报告
    - billing:账单相关问题
    - feature:功能请求
    - complex:复杂技术问题

    紧急程度:
    - critical:需要立即处理
    - high:24小时内处理
    - medium:3天内处理
    - low:一周内处理
    """

    # 调用 LLM
    classification = structured_llm.invoke(prompt)

    # 决定下一步(LLM 节点可以做决策)
    if classification.intent == 'billing' or classification.urgency == 'critical':
        next_node = "human_review"
    elif classification.intent in ['question', 'feature']:
        next_node = "search_documentation"
    elif classification.intent == 'bug':
        next_node = "bug_tracking"
    else:
        next_node = "draft_response"

    # 返回更新和跳转
    return Command(
        update={"classification": classification.model_dump()},
        goto=next_node
    )

代码解析

1. 定义输出结构:
   - 使用 Pydantic BaseModel
   - 明确字段类型
   - LLM 会按这个格式输出

2. 使用结构化输出:
   - with_structured_output() 方法
   - 确保输出格式正确
   - 避免解析错误

3. 格式化提示词:
   - 静态部分:分类类别、紧急程度定义
   - 动态部分:邮件内容、发送者

4. 做决策:
   - 根据分类结果决定下一步
   - 使用 Command 指定跳转
   - 同时更新状态

2.5 实例:起草回复节点

def draft_response(state: EmailState) -> Command:
    """
    LLM 节点:起草邮件回复

    静态上下文:回复指南、语气要求
    动态上下文:邮件内容、分类结果、搜索结果
    输出:回复文本 + 是否需要审核
    """

    # 构建上下文(根据可用数据)
    context_parts = []

    if state.get('search_results'):
        docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_parts.append(f"相关文档:\n{docs}")

    if state.get('ticket_id'):
        context_parts.append(f"Bug 工单:{state['ticket_id']}")

    # 格式化提示词
    prompt = f"""
    为这封客户邮件起草回复。

    原始邮件:
    {state['email_content']}

    分类信息:
    - 意图:{state['classification']['intent']}
    - 紧急程度:{state['classification']['urgency']}
    - 主题:{state['classification']['topic']}

    {chr(10).join(context_parts)}

    回复指南:
    - 专业、友好、有帮助
    - 直接回答客户问题
    - 如有相关文档,引用具体内容
    - 如是 Bug,告知工单号
    """

    # 调用 LLM
    response = llm.invoke(prompt)

    # 决定是否需要人工审核
    needs_review = (
        state['classification']['urgency'] in ['high', 'critical'] or
        state['classification']['intent'] == 'complex'
    )

    next_node = "human_review" if needs_review else "send_reply"

    return Command(
        update={"response_text": response.content},
        goto=next_node
    )

2.6 LLM 节点最佳实践

1. 使用结构化输出

好的做法:
class Classification(BaseModel):
    intent: str
    urgency: str

structured_llm = llm.with_structured_output(Classification)
result = structured_llm.invoke(prompt)
# result.intent 和 result.urgency 一定存在

不好的做法:
result = llm.invoke(prompt)
# 需要手动解析,可能失败
intent = parse_intent(result.content)  # 可能抛出异常

2. 分离静态和动态上下文

好的做法:
STATIC_PROMPT = """
分类类别:
- question:产品问题
- bug:Bug 报告
"""

def classify(state):
    prompt = f"{STATIC_PROMPT}\n邮件:{state['email']}"
    ...

不好的做法:
def classify(state):
    prompt = """
    分类类别:
    - question:产品问题
    - bug:Bug 报告

    邮件:{state['email']}
    """
    # 静态和动态混在一起,难以维护

3. 在节点内做决策

好的做法:
def classify(state):
    result = llm.invoke(prompt)
    if result.intent == 'bug':
        return Command(goto='bug_tracking')
    else:
        return Command(goto='draft')

不好的做法:
def classify(state):
    result = llm.invoke(prompt)
    return {"intent": result.intent}
# 跳转逻辑在图定义中,不清晰

4. 处理解析错误

好的做法:
def classify(state):
    try:
        result = structured_llm.invoke(prompt)
    except OutputParserException:
        # 解析失败,重试或使用默认值
        result = llm.invoke(prompt + "\n请严格按照格式输出")

    return {"classification": result}

不好的做法:
def classify(state):
    result = structured_llm.invoke(prompt)  # 可能失败
    return {"classification": result}

三、数据节点

3.1 什么是数据节点?

数据节点是从外部数据源获取信息的节点,用于:

用途一:查询
- 搜索文档
- 查询数据库
- 检索知识

用途二:调用
- 调用外部 API
- 获取配置
- 读取文件

用途三:聚合
- 合并多个数据源
- 过滤和排序
- 格式转换

3.2 数据节点的特点

数据节点特点:

输入:
└── 查询参数(从状态构建)

处理:
├── 构建查询
├── 调用数据源
└── 处理响应

输出:
└── 原始数据(存入状态)

错误处理:
├── 重试:网络问题、超时
├── 回退:使用缓存或默认值
└── 降级:返回部分结果

优化:
├── 缓存:缓存常见查询
├── 并行:多个查询并行执行
└── 预取:预测并预取数据

注意:
├── 只读:不修改外部状态
└── 原始数据:存储原始数据,不格式化

3.3 为什么数据节点需要特殊处理?

原因一:可能失败
- 网络问题
- 数据源不可用
- 查询超时
→ 需要重试策略

原因二:可能很慢
- 数据库查询慢
- API 响应慢
- 数据量大
→ 需要缓存和优化

原因三:数据可能不存在
- 查询结果为空
- 数据已删除
- 权限不足
→ 需要优雅处理

原因四:可以并行
- 多个独立查询
- 可以同时执行
→ 可以使用异步

3.4 实例:文档搜索节点

from langgraph.types import Command, RetryPolicy

def search_documentation(state: EmailState) -> Command:
    """
    数据节点:搜索知识库

    输入:分类结果中的主题
    输出:相关文档列表
    """

    # 构建查询
    classification = state['classification']
    query = f"{classification['intent']} {classification['topic']}"

    try:
        # 调用搜索 API
        results = knowledge_base.search(
            query=query,
            top_k=5,
            min_score=0.7
        )

        # 存储原始结果(不格式化)
        search_results = [doc.content for doc in results]

    except NetworkError as e:
        # 网络错误:记录并继续
        logger.warning(f"搜索网络错误: {e}")
        search_results = ["搜索暂时不可用,请稍后重试"]

    except TimeoutError as e:
        # 超时:使用缓存
        logger.warning(f"搜索超时: {e}")
        cached = cache.get(query)
        search_results = cached or ["搜索超时,请稍后重试"]

    except SearchError as e:
        # 其他错误:降级处理
        logger.error(f"搜索失败: {e}")
        search_results = []

    return Command(
        update={"search_results": search_results},
        goto="draft_response"
    )

# 配置重试策略(在图定义时)
graph.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(
        max_attempts=3,          # 最多重试 3 次
        initial_interval=1.0,    # 初始等待 1 秒
        max_interval=10.0,       # 最大等待 10 秒
        backoff_factor=2.0       # 指数退避因子
    )
)

代码解析

1. 构建查询:
   - 从状态获取分类结果
   - 组合意图和主题作为查询

2. 调用搜索:
   - 设置参数(top_k, min_score)
   - 获取搜索结果

3. 存储原始数据:
   - 只存储文档内容
   - 不格式化(在起草节点格式化)

4. 错误处理:
   - NetworkError:记录日志,返回提示
   - TimeoutError:尝试使用缓存
   - SearchError:降级处理

5. 重试策略:
   - 在图定义时配置
   - LangGraph 自动重试

3.5 实例:客户历史查询节点

def lookup_customer_history(state: EmailState) -> Command:
    """
    数据节点:查询客户历史

    输入:客户邮箱
    输出:客户历史数据
    """

    customer_email = state['sender_email']

    try:
        # 查询 CRM 系统
        customer_data = crm.get_customer(customer_email)

        # 存储原始数据
        return Command(
            update={"customer_history": customer_data},
            goto="draft_response"
        )

    except CustomerNotFoundError:
        # 客户不存在:使用默认值
        logger.info(f"新客户: {customer_email}")
        return Command(
            update={"customer_history": {"tier": "standard", "is_new": True}},
            goto="draft_response"
        )

    except CRMAPIError as e:
        # API 错误:降级处理
        logger.error(f"CRM API 错误: {e}")
        return Command(
            update={"customer_history": {"tier": "standard", "error": True}},
            goto="draft_response"
        )

3.6 数据节点最佳实践

1. 存储原始数据

好的做法:
def search_docs(state):
    results = search(query)
    return {"search_results": results}  # 原始数据

def draft(state):
    # 在使用时格式化
    docs = "\n".join(state['search_results'])
    ...

不好的做法:
def search_docs(state):
    results = search(query)
    formatted = "\n".join(results)  # 格式化
    return {"formatted_docs": formatted}
# 其他节点可能需要不同格式

2. 设置重试策略

好的做法:
graph.add_node(
    "search",
    search_docs,
    retry_policy=RetryPolicy(max_attempts=3)
)

不好的做法:
def search_docs(state):
    for i in range(3):  # 手动重试
        try:
            return search(query)
        except:
            continue
# 重试逻辑应该在图配置中

3. 优雅降级

好的做法:
def search_docs(state):
    try:
        return search(query)
    except:
        return []  # 返回空列表,流程继续

不好的做法:
def search_docs(state):
    return search(query)  # 可能抛出异常
# 异常会中断整个流程

4. 考虑缓存

好的做法:
@cache.memoize(timeout=300)  # 缓存 5 分钟
def search_docs(state):
    return search(query)

不好的做法:
def search_docs(state):
    return search(query)  # 每次都查询
# 相同查询重复执行

四、动作节点

4.1 什么是动作节点?

动作节点是执行外部动作的节点,用于:

用途一:发送
- 发送邮件
- 推送通知
- 发送消息

用途二:创建
- 创建工单
- 创建记录
- 创建订单

用途三:更新
- 更新数据库
- 修改状态
- 更新配置

用途四:删除
- 删除记录
- 取消订单
- 清理数据

4.2 动作节点的特点

动作节点特点:

输入:
└── 动作参数(从状态获取)

处理:
├── 执行动作
└── 确认成功

输出:
└── 动作结果(如工单 ID)

错误处理:
├── 重试:网络问题
├── 补偿:失败时执行补偿动作
└── 告警:关键动作失败时告警

注意:
├── 有副作用:会改变外部状态
├── 不缓存:每次动作都是唯一的
├── 幂等性:相同参数多次执行结果相同
└── 需要确认:确保动作成功

4.3 为什么动作节点需要特殊处理?

原因一:有副作用
- 会改变外部状态
- 不能随意重试
→ 需要幂等性

原因二:不能缓存
- 每次动作都是唯一的
- 缓存会导致重复执行
→ 不应该缓存

原因三:失败影响大
- 发送失败:客户收不到回复
- 创建失败:数据丢失
→ 需要补偿机制

原因四:需要确认
- 不能假设成功
- 需要确认动作完成
→ 需要检查结果

4.4 什么是幂等性?

幂等性:相同参数多次执行,结果相同

示例一:发送邮件(非幂等)
send_email(to="a@example.com", body="Hello")
第一次:发送成功
第二次:又发送一封(重复了!)
→ 不是幂等的

示例二:发送邮件(幂等)
send_email_with_id(id="msg-001", to="a@example.com", body="Hello")
第一次:发送成功
第二次:发现 id 已存在,跳过
→ 是幂等的

示例三:创建工单(非幂等)
create_bug(title="Bug")
第一次:创建工单 #123
第二次:创建工单 #124(重复了!)
→ 不是幂等的

示例四:创建工单(幂等)
create_bug_if_not_exists(email_id="email-001", title="Bug")
第一次:创建工单 #123
第二次:发现已存在,返回 #123
→ 是幂等的

4.5 实例:发送邮件节点

def send_reply(state: EmailState) -> dict:
    """
    动作节点:发送邮件回复

    输入:收件人、回复内容
    输出:发送结果
    """

    try:
        # 发送邮件(幂等操作)
        result = email_service.send(
            message_id=state['email_id'],  # 使用邮件 ID 作为幂等键
            to=state['sender_email'],
            subject=f"Re: {state.get('email_subject', '您的咨询')}",
            body=state['response_text']
        )

        # 记录发送成功
        logger.info(f"邮件已发送: {state['email_id']} -> {state['sender_email']}")

        return {"sent": True, "message_id": result.id}

    except EmailSendError as e:
        # 发送失败:记录并抛出
        logger.error(f"邮件发送失败: {state['email_id']}, 错误: {e}")

        # 发送失败通知
        alert_service.notify(
            level="error",
            message=f"邮件发送失败: {state['email_id']}",
            details=str(e)
        )

        raise  # 让 LangGraph 处理错误(可能重试)

# 配置重试策略
graph.add_node(
    "send_reply",
    send_reply,
    retry_policy=RetryPolicy(max_attempts=3)
)

4.6 实例:Bug 跟踪节点

def bug_tracking(state: EmailState) -> Command:
    """
    动作节点:创建 Bug 工单

    输入:邮件内容、分类结果
    输出:工单 ID
    """

    try:
        # 创建 Bug 工单(幂等操作)
        ticket = bug_tracker.create_if_not_exists(
            external_id=state['email_id'],  # 幂等键
            title=f"Bug: {state['classification']['topic']}",
            description=state['email_content'],
            priority=map_priority(state['classification']['urgency']),
            reporter=state['sender_email'],
            labels=['auto-created', 'from-email']
        )

        logger.info(f"Bug 工单已创建: {ticket.id}")

        return Command(
            update={"ticket_id": ticket.id},
            goto="draft_response"
        )

    except BugTrackerError as e:
        # 创建失败:记录并继续
        logger.error(f"Bug 工单创建失败: {e}")

        # 返回失败标记,流程继续
        return Command(
            update={"ticket_id": "FAILED", "bug_error": str(e)},
            goto="draft_response"
        )

4.7 动作节点最佳实践

1. 确保幂等性

好的做法:
def send_email(state):
    return email_service.send(
        message_id=state['email_id'],  # 幂等键
        ...
    )

不好的做法:
def send_email(state):
    return email_service.send(...)  # 没有幂等键
# 重试时会重复发送

2. 不缓存结果

好的做法:
def send_email(state):
    return email_service.send(...)  # 每次都执行

不好的做法:
@cache.memoize()
def send_email(state):
    return email_service.send(...)
# 缓存会导致不执行

3. 记录日志

好的做法:
def send_email(state):
    logger.info(f"发送邮件: {state['email_id']}")
    result = email_service.send(...)
    logger.info(f"发送成功: {result.id}")
    return result

不好的做法:
def send_email(state):
    return email_service.send(...)
# 没有日志,难以调试

4. 考虑补偿

好的做法:
def send_email(state):
    try:
        return email_service.send(...)
    except:
        # 补偿:发送失败通知
        alert_service.notify("邮件发送失败")
        raise

不好的做法:
def send_email(state):
    return email_service.send(...)  # 失败时没有补偿

五、用户节点

5.1 什么是用户节点?

用户节点是等待人工输入的节点,用于:

用途一:审核
- 审核 AI 生成的结果
- 确认是否正确
- 修改或拒绝

用途二:确认
- 确认是否执行某个动作
- 批准或拒绝
- 选择选项

用途三:提供信息
- 提供 AI 缺少的信息
- 回答问题
- 做出选择

用途四:干预
- 修正 AI 的错误
- 提供额外指导
- 接管处理

5.2 用户节点的特点

用户节点特点:

输入:
└── 展示给用户的信息

处理:
├── 使用 interrupt() 暂停
├── 等待用户输入
└── 恢复执行

输出:
└── 用户输入的数据

注意:
├── 可无限期等待
├── 需要配置检查点
├── 恢复时传入用户输入
└── 可能需要超时处理

5.3 为什么用户节点需要特殊处理?

原因一:可能等待很长时间
- 用户可能不在线
- 可能几小时后才响应
→ 需要检查点持久化

原因二:需要恢复执行
- 用户响应后继续执行
- 需要记住之前的状态
→ 需要检查点

原因三:可能取消
- 用户可能取消操作
- 需要处理取消逻辑
→ 需要处理取消

原因四:需要上下文
- 用户需要看到相关信息
- 才能做出正确决策
→ 需要提供足够信息

5.4 什么是 interrupt()?

interrupt() 是 LangGraph 提供的函数,用于暂停执行:

工作流程:
1. 调用 interrupt(data)
2. LangGraph 暂停执行
3. 保存当前状态到检查点
4. 返回 data 给调用者
5. 等待用户输入
6. 用户输入后,恢复执行
7. interrupt() 返回用户输入

示例:
def human_review(state):
    # 暂停,等待用户审核
    user_input = interrupt({
        "draft": state['draft'],
        "message": "请审核"
    })

    # 用户输入后继续
    if user_input['approved']:
        return {"approved": True}
    else:
        return {"approved": False}

5.5 实例:人工审核节点

from langgraph.types import interrupt, Command

def human_review(state: EmailState) -> Command:
    """
    用户节点:等待人工审核

    展示:原始邮件、草稿回复、分类信息
    等待:审核结果 + 可选修改
    """

    # 暂停执行,等待用户输入
    user_input = interrupt({
        "email": state['email_content'],
        "sender": state['sender_email'],
        "draft": state['response_text'],
        "classification": state['classification'],
        "message": "请审核回复内容",
        "actions": ["approve", "edit", "reject"]
    })

    # 用户输入后继续执行
    action = user_input.get('action', 'approve')

    if action == 'approve':
        # 审核通过
        return Command(
            update={"approved": True},
            goto="send_reply"
        )

    elif action == 'edit':
        # 修改后通过
        edited_response = user_input.get('edited_response', state['response_text'])
        return Command(
            update={
                "response_text": edited_response,
                "approved": True
            },
            goto="send_reply"
        )

    else:  # reject
        # 审核不通过,重新起草
        feedback = user_input.get('feedback', '')
        return Command(
            update={
                "approved": False,
                "feedback": feedback
            },
            goto="draft_response"
        )

使用示例

# 编译图(需要检查点)
from langgraph.checkpoint.memory import MemorySaver

checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# 第一次调用(会暂停)
result = graph.invoke(
    {"email_id": "email-001"},
    config={"configurable": {"thread_id": "thread-001"}}
)
# 返回:{"__interrupt__": {...}}

# 用户审核后,恢复执行
result = graph.invoke(
    {"action": "approve"},  # 用户输入
    config={"configurable": {"thread_id": "thread-001"}}
)
# 继续执行,发送邮件

5.6 实例:请求缺失信息节点

def request_missing_info(state: EmailState) -> Command:
    """
    用户节点:请求缺失的信息

    场景:需要客户 ID 但邮件中没有
    """

    # 检查是否有客户 ID
    if not state.get('customer_id'):
        # 暂停,请求客户 ID
        user_input = interrupt({
            "message": "需要客户 ID 才能查询历史",
            "request": "请提供客户的账号 ID",
            "email": state['sender_email']
        })

        # 用户输入后继续
        return Command(
            update={"customer_id": user_input['customer_id']},
            goto="lookup_customer_history"  # 重新查询
        )

    # 已有客户 ID,继续
    return Command(goto="lookup_customer_history")

5.7 用户节点最佳实践

1. 提供足够上下文

好的做法:
user_input = interrupt({
    "email": state['email'],
    "draft": state['draft'],
    "classification": state['classification'],
    "message": "请审核"
})

不好的做法:
user_input = interrupt({
    "draft": state['draft']
})
# 用户看不到原始邮件,难以判断

2. 支持多种操作

好的做法:
if user_input['action'] == 'approve':
    ...
elif user_input['action'] == 'edit':
    ...
elif user_input['action'] == 'reject':
    ...

不好的做法:
if user_input['approved']:
    ...
else:
    ...
# 只支持通过/拒绝,不支持修改

3. 配置检查点

好的做法:
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

不好的做法:
graph = builder.compile()  # 没有检查点
# interrupt() 会失败

4. 设置超时提醒

好的做法:
def human_review(state):
    # 设置超时
    user_input = interrupt({
        "message": "请审核",
        "timeout": 3600  # 1 小时超时
    })

    if user_input.get('timeout'):
        # 超时处理
        return Command(goto="auto_approve")

    ...

不好的做法:
def human_review(state):
    user_input = interrupt({"message": "请审核"})
    # 没有超时处理,可能永远等待

六、节点类型决策树

如何判断一个节点属于哪种类型?

这个节点做什么?

├─ 是否调用大模型?
│   ├─ 是 → LLM 节点
│   └─ 否 → 继续
│
├─ 是否等待用户输入?
│   ├─ 是 → 用户节点
│   └─ 否 → 继续
│
├─ 是否修改外部状态?
│   ├─ 是 → 动作节点
│   └─ 否 → 继续
│
├─ 是否查询外部数据?
│   ├─ 是 → 数据节点
│   └─ 否 → 纯计算节点
│
└─ 纯计算节点
    - 不需要外部交互
    - 纯数据处理
    - 示例:格式转换、计算

邮件智能体节点分类

节点 类型 判断依据
read_email 数据节点 从邮箱读取,只读操作
classify_intent LLM 节点 使用 LLM 分类
search_documentation 数据节点 搜索知识库,只读操作
bug_tracking 动作节点 创建工单,有副作用
draft_response LLM 节点 使用 LLM 生成
human_review 用户节点 等待人工审核
send_reply 动作节点 发送邮件,有副作用

七、本章小结

7.1 四种节点类型

类型 用途 特点 最佳实践
LLM 节点 理解、生成、推理 使用结构化输出 分离静态/动态上下文
数据节点 查询、检索 可缓存、需重试 存储原始数据
动作节点 发送、创建、更新 有副作用、不缓存 确保幂等性
用户节点 审核、确认、提供信息 使用 interrupt() 提供足够上下文

7.2 类型选择指南

问题 1:是否调用大模型?
├─ 是 → LLM 节点
└─ 否 → 问题 2

问题 2:是否等待用户输入?
├─ 是 → 用户节点
└─ 否 → 问题 3

问题 3:是否修改外部状态?
├─ 是 → 动作节点
└─ 否 → 数据节点

7.3 常见错误

错误一:LLM 节点不使用结构化输出
→ 结果:解析失败,流程中断

错误二:数据节点格式化数据
→ 结果:其他节点无法使用原始数据

错误三:动作节点不保证幂等性
→ 结果:重试时重复执行

错误四:用户节点不配置检查点
→ 结果:interrupt() 失败

7.4 下一章预告

下一章,我们将深入讲解 Step 3:设计状态

我们会学习:

  • 状态应该包含什么数据
  • 状态设计原则
  • 如何存储原始数据
  • 状态更新机制

八、思考题

思考题一:翻译节点

如果要增加一个"翻译回复"节点(将回复翻译成客户语言),应该是什么类型?

提示:
- 是否需要调用大模型?
- 是否需要查询数据?
- 是否有副作用?

思考题二:节点类型区别

数据节点和动作节点有什么本质区别?

提示:
- 是否有副作用?
- 是否可以缓存?
- 是否需要幂等性?

思考题三:检查点的作用

用户节点为什么需要检查点?如果没有检查点会怎样?

提示:
- 用户可能很久才响应
- 如何记住之前的状态?
- 如何恢复执行?

系列导航

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐