Agent 智能体开发实战指南(四):流式输出与状态管理

系列导读:这是《Agent 智能体开发实战指南》系列的第四篇,将深入讲解 LangChain Agent 的流式输出系统,包括各种 stream_mode 模式详解、状态管理、调试技巧,以及如何在生产环境中实现实时反馈。


一、为什么需要流式输出?

1.1 传统方式 vs 流式输出

传统方式(agent.invoke)

# 等待全部执行完成后一次性返回
result = agent.invoke(input_dict)
print(result)  # 几秒后,突然输出全部内容

问题

  • 用户等待时间长

流式输出(agent.stream)

# 实时输出每一步的结果
for chunk in agent.stream(input_dict):
    print(chunk, end="", flush=True)  # 实时显示

优势

  • 用户实时看到进展,让输出像对话一样,体验更好
  • 减少输出时间,优化用户体验

1.2 应用场景对比

场景 推荐方式 原因
后台批处理 invoke 无需实时反馈
CLI 调试 stream 观察执行过程
聊天机器人 stream 打字机效果
API 服务 stream SSE 推送

二、stream_mode 详解

2.1 七种模式总览

模式 输出内容 适用场景
values 完整的 State 字典 调试、状态快照
updates 增量更新 生产环境、UI 更新
messages LLM token 流 聊天机器人打字机效果
debug 详细执行事件 开发调试
custom 自定义数据 进度通知
checkpoints 检查点事件 状态恢复
tasks 任务生命周期 任务监控

2.2 values 模式:完整状态快照

特点:每个步骤后输出完整的 State 字典

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "计算我的 BMI"}]},
    stream_mode="values"
):
    # chunk 是完整的 State 字典
    print(f"当前消息数:{len(chunk['messages'])}")
    print(f"最新消息:{chunk['messages'][-1].content}")

输出示例

当前消息数:1
最新消息:计算我的 BMI
---
当前消息数:2
最新消息:### 思考
要计算 BMI,需要知道体重和身高...
---
当前消息数:3
最新消息:90  # 工具返回
---
...

适用场景

  • 调试:查看每一步的完整状态
  • 日志记录:保存完整执行历史
  • 状态恢复:可以随时从任意点继续

注意事项

  • ⚠️ 如果 State 数据量大,传输开销大
  • ⚠️ 每次都是完整拷贝,内存占用高

2.3 updates 模式:增量更新

特点:只输出当前步骤的更新内容

for chunk in agent.stream(
    input_dict,
    stream_mode="updates"
):
    # chunk 是字典,键是节点名,值是该节点的输出
    print(f"节点更新:{chunk}")

输出示例

节点更新:{'agent': {'messages': [AIMessage(content='### 思考...')]}}
节点更新:{'tools': {'messages': [ToolMessage(content='90')]}}
节点更新:{'agent': {'messages': [AIMessage(content='### 观察...')]}}

适用场景

  • 生产环境:更高效
  • 前端 UI:只需更新变化部分
  • 带宽受限:减少传输量

注意事项

  • ⚠️ 需要在客户端维护完整 State
  • ⚠️ 通过不断 merge 更新来拼凑完整状态

2.4 messages 模式:LLM token 流

特点:实时输出 LLM 生成的每个 token

for chunk in agent.stream(
    input_dict,
    stream_mode="messages"
):
    # chunk 是元组:(message_chunk, metadata)
    message_chunk, metadata = chunk
    print(message_chunk.content, end="", flush=True)

输出示例

### 思
考
要
计
算
B
M
I
...

适用场景

  • 聊天机器人:打字机效果
  • 实时反馈:用户立即看到内容
  • 长文本生成:减少等待焦虑

注意事项

  • ⚠️ 此模式独立于 State 结构
  • ⚠️ 只捕获 LLM 输出,不包含工具调用

2.5 debug 模式:详细调试信息

特点:输出图执行的详细事件

for chunk in agent.stream(
    input_dict,
    stream_mode="debug"
):
    print(chunk)

输出示例

