12、LangChain 组件:Streaming (流式传输)
·
详细介绍 Streaming 核心功能、使用场景及实战示例,适用于需要提升 LLM 应用响应速度的开发者。
文章目录
一、概述
LangChain 提供的流传输系统可实时反馈代理运行状态,显著提升 LLM 应用的响应速度和用户体验。核心能力包括:
- 📊 流传输代理进度(每步状态更新)
- 🔤 流传输 LLM 令牌(逐词生成)
- 🤔 流传输思考/推理过程(模型内部推理步骤)
- 🎯 流传输自定义信号(如数据加载进度)
- 📦 多模式组合流传输(进度+令牌+自定义数据)
二、支持的流模式
通过 stream 或 astream 方法传入模式列表,支持以下三种核心模式:
| 模式 | 描述 |
|---|---|
| updates | 代理每步执行后的状态更新(多节点执行时分开流传输) |
| messages | 流传输 LLM 调用产生的 (token, metadata) 元组 |
| custom | 通过流写入器传输自定义数据(如工具执行日志) |
三、核心功能实战
3.1 流传输代理进度
场景:实时获取代理执行的每一步状态(LLM 调用→工具执行→最终响应)
# Example 3.1: stream agent progress — one "updates" chunk per executed step.
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    return f"It's always sunny in {city}!"


# Create the agent with a single tool.
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# Stream per-step state updates using the v2 stream format.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="updates",
    version="v2",
):
    if chunk["type"] == "updates":
        # Each update maps a step/node name to its state delta.
        for step, data in chunk["data"].items():
            print(f"步骤: {step}")
            print(f"内容: {data['messages'][-1].content_blocks}\n")
输出结果:
步骤: model
内容: [{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]
步骤: tools
内容: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
步骤: model
内容: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]
3.2 流传输 LLM 令牌
场景:逐词获取 LLM 生成的内容(含工具调用过程)
# Example 3.2: stream LLM tokens — (token, metadata) pairs, one per generated piece.
from langchain.agents import create_agent


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    return f"It's always sunny in {city}!"


agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# Stream LLM tokens using the v2 stream format.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        # "messages" chunks carry a (token, metadata) tuple.
        token, metadata = chunk["data"]
        print(f"节点: {metadata['langgraph_node']}")
        print(f"内容: {token.content_blocks}\n")
关键说明:
- 工具调用过程会以 tool_call_chunk 形式逐段流传输
- 最终响应会以文本块形式逐词生成
- 支持实时渲染打字机效果
3.3 流传输自定义更新
场景:在工具执行过程中插入自定义状态反馈(如数据加载进度)
# Example 3.3: emit custom stream updates from inside a tool.
from langchain.agents import create_agent
from langgraph.config import get_stream_writer


def get_weather(city: str) -> str:
    """Get the weather for the given city (with custom stream updates)."""
    # The stream writer pushes arbitrary payloads into the "custom" channel.
    writer = get_stream_writer()
    writer(f"正在查询城市: {city}")
    writer(f"已获取 {city} 天气数据")
    return f"It's always sunny in {city}!"


agent = create_agent(
    model="claude-sonnet-4-6",
    tools=[get_weather],
)

# Stream only the custom updates emitted by the tool.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"])
输出结果:
正在查询城市: San Francisco
已获取 San Francisco 天气数据
⚠️ 注意:使用 get_stream_writer() 后,工具只能在 LangGraph 执行上下文中调用
3.4 多模式流传输
场景:同时获取代理进度、LLM 令牌和自定义更新
# Example 3.4: combine several stream modes in one call (progress + custom data).
from langchain.agents import create_agent
from langgraph.config import get_stream_writer


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    writer = get_stream_writer()
    writer(f"查询中: {city}")
    return f"It's always sunny in {city}!"


agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# Multi-mode streaming: pass a list of modes; each chunk is tagged with its type.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(f"流模式: {chunk['type']}")
    print(f"内容: {chunk['data']}\n")
