1. stream() 同步传输

from langchain.chat_models import init_chat_model

# 流式传输
model = init_chat_model(model="deepseek-chat", model_provider="deepseek")

# stream方法返回的是一个迭代器,产生的是消息块
print(model.invoke("写一个关于春天的作文,1000字"))

stream方法返回的是一个迭代器,产生的是消息块,那么就可以遍历拿到所有的消息块。

from langchain.chat_models import init_chat_model
# 流式传输
model = init_chat_model(model="deepseek-chat", model_provider="deepseek")

# stream方法返回的是一个迭代器,产生的是消息块
# 那么就可以遍历拿到所有的消息块
for chunk in model.stream("写一个关于春天的作文,1000字"):
    print(chunk.content, end="|", flush=True) # end="|":每个块结尾用|进行分割;flush=True:强制立即刷新输出缓冲区

输出结果:

|##| |春|分|

|||||||时分|||站在|学校|操场|中央||地理|老师说||这时|太阳||||||南北|半球|昼夜|平分|||我怎么||感受|不到|这种|平衡|——|脚下的|影子|几乎|||成一个|||仿佛|随时||消失||阳光|||||||头皮||||空气||||||||极了|冬天|残留||雪花|||抬头|望向||||||||||||出的|||||||显得|格外|||||让我|想起|爷爷|家的|春天||爷爷|住在|太行||深处||那里|没有|高大的|教学楼||只有|连绵|起伏|的山||||月的|||依然||||带着|冬天|最后的||||阳光|穿过|稀疏||树枝||在地上|留下|||的影子||去年|这时候|||||爷爷||||||看他||土豆||||||||||在地上||出一条|浅浅||||然后把|发芽||土豆|||进去|||用手|轻轻|盖上||||这样|就好了|?”|我问|||好了|。”|爷爷|||||拍拍|手上的|泥土|,“|等着|||再过||日子||它们||发芽||。”

|||下来||盯着|那片|平整|的土地|||看不见|任何|变化||阳光||||色的|泥土|||反射||淡淡|的光||远处的||还是||黄的||只有||处的||||||出了||黄的|||爷爷|说的|||日子|”,|到底||多久|||现在||当我|站在|城市的|校园|||望着||高楼|切割|||块的|天空||突然|理解了|爷爷|的话|||说的|||日子|”,|不是|日历|||过去|的一|||而是|土地||种子|一点点|积蓄|力量|的过程||就像|学校的|铃声||定点|响起||准时|下课|||春天的|生长|从来不||铃声|||||||科学||上学||植物的|光合|作用||老师说|植物|通过|光合|作用||阳光|变成|能量||于是就||出了||||出了||||我觉得||事情|哪有|这么|简单||去年|春天||爷爷|||我看到|一颗|蒲公英|种子|落在|||||没有|阳光||没有|土壤||只有|一点|雨水||灰尘||但它|还是|发芽|||||出了一||小小的|黄花||那一刻||我才|明白||春天|不是|偶然|降临||礼物|||大概|就是|春天的|秘密||||不需要|大声|宣告|自己的|到来||也不需要|人们|时刻|惦记||就像|爷爷||下的|土豆|||看不见|的地方|悄悄|生长||而我们|||我们|||等待|一个||重的|开始||一个|完美的|春天|||忘了||春天|就在|每一次|呼吸|之间|||每一个|隐秘||角落里|静静|等待||||

我们调试可以看到整个chunk是一个AIMessageChunk,也就是AIMessage其中的一个块。
在这里插入图片描述

2. astream() 异步传输

2.1 同步(阻塞)方式

# 同步IO
import time

def boil_water():
    print("开始烧水...")
    time.sleep(5)     # 模拟烧水5s, CPU 完全空闲
    print("烧水完成...")

def send_message():
    print("开始发消息...")
    time.sleep(2)  # 模拟烧水2s
    print("发消息完成...")

def main():
    # 1、烧水
    boil_water()
    # 2、发消息
    send_message()

# 共耗时7s
main()

问题: 在boil_water 函数等待的5秒里,CPU 完全空闲,但却不能去做send_message 任务,效率低下。

2.2 协程

多进程通常利用的是多核 CPU 的优势,同时执行多个计算任务。每个进程有自己独立的内存管理,所以不同进程之间要进行数据通信比较麻烦。

多线程是在一个 cpu 上创建多个子任务,当某一个子任务休息的时候其他任务接着执行。多线程的控制是由 python 自己控制的。线程存在数据同步问题,所以要有锁机制。

