A detailed walkthrough of Streaming's core features, use cases, and hands-on examples, aimed at developers who need to improve the responsiveness of their LLM applications.

1. Overview

The streaming system provided by LangChain surfaces agent runtime state in real time, significantly improving the responsiveness and user experience of LLM applications. Core capabilities include:

  • 📊 Stream agent progress (state updates after each step)
  • 🔤 Stream LLM tokens (word-by-word generation)
  • 🤔 Stream thinking/reasoning (the model's internal reasoning steps)
  • 🎯 Stream custom signals (e.g., data-loading progress)
  • 📦 Combine multiple stream modes (progress + tokens + custom data)

2. Supported Stream Modes

Pass a mode list to the stream / astream methods. Three core modes are supported:

Mode      Description
updates   State updates after each agent step (streamed separately when multiple nodes execute)
messages  (token, metadata) tuples produced by LLM calls
custom    Custom data emitted via a stream writer (e.g., tool execution logs)

3. Core Features in Practice

3.1 Streaming Agent Progress

Scenario: observe each step of the agent's execution in real time (LLM call → tool execution → final response)

from langchain.agents import create_agent

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"It's always sunny in {city}!"

# Create the agent
agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# Stream agent progress
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="updates",
    version="v2",
):
    if chunk["type"] == "updates":
        for step, data in chunk["data"].items():
            print(f"Step: {step}")
            print(f"Content: {data['messages'][-1].content_blocks}\n")

Output

Step: model
Content: [{'type': 'tool_call', 'name': 'get_weather', 'args': {'city': 'San Francisco'}, 'id': 'call_OW2NYNsNSKhRZpjW0wm2Aszd'}]

Step: tools
Content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]

Step: model
Content: [{'type': 'text', 'text': "It's always sunny in San Francisco!"}]

3.2 Streaming LLM Tokens

Scenario: receive LLM output token by token (including the tool-call process)

from langchain.agents import create_agent

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"It's always sunny in {city}!"

agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# Stream LLM tokens
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        print(f"Node: {metadata['langgraph_node']}")
        print(f"Content: {token.content_blocks}\n")

Key Notes

  • Tool calls are streamed incrementally as tool_call_chunk items
  • The final response is generated word by word as text blocks
  • Suitable for rendering a real-time typewriter effect
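The typewriter effect can be sketched without a live model. The snippet below simulates a stream of (token, metadata) tuples and prints text blocks as they arrive; FakeToken and the simulated stream are illustrative stand-ins, not real LangChain types.

```python
# Minimal typewriter-style consumer over a simulated token stream.
# FakeToken stands in for an AIMessageChunk; only the shape used here is modeled.
from dataclasses import dataclass

@dataclass
class FakeToken:
    content_blocks: list

def render_typewriter(stream) -> str:
    """Print text blocks as they arrive and return the assembled string."""
    parts = []
    for token, metadata in stream:
        for block in token.content_blocks:
            if block["type"] == "text":
                print(block["text"], end="", flush=True)  # incremental render
                parts.append(block["text"])
    print()
    return "".join(parts)

# Simulated (token, metadata) tuples, mimicking stream_mode="messages" output
simulated = [
    (FakeToken([{"type": "text", "text": "It's always "}]), {"langgraph_node": "model"}),
    (FakeToken([{"type": "text", "text": "sunny in SF!"}]), {"langgraph_node": "model"}),
]
result = render_typewriter(simulated)  # → "It's always sunny in SF!"
```

In a real consumer loop, the token from stream_mode="messages" plays the role of FakeToken here.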

3.3 Streaming Custom Updates

Scenario: emit custom status feedback during tool execution (e.g., data-loading progress)

from langchain.agents import create_agent
from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """Get the weather for a given city (with custom stream updates)."""
    writer = get_stream_writer()
    # Emit custom stream messages
    writer(f"Looking up weather for: {city}")
    writer(f"Fetched weather data for {city}")
    return f"It's always sunny in {city}!"

agent = create_agent(
    model="claude-sonnet-4-6",
    tools=[get_weather],
)

# Stream custom updates
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"])

Output

Looking up weather for: San Francisco
Fetched weather data for San Francisco

⚠️ Note: once a tool calls get_stream_writer(), it can only be invoked inside a LangGraph execution context

3.4 Multi-Mode Streaming

Scenario: receive agent progress, LLM tokens, and custom updates at the same time

from langchain.agents import create_agent
from langgraph.config import get_stream_writer

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    writer = get_stream_writer()
    writer(f"Looking up: {city}")
    return f"It's always sunny in {city}!"

agent = create_agent(
    model="gpt-5-nano",
    tools=[get_weather],
)

# Multi-mode streaming (progress + custom)
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(f"Stream mode: {chunk['type']}")
    print(f"Content: {chunk['data']}\n")

4. Common Application Patterns

4.1 Streaming Thinking/Reasoning Tokens

Scenario: surface the model's internal reasoning process (requires a model that supports reasoning output)

from langchain.agents import create_agent
from langchain.messages import AIMessageChunk
from langchain_anthropic import ChatAnthropic
from langchain_core.runnables import Runnable

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"It's always sunny in {city}!"

# Configure a model with reasoning enabled
model = ChatAnthropic(
    model_name="claude-sonnet-4-6",
    thinking={"type": "enabled", "budget_tokens": 5000},
)

agent: Runnable = create_agent(
    model=model,
    tools=[get_weather],
)

# Stream the reasoning process
for token, metadata in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode="messages",
):
    if not isinstance(token, AIMessageChunk):
        continue
    # Separate reasoning blocks from text blocks
    reasoning = [b for b in token.content_blocks if b["type"] == "reasoning"]
    text = [b for b in token.content_blocks if b["type"] == "text"]
    
    if reasoning:
        print(f"[Thinking] {reasoning[0]['reasoning']}", end="")
    if text:
        print(text[0]["text"], end="")

Output

[Thinking] The user is asking about the weather in San Francisco. I have a tool
[Thinking]  available to get this information. Let me call the get_weather tool
[Thinking]  with "San Francisco" as the city parameter.
The weather in San Francisco is: It's always sunny in San Francisco!

🔗 Model support lookup: models.dev

4.2 Streaming Tool Calls

Scenario: receive incremental tool-call JSON and complete execution results in real time

from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"It's always sunny in {city}!"

agent = create_agent("openai:gpt-5.2", tools=[get_weather])

def _render_message_chunk(token: AIMessageChunk) -> None:
    """Render a streamed message chunk."""
    if token.text:
        print(token.text, end="|")
    if token.tool_call_chunks:
        print(token.tool_call_chunks)

def _render_completed_message(message: AnyMessage) -> None:
    """Render a completed message."""
    if isinstance(message, AIMessage) and message.tool_calls:
        print(f"Tool calls: {message.tool_calls}")
    if isinstance(message, ToolMessage):
        print(f"Tool response: {message.content_blocks}")

# Run the stream
input_message = {"role": "user", "content": "What is the weather in Boston?"}
for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            _render_message_chunk(token)
    elif chunk["type"] == "updates":
        for source, update in chunk["data"].items():
            if source in ("model", "tools"):
                _render_completed_message(update["messages"][-1])

4.3 Accessing Complete Messages

Scenario: assemble complete message objects while streaming (useful when you are not tracking state yourself)

input_message = {"role": "user", "content": "What is the weather in Boston?"}
full_message = None

for chunk in agent.stream(
    {"messages": [input_message]},
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if isinstance(token, AIMessageChunk):
            # Aggregate message chunks
            full_message = token if full_message is None else full_message + token
            # Check whether this is the last chunk
            if token.chunk_position == "last":
                if full_message.tool_calls:
                    print(f"Complete tool calls: {full_message.tool_calls}")
                full_message = None

4.4 Human-in-the-Loop Streaming

Scenario: pause the stream for human approval (e.g., confirming a tool call)

from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"It's always sunny in {city}!"

# Configure a checkpointer and the human-in-the-loop middleware
checkpointer = InMemorySaver()
agent = create_agent(
    "openai:gpt-5.2",
    tools=[get_weather],
    middleware=[
        HumanInTheLoopMiddleware(interrupt_on={"get_weather": True}),
    ],
    checkpointer=checkpointer,
)

# Stream and handle interrupts
input_message = {
    "role": "user",
    "content": "Can you look up the weather in Boston and San Francisco?"
}
config = {"configurable": {"thread_id": "some_id"}}
interrupts = []

for chunk in agent.stream(
    {"messages": [input_message]},
    config=config,
    stream_mode=["messages", "updates"],
    version="v2",
):
    if chunk["type"] == "updates" and "__interrupt__" in chunk["data"]:
        interrupts.extend(chunk["data"]["__interrupt__"])
        # Show the approval requests
        for interrupt in chunk["data"]["__interrupt__"]:
            for req in interrupt.value["action_requests"]:
                print(f"Approval required: {req['description']}")

# Handle approvals (example: edit the Boston request, approve the San Francisco one)
decisions = {}
for interrupt in interrupts:
    decisions[interrupt.id] = {
        "decisions": [
            {
                "type": "edit",
                "edited_action": {"name": "get_weather", "args": {"city": "Boston, U.K."}}
            },
            {"type": "approve"}
        ]
    }

# Resume the stream
for chunk in agent.stream(
    Command(resume=decisions),
    config=config,
    stream_mode=["messages", "updates"],
    version="v2",
):
    # Handle subsequent stream data...
    pass

4.5 Subagent Streaming

Scenario: distinguish stream output from different agents in a multi-agent system

from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
from langchain.messages import AIMessageChunk

def get_weather(city: str) -> str:
    """Get the weather for a given city."""
    return f"It's always sunny in {city}!"

# Create the subagent (weather agent)
weather_model = init_chat_model("openai:gpt-5.2")
weather_agent = create_agent(
    model=weather_model,
    tools=[get_weather],
    name="weather_agent",  # set the agent name
)

# Create the parent agent (supervisor)
def call_weather_agent(query: str) -> str:
    """Call the weather agent."""
    result = weather_agent.invoke({
        "messages": [{"role": "user", "content": query}]
    })
    return result["messages"][-1].text

supervisor_model = init_chat_model("openai:gpt-5.2")
agent = create_agent(
    model=supervisor_model,
    tools=[call_weather_agent],
    name="supervisor",  # set the agent name
)

# Stream and attribute output to its source agent
current_agent = None
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in Boston?"}]},
    stream_mode=["messages", "updates"],
    subgraphs=True,  # enable subgraph streaming
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]
        # Identify the agent by name
        if agent_name := metadata.get("lc_agent_name"):
            if agent_name != current_agent:
                print(f"\n🤖 {agent_name}: ")
                current_agent = agent_name
        if isinstance(token, AIMessageChunk):
            print(token.text, end="|")

Output

🤖 supervisor:
[{'name': 'call_weather_agent', 'args': '', 'id': 'call_asorzUf0mB6sb7MiKfgojp7I', 'index': 0, 'type': 'tool_call_chunk'}]
...

🤖 weather_agent:
[{'name': 'get_weather', 'args': '', 'id': 'call_LZ89lT8fW6w8vqck5pZeaDIx', 'index': 0, 'type': 'tool_call_chunk'}]
...
Boston| weather| right| now|:| **|Sunny|**|.

5. Disabling Streaming

Scenario: control streaming for specific models in a multi-agent system

from langchain_openai import ChatOpenAI

# Disable streaming
model = ChatOpenAI(
    model="gpt-4.1",
    streaming=False  # disable streaming
)

# For models that do not support the streaming parameter
model = ChatOpenAI(
    model="gpt-4.1",
    disable_streaming=True  # generic disable flag
)

6. v2 Stream Format (LangGraph ≥ 1.1)

The v2 format provides a unified stream output structure, supporting type narrowing and Pydantic model conversion:

Key Advantages

  1. Unified format: every stream chunk has the shape {"type": "...", "ns": "...", "data": "..."}
  2. Simpler consumption: no manual tuple unpacking
  3. Richer invoke() return value: state and interrupt info are separated

Usage Example

# Streaming example
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
    stream_mode=["updates", "custom"],
    version="v2",
):
    print(f"Type: {chunk['type']}")
    print(f"Data: {chunk['data']}\n")

# Enhanced invoke() example
result = agent.invoke(
    {"messages": [{"role": "user", "content": "Hello"}]},
    version="v2",
)
print("State result:", result.value)
print("Interrupts:", result.interrupts)
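Because every v2 chunk carries the same {"type": ..., "data": ...} shape, a consumer can route chunks through a plain handler map instead of chained if/elif branches. The sketch below dispatches fabricated chunks by their "type" key; no agent run is involved, and the handlers are illustrative only.

```python
# Route v2-style stream chunks ({"type": ..., "data": ...}) through a handler map.
def handle_updates(data):
    # data is a dict keyed by node name, as in stream_mode="updates"
    return f"update from: {', '.join(data)}"

def handle_custom(data):
    # data is whatever the stream writer emitted
    return f"custom: {data}"

HANDLERS = {"updates": handle_updates, "custom": handle_custom}

# Simulated chunks in the unified v2 shape
chunks = [
    {"type": "updates", "data": {"model": {"messages": []}}},
    {"type": "custom", "data": "Looking up: SF"},
]

results = [HANDLERS[c["type"]](c["data"]) for c in chunks]
for line in results:
    print(line)
```

The same dispatch table scales to "messages" by adding one more handler, which is what makes the uniform chunk shape convenient.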
