在聊天模型应用中,响应速度直接影响用户体验。试想,如果你问模型一个问题,等待了 20 秒才看到完整回答,难免感到焦虑。流式传输正是解决这一痛点的利器:它允许模型在生成内容的同时逐步输出,而不是一次性返回所有结果。本文将带你从基础用法到深层原理,全面掌握 LangChain 中的流式传输。


1、流式 vs 非流式

传统的 .invoke() 调用属于非流式传输。模型在内部完整地生成答案,然后一次性返回给用户。如果生成内容很长(比如一个 1000 字的笑话),可能要盯着白屏等待许久。

代码示例:

model = ChatOpenAI()
response = model.invoke("讲一个1000字的笑话")
# 等待约 20 秒后,一次性返回全文

而流式传输则不同。例如 DeepSeek 客户端默认使用流式返回,它一边生成一边显示,等待时间被切分成极短的间隔,用户能立即看到内容,体验显著提升。

LangChain 的聊天模型同样支持流式返回,提供了 .stream()(同步)和 .astream()(异步)两种方法。


2、同步流式传输:.stream()

.stream() 方法返回一个迭代器,该迭代器会在生成输出时同步产出消息块(chunk),可以逐块处理并实时展示给用户。

基础示例:

from langchain_deepseek import ChatDeepSeek

model = ChatDeepSeek(model="deepseek-chat")

chunks = []
for chunk in model.stream("讲一个100字的笑话"):
    chunks.append(chunk)
    print(chunk.content, end="|", flush=True)

输出效果(用竖线分割每个块):

这里每个 chunk 都是一个 AIMessageChunk 对象,它是 AIMessage 的片段版本。这些片段甚至可以直接相加:

print(chunks[0] + chunks[1] + chunks[2] + chunks[3] + chunks[4])

叠加后的内容会拼接起来,方便后续合并处理。


3、异步流式传输:.astream()

在构建高性能应用时,我们往往希望使用异步 IO 来避免阻塞。LangChain 提供了异步版本的 .astream(),结合 Python 的 asyncio 可以实现非阻塞的流式调用。

3.1、为什么要异步?—— 协程与事件循环

在讲解代码之前,先通过一个生活例子来理解同步异步的区别。

假设你需要“煮一壶水”(耗时 5 秒)并且“给朋友发一条短信”(耗时 2 秒)。

同步方式:一件一件做,总耗时 7 秒。在烧水的 5 秒内,CPU 空闲却不能去做发短信的任务,效率低下。

import time

def boil_water():
    print("开始煮水...")
    time.sleep(5)      # 阻塞等待
    print("水开了!")

def send_message():
    print("开始发短信...")
    time.sleep(2)      # 阻塞等待
    print("短信发送成功!")

def main():
    boil_water()
    send_message()

main()  # 总耗时约7秒

异步方式:使用 asyncio 和协程(async/await)。当某个任务遇到 await 等待时,它会主动让出控制权,事件循环会去执行其他任务,这样总耗时约等于最长的单个任务时间(5 秒)。

import asyncio

async def boil_water_async():
    print("开始煮水...")
    await asyncio.sleep(5)   # 让出控制权,不阻塞
    print("水开了!")

async def send_message_async():
    print("开始发短信...")
    await asyncio.sleep(2)   # 让出控制权
    print("短信发送成功!")

async def main():
    task1 = asyncio.create_task(boil_water_async())
    task2 = asyncio.create_task(send_message_async())
    await task1
    await task2

asyncio.run(main())

输出顺序大概为:

关键概念:

  • 协程(coroutine):用 async def 定义的函数,可以在执行中暂停并恢复,是用户态的轻量级任务。

  • 事件循环(event loop):asyncio 的核心,像一个待办事项管理器,循环检查所有任务,遇到等待的就暂停该任务、切换去执行另一个就绪的任务。

3.2、异步流式代码

了解了异步机制,再来看 .astream() 的用法就简单了

from langchain_deepseek import ChatDeepSeek
import asyncio

model = ChatDeepSeek(model="deepseek-chat")

async def async_stream():
    print("=== 异步调用 ===")
    async for chunk in model.astream("讲一个50字的笑话"):
        print(chunk.content, end="|", flush=True)

asyncio.run(async_stream())

输出效果:

在异步环境中(如 FastAPI 的 Web 服务),使用 .astream() 可以避免阻塞整个线程,是推荐的最佳实践。


4、流式传输是所有 Runnable 的能力

流式传输并不是聊天模型的专属能力。实际上,所有实现了 LangChain Runnable 接口的组件都具备 .stream() 和 .astream() 方法。Runnable 标准接口支持:

  • invoke:单输入 → 单输出

  • batch:多输入批量处理

  • stream:流式传输输出块

  • inspect:查看输入输出模式

  • compose:用 | 管道符组合多个 Runnable

因此,可以将模型与输出解析器组合成一条链(Chain),这条链整体也是一个 Runnable,同样支持流式。

4.1、使用 StrOutputParser 解析流式输出

当我们直接流式传输聊天模型时,得到的是 AIMessageChunk 对象。但通常我们只关心文本内容。StrOutputParser 能自动从每个 chunk 中提取出纯文本字符串,让流式处理更加流畅。