四、常见应用模式
4.1 流传输思考/推理令牌
场景:展示模型的内部推理过程(需模型支持推理输出)
# Example 4.1: stream thinking/reasoning tokens (requires a model that emits
# reasoning output).
from langchain.agents import create_agent
from langchain.messages import AIMessageChunk
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import Runnable


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    return f"It's always sunny in {city}!"


# Configure a model with extended thinking enabled.
model = ChatAnthropic(
    model_name="claude-sonnet-4-6",
    thinking={"type": "enabled", "budget_tokens": 5000},
)

agent: Runnable = create_agent(
    model=model,
    tools=[get_weather],
)

# Stream tokens; reasoning blocks and text blocks arrive interleaved.
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
):
    if not isinstance(token, AIMessageChunk):
        continue
    # Partition the chunk's content into reasoning blocks and text blocks.
    reasoning = [b for b in token.content_blocks if b["type"] == "reasoning"]
    text = [b for b in token.content_blocks if b["type"] == "text"]
    if reasoning:
        print(f"[思考] {reasoning[0]['reasoning']}", end="")
    if text:
        print(text[0]["text"], end="")
输出结果:
[思考] The user is asking about the weather in San Francisco. I have a tool
[思考] available to get this information. Let me call the get_weather tool
[思考] with "San Francisco" as the city parameter.
The weather in San Francisco is: It's always sunny in San Francisco!
🔗 模型支持查询:models.dev
4.2 流传输工具调用
场景:实时获取工具调用的增量 JSON 和完整执行结果
# Example 4.2: stream tool calls — incremental tool-call JSON plus completed results.
from typing import Any

from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    return f"It's always sunny in {city}!"


agent = create_agent("openai:gpt-5.2", tools=[get_weather])


def _render_message_chunk(token: AIMessageChunk) -> None:
    """Render an in-flight message chunk (text and partial tool-call JSON)."""
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)


def _render_completed_message(message: AnyMessage) -> None:
    """Render a completed message (finalized tool calls and tool responses)."""
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"工具调用: {message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"工具响应: {message.content_blocks}")


# Run the stream: "messages" chunks carry tokens, "updates" chunks carry
# completed per-node state.
input_message = {"role": "user", "content": "What is the weather in Boston?"}
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])
4.3 访问完整消息
场景:在流传输过程中获取完整的消息对象(适用于非状态跟踪场景)
# Example 4.3: accumulate streamed chunks into a complete message object.
# NOTE: continues the agent defined in the previous example.
input_message = {"role": "user", "content": "What is the weather in Boston?"}

full_message = None
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            # Chunks support "+": fold each one into the running message.
            full_message = token if full_message is None else full_message + token
            # chunk_position == "last" marks the end of one AI message.
            if token.chunk_position == "last":
                if full_message.tool_calls:
                    print(f"完整工具调用: {full_message.tool_calls}")
                # Reset for the next message in the stream.
                full_message = None
4.4 人机交互流
场景:流传输过程中插入人工审批环节(如工具调用确认)
# Example 4.4: human-in-the-loop streaming — pause on tool calls for approval,
# then resume the run with the human's decisions.
from typing import Any

from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command, Interrupt


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    return f"It's always sunny in {city}!"


# A checkpointer is required so the interrupted run can be resumed; the
# middleware interrupts before every get_weather call.
checkpointer = InMemorySaver()
agent = create_agent(
    "openai:gpt-5.2",
    tools=[get_weather],
    middleware=[
        HumanInTheLoopMiddleware(interrupt_on={"get_weather": True}),
    ],
    checkpointer=checkpointer,
)

# Stream and collect interrupts (approval requests).
input_message = {
    "role": "user",
    "content": "Can you look up the weather in Boston and San Francisco?"
}
config = {"configurable": {"thread_id": "some_id"}}

