04- Agent 智能体开发实战指南(四):流式输出与状态管理
·
Agent 智能体开发实战指南(四):流式输出与状态管理
系列导读:这是《Agent 智能体开发实战指南》系列的第四篇,将深入讲解 LangChain Agent 的流式输出系统,包括各种 stream_mode 模式详解、状态管理、调试技巧,以及如何在生产环境中实现实时反馈。
一、为什么需要流式输出?
1.1 传统方式 vs 流式输出
传统方式(agent.invoke):
# 等待全部执行完成后一次性返回
result = agent.invoke(input_dict)
print(result) # 几秒后,突然输出全部内容
问题:
- 用户等待时间长
流式输出(agent.stream):
# 实时输出每一步的结果
for chunk in agent.stream(input_dict):
print(chunk, end="", flush=True) # 实时显示
优势:
- 用户实时看到进展,让输出像对话一样,体验更好
- 减少输出时间,优化用户体验
1.2 应用场景对比
| 场景 | 推荐方式 | 原因 |
|---|---|---|
| 后台批处理 | invoke | 无需实时反馈 |
| CLI 调试 | stream | 观察执行过程 |
| 聊天机器人 | stream | 打字机效果 |
| API 服务 | stream | SSE 推送 |
二、stream_mode 详解
2.1 七种模式总览
| 模式 | 输出内容 | 适用场景 |
|---|---|---|
| values | 完整的 State 字典 | 调试、状态快照 |
| updates | 增量更新 | 生产环境、UI 更新 |
| messages | LLM token 流 | 聊天机器人打字机效果 |
| debug | 详细执行事件 | 开发调试 |
| custom | 自定义数据 | 进度通知 |
| checkpoints | 检查点事件 | 状态恢复 |
| tasks | 任务生命周期 | 任务监控 |
2.2 values 模式:完整状态快照
特点:每个步骤后输出完整的 State 字典
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "计算我的 BMI"}]},
stream_mode="values"
):
# chunk 是完整的 State 字典
print(f"当前消息数:{len(chunk['messages'])}")
print(f"最新消息:{chunk['messages'][-1].content}")
输出示例:
当前消息数:1
最新消息:计算我的 BMI
---
当前消息数:2
最新消息:### 思考
要计算 BMI,需要知道体重和身高...
---
当前消息数:3
最新消息:90 # 工具返回
---
...
适用场景:
- 调试:查看每一步的完整状态
- 日志记录:保存完整执行历史
- 状态恢复:可以随时从任意点继续
注意事项:
- ⚠️ 如果 State 数据量大,传输开销大
- ⚠️ 每次都是完整拷贝,内存占用高
2.3 updates 模式:增量更新
特点:只输出当前步骤的更新内容
for chunk in agent.stream(
input_dict,
stream_mode="updates"
):
# chunk 是字典,键是节点名,值是该节点的输出
print(f"节点更新:{chunk}")
输出示例:
节点更新:{'agent': {'messages': [AIMessage(content='### 思考...')]}}
节点更新:{'tools': {'messages': [ToolMessage(content='90')]}}
节点更新:{'agent': {'messages': [AIMessage(content='### 观察...')]}}
适用场景:
- 生产环境:更高效
- 前端 UI:只需更新变化部分
- 带宽受限:减少传输量
注意事项:
- ⚠️ 需要在客户端维护完整 State
- ⚠️ 通过不断 merge 更新来拼凑完整状态
2.4 messages 模式:LLM token 流
特点:实时输出 LLM 生成的每个 token
for chunk in agent.stream(
input_dict,
stream_mode="messages"
):
# chunk 是元组:(message_chunk, metadata)
message_chunk, metadata = chunk
print(message_chunk.content, end="", flush=True)
输出示例:
### 思
考
要
计
算
B
M
I
...
适用场景:
- 聊天机器人:打字机效果
- 实时反馈:用户立即看到内容
- 长文本生成:减少等待焦虑
注意事项:
- ⚠️ 此模式独立于 State 结构
- ⚠️ 只捕获 LLM 输出,不包含工具调用
2.5 debug 模式:详细调试信息
特点:输出图执行的详细事件
for chunk in agent.stream(
input_dict,
stream_mode="debug"
):
print(chunk)
输出示例:
{'type': 'node_start', 'node': 'agent', 'timestamp': '...'}
{'type': 'node_end', 'node': 'agent', 'duration': 1.23}
{'type': 'tool_call', 'tool': 'get_weight', 'args': {...}}
{'type': 'state_update', 'changes': {...}}
适用场景:
- 开发调试:排查逻辑错误
- 性能分析:找出瓶颈节点
- 学习理解:深入理解执行流程
注意事项:
- 信息量巨大
- 不适合生产环境
- 不适合面向用户
2.6 模式对比总结
| 维度 | values | updates | messages | debug |
|---|---|---|---|---|
| 数据量 | 大 | 小 | 最小 | 巨大 |
| 完整性 | 完整 | 增量 | 仅 token | 元数据 |
| 调试价值 | 高 | 中 | 低 | 最高 |
| 生产适用 | ⚠️ | ✅ | ✅ | ❌ |
三、实战:获取最新消息
3.1 标准写法
for chunk in agent.stream(input_dict, stream_mode="values"):
# 获取最新消息
latest_message = chunk['messages'][-1]
# 处理消息内容
if latest_message.content:
print(latest_message.content)
# 检查是否有工具调用
try:
if latest_message.tool_calls:
print(f"工具调用:{[tc['name'] for tc in latest_message.tool_calls]}")
except AttributeError:
# 某些消息类型没有 tool_calls 属性
pass
3.2 为什么用 [-1]?
原因:流式输出时,每个 chunk 包含从开始到当前的所有消息
chunk 1: [用户消息]
chunk 2: [用户消息,Agent 思考]
chunk 3: [用户消息,Agent 思考,工具返回]
chunk 4: [用户消息,Agent 思考,工具返回,Agent 观察]
...
最新消息 = 当前步骤的输出 = chunk[‘messages’][-1]
3.3 完整处理逻辑
def process_stream(agent, query):
input_dict = {
"messages": [{"role": "user", "content": query}]
}
for chunk in agent.stream(input_dict, stream_mode="values"):
latest = chunk['messages'][-1]
msg_type = type(latest).__name__
if msg_type == "HumanMessage":
print(f"👤 用户:{latest.content}")
elif msg_type == "AIMessage":
if latest.content:
print(f"🤖 Agent: {latest.content}")
# 检查工具调用
if hasattr(latest, 'tool_calls') and latest.tool_calls:
tools = [tc['name'] for tc in latest.tool_calls]
print(f"🔧 调用工具:{tools}")
elif msg_type == "ToolMessage":
print(f"📦 工具返回:{latest.content[:50]}...")
print("-" * 50)
四、Streamlit 集成实战
4.1 完整示例
import time
import streamlit as st
from agent.react_agent import ReactAgent
# 页面配置
st.title("智扫通机器人智能客服")
st.divider()
# 初始化 Agent
if "agent" not in st.session_state:
st.session_state["agent"] = ReactAgent()
# 初始化消息历史
if "messages" not in st.session_state:
st.session_state["messages"] = []
# 显示历史消息
for message in st.session_state["messages"]:
st.chat_message(message["role"]).write(message["content"])
# 用户输入
prompt = st.chat_input()
if prompt:
# 显示用户消息
st.chat_message("user").write(prompt)
st.session_state["messages"].append({"role": "user", "content": prompt})
# 生成回复
response_messages = []
with st.spinner("智能客服思考中..."):
res_stream = st.session_state["agent"].execute_stream(prompt)
# 流式显示
def capture(generator, cache_list):
for chunk in generator:
cache_list.append(chunk)
# 打字机效果
for char in chunk:
time.sleep(0.01)
yield char
# 显示助手回复
st.chat_message("assistant").write_stream(
capture(res_stream, response_messages)
)
# 保存回复
st.session_state["messages"].append({
"role": "assistant",
"content": response_messages[-1]
})
st.rerun()
4.2 关键点解析
1. Session State 管理:
# Agent 实例持久化
if "agent" not in st.session_state:
st.session_state["agent"] = ReactAgent()
# 消息历史持久化
if "messages" not in st.session_state:
st.session_state["messages"] = []
2. 流式捕获:
def capture(generator, cache_list):
for chunk in generator:
cache_list.append(chunk) # 保存完整内容
for char in chunk: # 逐字输出
time.sleep(0.01) # 打字机延迟
yield char
3. 自动刷新:
st.rerun() # 重新渲染页面,显示更新后的消息历史
五、状态管理进阶
5.1 State 结构解析
# 典型的 Agent State 结构
{
"messages": [ # 消息历史
HumanMessage(...),
AIMessage(...),
ToolMessage(...),
...
],
# 可以扩展自定义字段
"user_id": "1001",
"session_context": {...},
}
5.2 自定义 State 字段
from typing import TypedDict, Annotated
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # 自动合并消息
user_id: str # 自定义字段
report_mode: bool # 标记是否为报告生成场景
使用场景:
- 在中间件中读取/修改自定义字段
- 跨步骤传递上下文信息
- 实现复杂的业务逻辑
5.3 状态快照与恢复
# 保存检查点
checkpoints = []
for chunk in agent.stream(input_dict, stream_mode="checkpoints"):
checkpoints.append(chunk)
# 恢复状态
restored_state = checkpoints[-1]['state']
六、生产环境考量
6.1 性能优化
问题:流式输出增加网络请求次数
解决方案:
- 批量推送:累积一定内容后推送
- SSE(Server-Sent Events):保持长连接
- WebSocket:双向实时通信
6.2 错误处理
try:
for chunk in agent.stream(input_dict, stream_mode="values"):
yield chunk['messages'][-1].content
except Exception as e:
logger.error(f"流式输出失败:{e}")
yield "抱歉,处理过程中出现错误,请稍后重试。"
6.3 超时控制
from langgraph.config import get_config
for chunk in agent.stream(
input_dict,
stream_mode="values",
config={"timeout": 30} # 30 秒超时
):
yield chunk
七、调试技巧
7.1 多模式组合
# 同时获取 values 和 debug
for chunk in agent.stream(
input_dict,
stream_mode=["values", "debug"]
):
mode, data = chunk
if mode == "values":
print(f"状态:{len(data['messages'])} 条消息")
elif mode == "debug":
print(f"调试:{data['type']}")
7.2 日志记录
import logging
logger = logging.getLogger("agent")
for chunk in agent.stream(input_dict, stream_mode="values"):
latest = chunk['messages'][-1]
msg_type = type(latest).__name__
logger.debug(f"消息类型:{msg_type}")
logger.debug(f"消息内容:{latest.content[:100]}")
八、本章小结
核心要点
-
stream_mode 选择:
- values:调试、完整状态
- updates:生产、增量更新
- messages:打字机效果
- debug:深度调试
-
最新消息获取:
chunk['messages'][-1] -
Streamlit 集成:session_state + write_stream
-
状态管理:可扩展自定义字段
下章预告
下一篇我们将讲解 中间件系统,学习:
- Hooks 机制原理
- 工具调用监控
- 动态提示词切换
- 安全防护实现
- Agent 智能体开发实战指南(一):从 LLM 到 Agent 的认知升级
- Agent 智能体开发实战指南(二):工具调用系统深度解析
- Agent 智能体开发实战指南(三):ReAct 框架深度解析
- Agent 智能体开发实战指南(四):流式输出与状态管理
- Agent 智能体开发实战指南(五):中间件系统与动态提示词
- Agent 智能体开发实战指南(六):RAG 与向量存储实战
- Agent 智能体开发实战指南(七):项目架构设计与工程化实践
- Agent 智能体开发实战指南(八):UI 集成与生产部署
本文是《Agent 智能体开发实战指南》系列的第四篇,下一篇将深入讲解中间件系统。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)