协程的实现是在一个线程内实现的,相当于流水线作业。由于线程切换的消耗比较大,所以对于并发编程,可以优先使用协程。
在这里插入图片描述
协程,作为一种轻量级的并发编程模型,可以被视为用户态的“轻量级线程”。与传统线程相比,协程的核心优势在于其调度完全由用户空间掌控,避免了操作系统内核的频繁介入,从而显著降低了上下文切换的开销。

2.3 异步方式

# 异步 IO
import asyncio

# 协程1
async def boil_water_async():
    print("开始烧水...")
    await asyncio.sleep(5)  # 关键! await表示等待这个操作完成,但期间可以做别的事
    print("烧水完成...")
# 协程2
async def send_message_async():
    print("开始发消息...")
    await asyncio.sleep(2)  # 模拟烧水2s
    print("发消息完成...")

我们将这两个方法定义成了异步的,实际上boil_water_asyncsend_message_async 就是两个协程

我们让main函数来管理这两个协程的流程:

# 异步 IO
import asyncio

# 协程1
async def boil_water_async():
    print("开始烧水...")
    await asyncio.sleep(5)  # 关键! await表示等待这个操作完成,但期间可以做别的事
    print("烧水完成...")
# 协程2
async def send_message_async():
    print("开始发消息...")
    await asyncio.sleep(2)  # 模拟烧水2s
    print("发消息完成...")

# 协程:调度
# 事件循环
async def main():
    # 1、烧水(任务),使用asyncio.create_task将其创建成任务
    task1 = asyncio.create_task(boil_water_async())
    # 2、发消息(任务)
    task2 = asyncio.create_task(send_message_async())
    
    # 等待任务1和任务2都完成
    await task1
    await task2

# 5s
# run函数 会创建一个事件循环,并运行指定的协程。
asyncio.run(main())

输出结果:

开始烧水...
开始发消息...
发消息完成...
烧水完成...

什么是事件循环???

事件循环是 asyncio(Python 标准库中的模块,用于编写异步 I/O
操作的代码)的核心,你可以把它想象成一个总调度员或一个高效的待办事项 (To-Do List) 管理员。 它的工作流程非常简单:

  1. 它维护着一个任务列表(比如:煮水、发短信)。
  2. 它不断地循环检查每个任务: a. 如果任务处于“等待I/O” 状态(比如等水开、等网络响应),就暂停它,立即去执行下一个已经“就绪” 的任务。 b. 如果任务的等待时间到了或者 I/O 操作完成了,事件循环就恢复执行这个任务。

通过使用 asyncio ,我们可以在单线程中同时处理多个任务。一个在单线程内调度和管理所有协程的核心机制,就是事件循环。它不停地检查哪些协程可以执行,哪些在等待。
总结一下:

  • 协程是 asyncio 的核心概念之一。它是一个特殊的函数,可以在执行过程中暂停,并在稍后恢复执行。协程通过 async def 关键字定义,并通过 await 关键字暂停执行,等待异步操作完成。
  • 要运行一个协程,可以使用 asyncio.run() 函数。它会创建一个事件循环,并运行指定的协程。事件循环是 asyncio 的核心组件,负责调度和执行协程。它不断地检查是否有任务需要执行,并在任务完成后调用相应的回调函数。

2.4 异步流式输出

from langchain.chat_models import init_chat_model
import asyncio

model = init_chat_model(model="deepseek-chat", model_provider="deepseek")

# 异步流式输出
async def async_stream():
    print("======异步调用======")
    async for chunk in model.astream("写一段关于春天的作文,100字") :
        print(chunk.content, end='|', flush=True)

# 创建事件循环,执行协程
asyncio.run(async_stream())

输出结果:

======异步调用======
|#| |春|

|窗外||梧桐||出了|||||||||||来的|||阳光|||||下来|||融融|||||日的|寒气|||化了||||带着||泥土||味儿||还有|花的||||||||||绿|||燕子|||||||屋檐|||||||||||||热闹||春天|就这样|悄悄地|来了|||一切都||醒了||||

3. 自定义输出格式

from typing import Iterator, List

from langchain.chat_models import init_chat_model
from langchain_core.output_parsers import StrOutputParser

# 组件一:聊天模型
model = init_chat_model(model="deepseek-chat", model_provider="deepseek")

# 组件二:定义输出解析器
parser = StrOutputParser()