{'type': 'node_start', 'node': 'agent', 'timestamp': '...'}
{'type': 'node_end', 'node': 'agent', 'duration': 1.23}
{'type': 'tool_call', 'tool': 'get_weight', 'args': {...}}
{'type': 'state_update', 'changes': {...}}

适用场景

  • 开发调试:排查逻辑错误
  • 性能分析:找出瓶颈节点
  • 学习理解:深入理解执行流程

注意事项

  • 信息量巨大
  • 不适合生产环境
  • 不适合面向用户

2.6 模式对比总结

维度 values updates messages debug
数据量 最小 巨大
完整性 完整 增量 仅 token 元数据
调试价值 最高
生产适用 ⚠️

三、实战:获取最新消息

3.1 标准写法

for chunk in agent.stream(input_dict, stream_mode="values"):
    # 获取最新消息
    latest_message = chunk['messages'][-1]
    
    # 处理消息内容
    if latest_message.content:
        print(latest_message.content)
    
    # 检查是否有工具调用
    try:
        if latest_message.tool_calls:
            print(f"工具调用:{[tc['name'] for tc in latest_message.tool_calls]}")
    except AttributeError:
        # 某些消息类型没有 tool_calls 属性
        pass

3.2 为什么用 [-1]?

原因:流式输出时,每个 chunk 包含从开始到当前的所有消息

chunk 1: [用户消息]
chunk 2: [用户消息,Agent 思考]
chunk 3: [用户消息,Agent 思考,工具返回]
chunk 4: [用户消息,Agent 思考,工具返回,Agent 观察]
...

最新消息 = 当前步骤的输出 = chunk[‘messages’][-1]

3.3 完整处理逻辑

def process_stream(agent, query):
    input_dict = {
        "messages": [{"role": "user", "content": query}]
    }
    
    for chunk in agent.stream(input_dict, stream_mode="values"):
        latest = chunk['messages'][-1]
        msg_type = type(latest).__name__
        
        if msg_type == "HumanMessage":
            print(f"👤 用户:{latest.content}")
        
        elif msg_type == "AIMessage":
            if latest.content:
                print(f"🤖 Agent: {latest.content}")
            
            # 检查工具调用
            if hasattr(latest, 'tool_calls') and latest.tool_calls:
                tools = [tc['name'] for tc in latest.tool_calls]
                print(f"🔧 调用工具:{tools}")
        
        elif msg_type == "ToolMessage":
            print(f"📦 工具返回:{latest.content[:50]}...")
        
        print("-" * 50)

四、Streamlit 集成实战

4.1 完整示例

import time
import streamlit as st
from agent.react_agent import ReactAgent

# 页面配置
st.title("智扫通机器人智能客服")
st.divider()

# 初始化 Agent
if "agent" not in st.session_state:
    st.session_state["agent"] = ReactAgent()

# 初始化消息历史
if "messages" not in st.session_state:
    st.session_state["messages"] = []

# 显示历史消息
for message in st.session_state["messages"]:
    st.chat_message(message["role"]).write(message["content"])

# 用户输入
prompt = st.chat_input()

if prompt:
    # 显示用户消息
    st.chat_message("user").write(prompt)
    st.session_state["messages"].append({"role": "user", "content": prompt})
    
    # 生成回复
    response_messages = []
    with st.spinner("智能客服思考中..."):
        res_stream = st.session_state["agent"].execute_stream(prompt)
        
        # 流式显示
        def capture(generator, cache_list):
            for chunk in generator:
                cache_list.append(chunk)
                # 打字机效果
                for char in chunk:
                    time.sleep(0.01)
                    yield char
        
        # 显示助手回复
        st.chat_message("assistant").write_stream(
            capture(res_stream, response_messages)
        )
        
        # 保存回复
        st.session_state["messages"].append({
            "role": "assistant",
            "content": response_messages[-1]
        })
        
        st.rerun()

4.2 关键点解析

1. Session State 管理

# Agent 实例持久化
if "agent" not in st.session_state:
    st.session_state["agent"] = ReactAgent()

