本系列教程将带你深入理解 LangGraph 的思维方式,学会用 LangGraph 的方式构建智能体。

本章主题:Step 4 - 为每个节点编写函数 阅读时间:约 25 分钟


开篇:节点是什么?

在 LangGraph 中,节点就是一个普通的 Python 函数

def my_node(state: MyState) -> dict:
    # 做一些工作
    result = do_something(state)

    # 返回状态更新
    return {"result": result}

就这么简单! 但有几个要点:

  1. 接收状态:函数的第一个参数是当前状态
  2. 返回更新:返回一个字典,LangGraph 自动合并到状态
  3. 单一职责:每个节点只做一件事

为什么节点设计得这么简单?

传统框架:
- 需要继承特定类
- 需要实现特定接口
- 学习成本高
- 代码耦合度高

LangGraph:
- 就是普通函数
- 不需要继承任何类
- 学习成本低
- 代码可复用性高

小贴士:你可以把节点函数当作普通的 Python 函数来测试,不需要启动整个图。


一、节点的基本结构

1.1 简单节点

最简单的节点:只读取状态,返回更新

def read_email(state: EmailState) -> dict:
    """读取邮件节点"""

    # 第一步:从状态获取输入
    email_id = state['email_id']

    # 第二步:执行工作
    email = email_service.get_email(email_id)

    # 第三步:返回更新
    return {
        'email_content': email.content,
        'sender_email': email.sender
    }

执行流程

1. LangGraph 调用 read_email(state)
2. 函数执行,返回 {"email_content": ..., "sender_email": ...}
3. LangGraph 自动合并到状态:
   state["email_content"] = ...
   state["sender_email"] = ...
4. 其他字段保持不变

1.2 带条件跳转的节点

有时候,节点需要决定下一步去哪。这时使用 Command

from langgraph.types import Command

def classify_intent(state: EmailState) -> Command:
    """分类意图节点"""

    # 执行工作
    classification = llm.classify(state['email_content'])

    # 决定下一步
    if classification['urgency'] == 'critical':
        next_node = 'human_review'  # 紧急,需要人工审核
    else:
        next_node = 'draft_response'  # 不紧急,直接起草

    # 返回更新 + 跳转
    return Command(
        update={'classification': classification},
        goto=next_node
    )

Command 是什么?

Command 是 LangGraph 的特殊返回类型,包含两部分:

1. update:状态更新(字典)
2. goto:下一个节点(字符串)

等价于:
- 先更新状态
- 然后跳转到指定节点

什么时候用 Command?

需要根据结果决定下一步时:

场景一:分类后不同处理
if classification == 'bug':
    goto = 'bug_tracking'
else:
    goto = 'draft_response'

场景二:检查后不同处理
if not state.get('email_content'):
    goto = 'read_email'  # 回去读取
else:
    goto = 'classify'

场景三:错误后不同处理
if error:
    goto = 'error_handler'
else:
    goto = 'next_step'

1.3 异步节点

如果节点需要调用异步 API(如异步 HTTP 请求),使用 async def

async def search_documentation(state: EmailState) -> dict:
    """异步搜索文档"""

    # 异步调用
    results = await knowledge_base.async_search(
        state['classification']['topic']
    )

    return {'search_results': results}

什么时候用异步?

需要异步的场景:
- 调用异步 HTTP API
- 访问异步数据库
- 并行执行多个 I/O 操作

不需要异步的场景:
- 纯计算
- 调用同步 API
- 简单的状态更新

混合使用

# 可以在同一个图中混合使用同步和异步节点
builder.add_node("sync_node", sync_node)      # 同步
builder.add_node("async_node", async_node)    # 异步

# LangGraph 会自动处理

二、实现邮件智能体的节点

现在,让我们逐个实现邮件智能体的七个节点。

2.1 节点一:读取邮件