# 自定义生成器
def split_into_list(input: Iterator[str]) -> Iterator[List[str]]:
    buffer = ""
    for chunk in input:
        buffer += chunk
        # 遇到句号,刷新
        while "。" in buffer:
            # 找到句号的位置
            stop_index = buffer.index("。")
            yield [buffer[:stop_index].strip()]
            buffer = buffer[stop_index+1:]
    # 处理buffer最后几个字
    yield [buffer[:].strip()]

# 定义链
chain = model | parser | split_into_list

for chunk in chain.stream("写一个关于春天的作文,5句话,每句话用中文句号隔开"):
    # print(chunk.content, end="|", flush=True)
    # 这里使用的是输出解析器,结果就是一个str,不用再chunk.content
    print(chunk, end="|", flush=True)

yield 是 Python 中用来创建生成器的关键字。包含 yield的函数不会一次性执行完并返回一个值,而是返回一个迭代器(生成器对象)。每次调用 next()或循环迭代时,函数会从上一次暂停的地方继续执行,直到遇到下一个 yield,然后把 yield 后面的值“吐”出来,再次暂停。

输出结果:

['春天总是悄然而至,像一位羞涩的画家,用嫩绿的草芽和粉白的杏花悄悄地涂抹着大地']|['清晨的露珠挂在刚刚舒展的柳叶上,折射出细碎的光芒,仿佛每一滴都藏着一个苏醒的梦']|['风也变得温柔了,不再像冬天那样尖锐,而是一阵一阵地送来泥土和青草的香气']|['人们脱去厚重的冬衣,脚步不自觉地轻快起来,连说话的声音里都带着笑意']|['春天不像夏天那样热烈,也不像秋天那样浓艳,它只是轻轻地、坚定地告诉我们:一切都可以重新开始']|['']|

4. 流式传输原理

在流式传输中,客户端向服务端发送消息之后,服务端需要向客户端持续的推送消息,WebSocket可以支持双向连接,但是我们需要维护状态。

4.1 SSE协议

HTTP 协议本身设计为无状态的请求-响应模式,严格来说,是无法做到服务器主动推送消息到客户端,但通过Server-Sent Events (服务器发送事件,简称 SSE)技术可实现流式传输,允许服务器主动向浏览器推送数据流。
也就是说,服务器向客户端声明,接下来要发送的是流消息(streaming),这时客户端不会关闭连接,会一直等待服务器发送过来新的数据流。
SSE(Server-Sent Events)是一种基于 HTTP 的轻量级实时通信协议,浏览器可以通过内置的EventSource API 接收并处理这些实时事件。

在这里插入图片描述

SSE核心特点

  • 基于 HTTP 协议
    复用标准 HTTP/HTTPS 协议,无需额外端口或协议,兼容性好且易于部署。
  • 单向通信机制
    SSE 仅支持服务器向客户端的单向数据推送,客户端通过普通 HTTP 请求建立连接后,服务器可持续发送数据流,但客户端无法通过同一连接向服务器发送数据。
  • 自动重连机制
    支持断线重连,连接中断时,浏览器会自动尝试重新连接(支持 retry 字段指定重连间隔)。
  • 自定义消息类型
    客户端发起请求后,服务器保持连接开放,响应头设置Content-Type: text/eventstream,标识为事件流格式,持续推送事件流。

数据格式

服务端向浏览器发送 SSE 数据,需要设置必要的 HTTP 头信息:

# 告诉浏览器返回的数据类型是“事件流”,浏览器会据此保持连接并等待后续数据。
Content-Type: text/event-stream;charset=utf-8

# 要求 HTTP 连接保持打开状态,而不是每次发送完数据就关闭。这样服务器可以持续推送消息。
Connection: keep-alive

SSE 的消息由若干行组成,每条消息之间用 两个换行符 \n\n 分隔。
每一行的基本格式是:

[field]: value\n

在这里插入图片描述

4.2 LangChain实现方式

上面的代码中,我们的model/chain调用的stream方法都是LangChain里面的组件的流式传输能力。

LangChain 本身并不“创造”或“规定”一个底层的网络传输协议,而是依赖于其底层的大模型供应商(如 OpenAI)和我们自身服务应用所使用的 Web 框架(如 FastAPI)的协议。

因此对于 LangChain 的流式传输能力,本身是因为大模型供应商提供了流式传输能力,由 LangChain进行调用后接收并处理成一个个的AIMessageChunk 。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