interrupts = []
for chunk in agent.stream(
    {"messages": [input_message]},
    config=config,
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:
        interrupts.extend(chunk["data"]["__interrupt__"])
        # Show the pending approval requests.
        for interrupt in chunk["data"]["__interrupt__"]:
            for req in interrupt.value["action_requests"]:
                print(f"需要审批: {req['description']}")

# Resolve approvals (example: edit the Boston request, approve the
# San Francisco request).
decisions = {}
for interrupt in interrupts:
    decisions[interrupt.id] = {
        "decisions": [
            {
                "type": "edit",
                "edited_action": {"name": "get_weather", "args": {"city": "Boston, U.K."}}
            },
            {"type": "approve"}
        ]
    }

# Resume the interrupted run with the decisions.
for chunk in agent.stream(
    Command(resume=decisions),
    config=config,
    stream_mode=["messages", "updates"],
    version="v2",
):
    # Handle subsequent stream data...
    pass
4.5 子代理流传输
场景:多代理系统中区分不同代理的流输出
# Example 4.5: multi-agent streaming — attribute each streamed token to the
# agent (supervisor or sub-agent) that produced it.
from typing import Any

from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
# BUGFIX: AIMessageChunk was used below but missing from this import.
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage


def get_weather(city: str) -> str:
    """Get the weather for the given city (demo stub)."""
    return f"It's always sunny in {city}!"


# Sub-agent: the weather agent (named so it can be identified in the stream).
weather_model = init_chat_model("openai:gpt-5.2")
weather_agent = create_agent(
    model=weather_model,
    tools=[get_weather],
    name="weather_agent",
)


# Parent agent calls the sub-agent through a plain tool function.
def call_weather_agent(query: str) -> str:
    """Invoke the weather agent with a natural-language query."""
    result = weather_agent.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"][-1].text


supervisor_model = init_chat_model("openai:gpt-5.2")
agent = create_agent(
    model=supervisor_model,
    tools=[call_weather_agent],
    name="supervisor",
)

# Stream with subgraphs enabled and print a header whenever the producing
# agent changes.
current_agent = None
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in Boston?"}]},
    stream_mode=["messages", "updates"],
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # lc_agent_name identifies which agent emitted this token.
        if agent_name := metadata.get("lc_agent_name"):
            if agent_name != current_agent:
                print(f"\n🤖 {agent_name}: ")
                current_agent = agent_name
        if isinstance(token, AIMessageChunk):
            print(token.text, end="|")
输出结果:
🤖 supervisor:
[{'name': 'call_weather_agent', 'args': '', 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'index': 0, 'type': 'tool_call_chunk'}]
...
🤖 weather_agent:
[{'name': 'get_weather', 'args': '', 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'index': 0, 'type': 'tool_call_chunk'}]
...
Boston| weather| right| now|:| **|Sunny|**|.
五、禁用流传输
场景:多代理系统中控制特定模型的流输出
# Section 5: disable streaming for a specific model.
from langchain_openai import ChatOpenAI

# Disable streaming via the provider-specific parameter.
model = ChatOpenAI(
    model="gpt-4.1",
    streaming=False,  # turn off token streaming
)

# For models that do not accept `streaming`, use the generic switch.
model = ChatOpenAI(
    model="gpt-4.1",
    disable_streaming=True,  # provider-agnostic disable flag
)
六、v2 流格式(LangGraph ≥1.1)
v2 格式提供统一的流输出结构,支持类型 narrowing 和 Pydantic 模型转换:
核心优势
- 统一格式:所有流块均为 {"type": "...", "ns": "...", "data": "..."}
- 简化调用:无需手动解包元组
- 增强的 invoke() 返回值:分离状态和中断信息
使用示例
# v2 streaming example: every chunk is a uniform {"type", "ns", "data"} dict.
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(f"类型: {chunk['type']}")
    print(f"数据: {chunk['data']}\n")

# invoke() with v2: the result separates final state from interrupt info.
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Hello"}]},
    version="v2",
)
print("状态结果:", result.value)
print("中断信息:", result.interrupts)
七、相关资源
- 前端流传输 - 构建 React 实时交互界面
- Chat 模型流传输 - 直接流传输模型输出
- 推理功能 - 配置模型推理输出
- 标准内容块 - 内容格式规范
- LangGraph 流传输 - 高级流传输选项
- LangChain 官方文档
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)