def read_email(state: EmailState) -> dict:
    """
    读取邮件节点

    类型:数据节点
    输入:email_id
    输出:email_content, sender_email
    """
    email_id = state['email_id']

    try:
        # 调用邮件服务
        email = email_service.get_email(email_id)

        # 返回邮件内容
        return {
            'email_content': email.content,
            'sender_email': email.sender,
            'email_subject': email.subject
        }

    except EmailNotFoundError:
        # 邮件不存在,记录错误
        logger.error(f"邮件 {email_id} 不存在")
        return {
            'error': f'邮件 {email_id} 不存在'
        }

设计要点

1. 输入明确:只需要 email_id
2. 输出完整:返回邮件的所有必要信息
3. 错误处理:邮件不存在时优雅处理
4. 日志记录:出错时记录日志

2.2 节点二:分类意图

这是第一个 LLM 节点,也是最重要的节点之一。

from langchain_openai import ChatOpenAI
from langgraph.types import Command
from pydantic import BaseModel

# 初始化 LLM
llm = ChatOpenAI(model='gpt-4')

# 定义输出结构
class Classification(BaseModel):
    intent: str      # 意图类型
    urgency: str     # 紧急程度
    topic: str       # 主题
    summary: str     # 摘要

def classify_intent(state: EmailState) -> Command:
    """
    分类意图节点

    类型:LLM 节点
    输入:email_content
    输出:classification
    """
    # 使用结构化输出,确保 LLM 返回正确格式
    structured_llm = llm.with_structured_output(Classification)

    # 格式化提示词
    prompt = f"""
    分析这封客户邮件并分类。

    邮件内容:
    {state['email_content']}

    发送者:
    {state['sender_email']}

    分类类别:
    - question:产品使用问题
    - bug:Bug 报告
    - billing:账单相关问题
    - feature:功能请求
    - complex:复杂技术问题

    紧急程度:
    - critical:需要立即处理
    - high:24小时内处理
    - medium:3天内处理
    - low:一周内处理
    """

    # 调用 LLM
    classification = structured_llm.invoke(prompt)

    # 根据分类决定下一步
    if classification.intent == 'billing' or classification.urgency == 'critical':
        # 账单问题或紧急问题,需要人工审核
        next_node = 'human_review'
    elif classification.intent in ['question', 'feature']:
        # 问题或功能请求,搜索文档
        next_node = 'search_documentation'
    elif classification.intent == 'bug':
        # Bug 报告,创建工单
        next_node = 'bug_tracking'
    else:
        # 复杂问题,直接起草
        next_node = 'draft_response'

    return Command(
        update={'classification': classification.model_dump()},
        goto=next_node
    )

设计要点

1. 结构化输出:
   - 使用 Pydantic 定义输出结构
   - 确保 LLM 返回正确格式
   - 避免解析错误

2. 提示词设计:
   - 清晰的任务描述
   - 明确的分类标准
   - 具体的例子

3. 条件跳转:
   - 根据分类结果决定下一步
   - 不同类型走不同路径
   - 紧急问题需要人工审核

为什么使用结构化输出?

不使用结构化输出:
response = llm.invoke(prompt)
# 返回字符串,需要手动解析
# 可能格式不对
# 解析可能失败

使用结构化输出:
structured_llm = llm.with_structured_output(Classification)
response = structured_llm.invoke(prompt)
# 返回 Pydantic 对象
# 格式保证正确
# 类型安全

2.3 节点三:搜索文档

def search_documentation(state: EmailState) -> Command:
    """
    搜索文档节点

    类型:数据节点
    输入:classification
    输出:search_results
    """
    classification = state['classification']

    # 构建查询
    query = f"{classification['intent']} {classification['topic']}"

    try:
        # 搜索知识库
        results = knowledge_base.search(
            query=query,
            top_k=5,        # 返回前 5 个结果
            min_score=0.6   # 最低相似度 0.6
        )

        # 提取文档内容
        search_results = [doc.content for doc in results]

        logger.info(f"搜索 '{query}' 找到 {len(search_results)} 个结果")

    except SearchError as e:
        # 搜索失败,记录错误并继续
        logger.warning(f"搜索失败: {e}")
        search_results = ['搜索暂时不可用,请稍后重试']

    return Command(
        update={'search_results': search_results},
        goto='draft_response'
    )