from langchain_deepseek import ChatDeepSeek
from langchain_core.output_parsers import StrOutputParser

model = ChatDeepSeek(model="deepseek-chat")
parser = StrOutputParser()

chain = model | parser

for chunk in chain.stream("写一段关于爱情的歌词,需要5句话"):
    print(chunk, end="|", flush=True)

输出效果:

现在每个块都是字符串,可以直接拼接显示


5、自定义流式解析器:按句子输出

默认情况下,流式返回的块大小不确定,可能是单个字符或几个字。如果希望按句子为单位进行流式输出,可以编写自定义的生成器函数,并嵌入到链中。

生成器的签名应为:Iterator[Input] -> Iterator[Output],异步版本则用 AsyncIterator

下面实现一个按中文句号 . 分隔的自定义解析器:

from langchain_deepseek import ChatDeepSeek
from langchain_core.output_parsers import StrOutputParser
from typing import Iterator, List

model = ChatDeepSeek(model="deepseek-chat")
parser = StrOutputParser()

# 自定义生成器:将字符流组合成句子列表
def split_into_list(input_stream: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input_stream:
        buffer += chunk
        while "。 " in buffer:
            index = buffer.index("。 ")
            sentence = buffer[:index].strip()
            yield [sentence]           # 产出一个句子
            buffer = buffer[index+1:]  # 保留剩余部分
    if buffer.strip():
        yield [buffer.strip()]         # 产出最后的部分

chain = model | parser | split_into_list

for chunk in chain.stream("写一份关于爱情的歌词,需要5句话,每句话用句号分割"):
    print(chunk, end="|", flush=True)

输出效果(每个 chunk 是一个包含一句话的列表):

['你像晨光落进我眼底,把暗夜都推向潮汐']|['心跳的密语缠成线,系在无名指的缝隙']|['风穿过发梢的季节,你的温度是唯一答案']|['所有风景都退成模糊,你是清晰的那句诺言']|['后来日落沉入海平线,我们的故事还在写。']|

通过这种方式,可以灵活地按照业务需求控制流式输出的粒度


6、深度探索:LangChain 流式传输的底层协议

上面我们学会了如何使用流式传输,但是否好奇:LangChain 是怎样实现流式的?它依赖什么协议?

6.1、SSE(Server-Sent Events)协议简介

HTTP 是无状态的请求-响应协议,本不支持服务器主动推送数据。SSE 是一种基于 HTTP 的轻量级实时通信技术,允许服务器向客户端单向推送事件流。

核心特性

  • 基于标准 HTTP/HTTPS,无需额外端口,兼容性好。

  • 单向通信:服务器 → 客户端持续发送数据。

  • 自动重连:连接中断时客户端可自动重连。

  • 报文格式:服务端响应头设置 Content-Type: text/event-stream,数据由若干消息组成,每个消息以 \n\n 分隔,消息内部行格式为 field: value

常见字段:

  • data:数据内容(必需)

  • event:自定义事件类型(可选,默认为 message

  • id:数据标识符(可选)

  • retry:重连间隔(可选,单位毫秒)

6.2、LangChain 如何使用 SSE?

LangChain 本身并不创造底层协议,它复用了各大模型提供商原有的流式能力。以 OpenAI 为例:

  1. 发起 HTTP 请求:LangChain 内部集成了 OpenAI Python SDK,定义了一个 HTTP 客户端(_SyncHttpxClientWrapper),向 https://api.openai.com/v1 发起请求。

  2. 开启流式模式:在请求参数中设置 stream=True(见 _stream() 源码第一步),告知 OpenAI 服务器用 SSE 格式返回数据。

  3. 接收 SSE 数据块:服务器保持连接,以 SSE 格式持续推送 JSON 块。

  4. 转换为 LangChain 内部格式:LangChain 通过 _convert_chunk_to_generation_chunk() 和 _convert_delta_to_message_chunk() 方法,将 OpenAI 的原始块转换为 AIMessageChunk 对象。转换过程会根据 delta 中的 role 和 content 等信息,创建对应的消息块(如 AIMessageChunkToolMessageChunk 等),还能处理函数调用、工具调用等高阶功能。

总结链路

用户调用 .stream()
  → LangChain 构造 HTTP 请求(stream=True)
  → OpenAI API 以 SSE 格式返回事件块
  → LangChain 逐块接收并解析
  → 将每个块转换为 AIMessageChunk
  → 通过迭代器逐个产出给用户代码

这意味着,无论底层的模型提供商是 OpenAI、Anthropic 还是其他,LangChain 都能以统一的消息块形式呈现流式结果,方便开发者无缝切换或集成。


7、总结

流式传输显著提升用户体验,让模型一边生成一边显示。

.stream() 提供同步迭代器,适合简单脚本或测试。

.astream() 结合 asyncio 实现异步非阻塞流式,适合生产环境的 Web 应用。

流式是 Runnable 接口的通用能力,用 | 组合的链同样支持流式;StrOutputParser 可将消息块转换为纯文本流。

自定义解析器可以按业务需求调整输出粒度(如按句子、段落输出)。

底层上,LangChain 通过 SSE 协议接收模型提供商的流式数据,并在内部转换为 AIMessageChunk,实现了跨提供商的统一流式处理。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