# 消息历史持久化
if "messages" not in st.session_state:
    st.session_state["messages"] = []

2. 流式捕获

def capture(generator, cache_list):
    for chunk in generator:
        cache_list.append(chunk)  # 保存完整内容
        for char in chunk:        # 逐字输出
            time.sleep(0.01)      # 打字机延迟
            yield char

3. 自动刷新

st.rerun()  # 重新渲染页面,显示更新后的消息历史

五、状态管理进阶

5.1 State 结构解析

# 典型的 Agent State 结构
{
    "messages": [           # 消息历史
        HumanMessage(...),
        AIMessage(...),
        ToolMessage(...),
        ...
    ],
    # 可以扩展自定义字段
    "user_id": "1001",
    "session_context": {...},
}

5.2 自定义 State 字段

from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages

class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # 自动合并消息
    user_id: str                              # 自定义字段
    report_mode: bool                         # 标记是否为报告生成场景

使用场景

  • 在中间件中读取/修改自定义字段
  • 跨步骤传递上下文信息
  • 实现复杂的业务逻辑

5.3 状态快照与恢复

# 保存检查点
checkpoints = []

for chunk in agent.stream(input_dict, stream_mode="checkpoints"):
    checkpoints.append(chunk)

# 恢复状态
restored_state = checkpoints[-1]['state']

六、生产环境考量

6.1 性能优化

问题:流式输出增加网络请求次数

解决方案

  1. 批量推送:累积一定内容后推送
  2. SSE(Server-Sent Events):保持长连接
  3. WebSocket:双向实时通信

6.2 错误处理

try:
    for chunk in agent.stream(input_dict, stream_mode="values"):
        yield chunk['messages'][-1].content
except Exception as e:
    logger.error(f"流式输出失败:{e}")
    yield "抱歉,处理过程中出现错误,请稍后重试。"

6.3 超时控制

from langgraph.config import get_config

for chunk in agent.stream(
    input_dict,
    stream_mode="values",
    config={"timeout": 30}  # 30 秒超时
):
    yield chunk

七、调试技巧

7.1 多模式组合

# 同时获取 values 和 debug
for chunk in agent.stream(
    input_dict,
    stream_mode=["values", "debug"]
):
    mode, data = chunk
    if mode == "values":
        print(f"状态:{len(data['messages'])} 条消息")
    elif mode == "debug":
        print(f"调试:{data['type']}")

7.2 日志记录

import logging

logger = logging.getLogger("agent")

for chunk in agent.stream(input_dict, stream_mode="values"):
    latest = chunk['messages'][-1]
    msg_type = type(latest).__name__
    
    logger.debug(f"消息类型:{msg_type}")
    logger.debug(f"消息内容:{latest.content[:100]}")

八、本章小结

核心要点

  1. stream_mode 选择

    • values:调试、完整状态
    • updates:生产、增量更新
    • messages:打字机效果
    • debug:深度调试
  2. 最新消息获取chunk['messages'][-1]

  3. Streamlit 集成:session_state + write_stream

  4. 状态管理:可扩展自定义字段

下章预告

下一篇我们将讲解 中间件系统,学习:

  • Hooks 机制原理
  • 工具调用监控
  • 动态提示词切换
  • 安全防护实现

  1. Agent 智能体开发实战指南(一):从 LLM 到 Agent 的认知升级
  2. Agent 智能体开发实战指南(二):工具调用系统深度解析
  3. Agent 智能体开发实战指南(三):ReAct 框架深度解析
  4. Agent 智能体开发实战指南(四):流式输出与状态管理
  5. Agent 智能体开发实战指南(五):中间件系统与动态提示词
  6. Agent 智能体开发实战指南(六):RAG 与向量存储实战
  7. Agent 智能体开发实战指南(七):项目架构设计与工程化实践
  8. Agent 智能体开发实战指南(八):UI 集成与生产部署

本文是《Agent 智能体开发实战指南》系列的第四篇,下一篇将深入讲解中间件系统。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