设计要点

1. 查询构建:
   - 使用分类结果构建查询
   - 意图 + 主题 = 更准确的查询

2. 参数调优:
   - top_k:返回结果数量
   - min_score:最低相似度
   - 根据实际效果调整

3. 错误处理:
   - 搜索失败不影响流程
   - 提供友好的错误信息
   - 继续执行下一步

2.4 节点四:Bug 跟踪

def bug_tracking(state: EmailState) -> Command:
    """
    Bug 跟踪节点

    类型:动作节点
    输入:email_content, classification
    输出:ticket_id
    """
    try:
        # 创建 Bug 工单
        ticket = bug_tracker.create(
            title=f"Bug: {state['classification']['topic']}",
            description=state['email_content'],
            priority=state['classification']['urgency'],
            reporter=state['sender_email']
        )

        logger.info(f"创建 Bug 工单: {ticket.id}")

        return Command(
            update={'ticket_id': ticket.id},
            goto='draft_response'
        )

    except BugTrackerError as e:
        logger.error(f"创建 Bug 工单失败: {e}")

        # 即使失败也继续,只是没有工单号
        return Command(
            update={'ticket_id': 'FAILED'},
            goto='draft_response'
        )

设计要点

1. 动作节点特点:
   - 有副作用(创建工单)
   - 需要错误处理
   - 失败后如何处理

2. 错误处理策略:
   - 记录错误日志
   - 设置失败标记
   - 继续执行(不中断流程)

3. 为什么不中断?
   - 工单创建失败不影响回复
   - 可以在回复中说明
   - 人工可以后续处理

2.5 节点五:起草回复

这是第二个 LLM 节点,负责生成回复内容。

def draft_response(state: EmailState) -> Command:
    """
    起草回复节点

    类型:LLM 节点
    输入:email_content, classification, search_results
    输出:response_text
    """
    classification = state['classification']

    # 构建上下文
    context_parts = []

    # 添加搜索结果
    if state.get('search_results'):
        docs = "\n".join([f"- {doc}" for doc in state['search_results']])
        context_parts.append(f"相关文档:\n{docs}")

    # 添加工单号
    if state.get('ticket_id'):
        context_parts.append(f"Bug 工单号:{state['ticket_id']}")

    # 添加审核反馈(如果有)
    if state.get('feedback'):
        context_parts.append(f"审核反馈:{state['feedback']}")

    # 格式化提示词
    prompt = f"""
    为这封客户邮件起草回复。

    原始邮件:
    {state['email_content']}

    分类信息:
    - 意图:{classification['intent']}
    - 紧急程度:{classification['urgency']}
    - 主题:{classification['topic']}

    {chr(10).join(context_parts)}

    回复指南:
    - 专业、友好、有帮助
    - 直接回答客户问题
    - 如有相关文档,引用具体内容
    - 如是 Bug,告知工单号和后续跟进
    - 语言简洁,避免过于冗长
    """

    # 调用 LLM
    response = llm.invoke(prompt)

    # 决定是否需要人工审核
    needs_review = (
        classification['urgency'] in ['high', 'critical'] or
        classification['intent'] == 'complex'
    )

    next_node = 'human_review' if needs_review else 'send_reply'

    return Command(
        update={'response_text': response.content},
        goto=next_node
    )

设计要点

1. 上下文构建:
   - 检查可选字段是否存在
   - 按需添加上下文
   - 格式化清晰

2. 提示词设计:
   - 包含所有相关信息
   - 明确回复要求
   - 提供回复指南

3. 审核决策:
   - 紧急问题需要审核
   - 复杂问题需要审核
   - 其他直接发送

2.6 节点六:人工审核

这是用户节点,使用 interrupt() 暂停执行。

from langgraph.types import interrupt

def human_review(state: EmailState) -> Command:
    """
    人工审核节点

    类型:用户节点
    输入:email_content, response_text, classification
    输出:approved, edited_response, feedback
    """
    # 暂停执行,等待用户输入
    user_input = interrupt({
        'email': state['email_content'],
        'draft': state['response_text'],
        'classification': state['classification'],
        'message': '请审核回复内容'
    })

    # 用户响应后继续
    if user_input.get('approved'):
        # 审核通过
        final_response = user_input.get('edited_response', state['response_text'])

        return Command(
            update={
                'response_text': final_response,
                'approved': True
            },
            goto='send_reply'
        )
    else:
        # 审核不通过,重新起草
        return Command(
            update={
                'approved': False,
                'feedback': user_input.get('feedback', '')
            },
            goto='draft_response'
        )

interrupt() 是什么?

interrupt() 是 LangGraph 的特殊函数:

1. 调用时:
   - 图执行暂停
   - 返回传入的数据给调用者
   - 等待用户输入

2. 用户响应后:
   - 图从暂停点继续
   - user_input 接收用户数据
   - 继续执行后续代码

3. 使用场景:
   - 需要人工确认
   - 需要人工输入
   - 需要人工选择

工作流程

1. 图执行到 human_review 节点
2. interrupt() 被调用
3. 图暂停,返回数据给用户
4. 用户审核,提供输入
5. 图恢复,user_input 接收数据
6. 根据用户输入决定下一步

2.7 节点七:发送回复

def send_reply(state: EmailState) -> dict:
    """
    发送回复节点

    类型:动作节点
    输入:sender_email, response_text
    输出:无
    """
    try:
        # 发送邮件
        result = email_service.send(
            to=state['sender_email'],
            subject=f"Re: {state.get('email_subject', '您的咨询')}",
            body=state['response_text']
        )

        logger.info(f"邮件已发送给 {state['sender_email']}")

        return {'sent': True, 'message_id': result.id}

    except EmailSendError as e:
        logger.error(f"邮件发送失败: {e}")
        raise  # 让 LangGraph 处理错误

设计要点

1. 动作节点:
   - 发送邮件(有副作用)
   - 失败时抛出异常

2. 错误处理:
   - 记录错误日志
   - 抛出异常让 LangGraph 处理
   - 可以配置重试策略

3. 为什么这里抛出异常?
   - 发送失败是严重错误
   - 需要重试或人工介入
   - 不应该静默失败

三、节点实现最佳实践

3.1 添加文档字符串

每个节点都应该有清晰的文档字符串:

def search_documentation(state: EmailState) -> Command:
    """
    搜索文档节点

    类型:数据节点
    输入:state["classification"]["topic"]
    输出:state["search_results"]

    从知识库搜索相关文档,为起草回复提供上下文。

    错误处理:
    - 搜索失败时返回错误提示
    - 不中断流程
    """
    ...

文档字符串应该包含

  • 节点类型
  • 输入字段
  • 输出字段
  • 功能描述
  • 错误处理策略

3.2 处理边界情况

节点应该处理各种边界情况:

def draft_response(state: EmailState) -> Command:
    # 检查必需字段
    if not state.get('classification'):
        logger.warning("缺少分类信息,返回分类节点")
        return Command(goto='classify_intent')

    if not state.get('email_content'):
        logger.warning("缺少邮件内容,返回读取节点")
        return Command(goto='read_email')

    # 处理可选字段
    search_results = state.get('search_results', [])
    ticket_id = state.get('ticket_id')
    feedback = state.get('feedback')

    ...

常见的边界情况

  • 必需字段缺失
  • 可选字段缺失
  • 数据格式不对
  • 空值处理

3.3 记录日志

良好的日志记录有助于调试和监控:

import logging

logger = logging.getLogger(__name__)

def bug_tracking(state: EmailState) -> Command:
    logger.info(f"开始创建 Bug 工单")
    logger.debug(f"邮件主题: {state['classification']['topic']}")

    try:
        ticket = bug_tracker.create(...)
        logger.info(f"Bug 工单创建成功: {ticket.id}")
        ...
    except Exception as e:
        logger.error(f"Bug 工单创建失败: {e}")
        ...

日志级别

  • DEBUG:详细调试信息
  • INFO:正常操作信息
  • WARNING:警告信息
  • ERROR:错误信息

3.4 使用类型提示

类型提示让代码更清晰,IDE 支持更好:

from typing import Optional, TypedDict

def search_documentation(
    state: EmailState
) -> Command[TypedDict, str]:
    """
    类型提示说明:
    - state: EmailState - 输入状态类型
    - 返回 Command[dict, str] - 更新类型和跳转目标类型
    """
    ...

四、测试节点

4.1 单元测试

每个节点可以单独测试,这是 LangGraph 的一大优势:

def test_classify_intent():
    """测试分类节点"""

    # 构造测试状态
    state = {
        'email_content': '如何重置密码?',
        'sender_email': 'test@example.com'
    }

    # 调用节点
    result = classify_intent(state)

    # 验证结果
    assert result.update['classification']['intent'] == 'question'
    assert result.goto in ['search_documentation', 'human_review', 'draft_response']

def test_search_documentation():
    """测试搜索节点"""

    state = {
        'classification': {
            'intent': 'question',
            'topic': '密码重置'
        }
    }

    result = search_documentation(state)

    assert 'search_results' in result.update
    assert result.goto == 'draft_response'

4.2 Mock 外部依赖

测试时需要 Mock 外部依赖:

from unittest.mock import patch, Mock

def test_search_documentation_with_mock():
    state = {
        'classification': {
            'intent': 'question',
            'topic': '密码重置'
        }
    }

    # Mock 知识库搜索
    with patch('knowledge_base.search') as mock_search:
        # 设置返回值
        mock_search.return_value = [
            Mock(content='重置密码步骤...'),
            Mock(content='密码安全要求...')
        ]

        # 调用节点
        result = search_documentation(state)

        # 验证
        assert len(result.update['search_results']) == 2
        assert result.goto == 'draft_response'

4.3 测试 LLM 节点

测试 LLM 节点需要 Mock LLM:

def test_classify_intent_with_mock_llm():
    state = {
        'email_content': '我发现了 Bug',
        'sender_email': 'test@example.com'
    }

    # Mock LLM
    with patch('llm.with_structured_output') as mock_structured:
        mock_llm = Mock()
        mock_llm.invoke.return_value = Classification(
            intent='bug',
            urgency='high',
            topic='Bug 报告',
            summary='用户报告了 Bug'
        )
        mock_structured.return_value = mock_llm

        # 调用节点
        result = classify_intent(state)

        # 验证
        assert result.update['classification']['intent'] == 'bug'
        assert result.goto == 'bug_tracking'

五、本章小结

5.1 节点实现要点

  1. 基本结构

    • 接收状态作为参数
    • 执行工作
    • 返回更新(字典或 Command)
  2. 使用 Command

    • 返回更新 + 跳转目标
    • 控制流程走向
    • 实现条件分支
  3. 错误处理

    • 捕获预期错误
    • 记录日志
    • 优雅降级或抛出异常
  4. 最佳实践

    • 添加文档字符串
    • 处理边界情况
    • 使用类型提示
    • 编写单元测试

5.2 七个节点总结

节点 类型 核心操作 关键技术
read_email 数据节点 读取邮件 错误处理
classify_intent LLM 节点 LLM 分类 结构化输出、Command
search_documentation 数据节点 搜索知识库 错误处理
bug_tracking 动作节点 创建工单 副作用处理
draft_response LLM 节点 LLM 生成回复 上下文构建
human_review 用户节点 等待审核 interrupt
send_reply 动作节点 发送邮件 异常抛出

5.3 下一章预告

下一章,我们将深入讲解 Step 5:连接节点与错误处理

我们会学习:

  • 如何组装图
  • 如何定义边和条件跳转
  • 错误处理策略

六、思考题

  1. 如果要增加一个"翻译回复"节点,应该怎么实现?提示:考虑放在哪个位置,需要什么输入输出。

  2. 人工审核节点为什么使用 interrupt()?如果不使用会怎样?提示:考虑执行流程和用户体验。

  3. 如何测试一个包含 LLM 调用的节点?提示:考虑 Mock LLM 的返回值。


系列导航

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