流式输出SSE原理与Python实现
流式输出(SSE)原理与 Python 实现
本文是我学习流式输出过程中的总结与梳理。在写完《OpenAI API 实战指南》之后,我已经能用
stream=True让 LLM 逐字输出回复了。但用着用着,几个问题开始困扰我:stream=True背后到底是什么机制?SSE 协议是怎么工作的?如果不用 OpenAI 的 SDK,我自己怎么实现一个 SSE 服务端?前端又是怎么接收和渲染这些流式数据的?这篇文章就是我对这些问题的探索和回答。
声明:本文为作者在学习过程中的总结与梳理,仅供学习参考。由于作者水平有限,文中可能存在表述不准确或遗漏之处,欢迎读者提出指正与交流。
目录
- 引言:从
stream=True到理解 SSE - SSE 协议原理深度解析
- Python 异步生成器与 SSE 服务端实现
- 3.1 异步生成器:理解
async def+yield - 3.2 用 FastAPI 实现 SSE 服务端
- 3.3 用 Starlette 的 StreamingResponse 实现
- 3.4 处理连接断开与资源清理
- 3.5 并发多个 SSE 连接:协程的优势
- 3.1 异步生成器:理解
- 前端 EventSource 与逐字渲染
- 实战:搭建完整的流式 AI 对话应用
- 5.1 后端:FastAPI + OpenAI Stream
- 5.2 前端:原生 HTML/JS 实现
- 5.3 完整流程串联
- 常见问题与避坑指南
- 6.1 Nginx 反向代理缓冲问题
- 6.2 中文乱码问题
- 6.3 连接超时
- 6.4 跨域问题
- 6.5 多个 chunk 同时到达
- 6.6 中途取消生成
- 6.7 用 curl 测试 SSE 端点
- 总结与学习建议
1. 引言:从 stream=True 到理解 SSE
1.1 一个让我好奇的现象
在《OpenAI API 实战指南》中,我写了一个流式客户端,核心代码就几行:
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "你好"}],
stream=True, # 就这一个参数
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
这段代码跑起来之后,文字真的就一个字一个字往外蹦。当时我觉得很神奇,但没多想——反正 SDK 帮我搞定了。
后来有一次,我想把这个流式输出集成到一个自己写的 Web 应用里。问题来了:OpenAI 的 SDK 是在客户端用的,我的 Web 应用后端怎么把流式数据传给前端浏览器?前端又怎么接收和渲染?
这一连串问题推着我开始深入研究 SSE(Server-Sent Events)协议。下面这张图概括了我从"会用"到"理解"的学习路径:
会用(实战指南阶段) 理解(本文阶段)
│ │
├─ stream=True 参数 → ├─ SSE 协议规范
├─ for chunk in stream → ├─ HTTP 长连接与分块传输
├─ print(content) → ├─ 异步生成器与事件循环
└─ SDK 封装好的 → ├─ 自己实现 SSE 服务端
├─ 前端 EventSource / Fetch Stream
└─ 逐字渲染的打字机效果
1.2 我学到了什么
经过这段时间的学习和实践,我掌握了以下能力:
- 理解 SSE 协议的完整规范(数据格式、事件类型、重连机制、错误处理)
- 用 Python 异步生成器自己实现 SSE 服务端,不依赖任何第三方 SSE 库
- 用 FastAPI 和 Starlette 搭建支持流式输出的 Web 服务
- 前端用 EventSource 和 Fetch API 两种方式接收流式数据
- 实现打字机效果的逐字渲染,包括 Markdown 内容的流式渲染
- 处理连接断开、并发连接、跨域等生产环境常见问题
2. SSE 协议原理深度解析
2.1 HTTP 协议的限制与 SSE 的诞生
要理解 SSE,得先理解传统 HTTP 的工作方式。
传统的 HTTP 请求-响应模式是这样的:
客户端 服务器
│ │
├──── 请求(Request)──────────→│
│ │ 服务器处理...
│ │
│←──── 响应(Response)─────────┤
│ │
│ 连接关闭 │
这是一个"一问一答"的模式。客户端发一个请求,服务器回一个响应,连接就关了。如果客户端想获取新数据,必须再发一个新请求。
这种模式在大多数场景下没问题,但在 AI 对话场景下有个明显的体验问题:LLM 生成一个完整回复可能需要 3-10 秒,用户在这段时间里只能盯着空白屏幕干等。
SSE 的思路很简单:服务器不一次性返回所有数据,而是保持连接打开,有数据就发一点,发完了再关。这样客户端就能边收边显示。
客户端 服务器
│ │
├──── 请求 ────────────────────→│
│ │
│←──── data: token1 ───────────┤ (连接保持打开)
│←──── data: token2 ───────────┤
│←──── data: token3 ───────────┤
│←──── data: token4 ───────────┤
│←──── data: [DONE] ───────────┤
│ │
│ 连接关闭 │
2.2 SSE vs WebSocket vs 轮询:三种实时通信方案对比
在学 SSE 之前,我先搞清楚了一个问题:实现"服务器主动推送",不止 SSE 一种方案。常见的三种方案各有适用场景:
| 方案 | 通信方向 | 协议 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 短轮询(Polling) | 客户端→服务器 | HTTP | 最低 | 对实时性要求不高的场景 |
| 长轮询(Long Polling) | 客户端→服务器 | HTTP | 中等 | 兼容老浏览器、简单通知 |
| SSE | 服务器→客户端(单向) | HTTP | 较低 | AI 流式输出、股票行情、日志推送 |
| WebSocket | 双向 | WS/WSS | 较高 | 聊天室、协作编辑、游戏 |
我的理解:
- 短轮询就是每隔几秒问一次"有新数据吗?"。简单但浪费资源,大部分请求都是空返回。
- 长轮询是问一次之后,服务器hold住连接,有数据再返回。比短轮询好,但每次都要重新建立连接。
- SSE 是建立一个长连接,服务器可以持续推送数据。单向(服务器到客户端),基于 HTTP,实现简单。
- WebSocket 是全双工的,客户端和服务器可以互相发消息。功能最强但也最复杂,需要升级协议。
对于 AI 流式输出这个场景,SSE 是最合适的选择:
- 单向就够了:AI 生成回复是服务器推给客户端,不需要客户端频繁发消息给服务器
- 基于 HTTP:不需要特殊的协议升级,现有的 HTTP 基础设施(代理、负载均衡、认证)都能直接用
- 实现简单:服务端只需要按特定格式写数据,客户端有原生的 EventSource API
- 自动重连:SSE 规范内置了重连机制,连接断了客户端会自动重连
2.3 SSE 协议规范详解
SSE 的协议规范其实很简单,核心就几条规则。我第一次看 W3C 的规范文档时觉得挺吓人,但提炼出来就下面这些。
Content-Type
服务器返回的 HTTP 响应头必须设置为 text/event-stream:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
text/event-stream:告诉浏览器这是 SSE 流,不是普通网页no-cache:禁止缓存,确保每次都是实时数据keep-alive:保持连接不关闭
数据格式
SSE 的数据格式是纯文本,由多个字段组成,每个字段占一行,格式为 field: value。一条消息由一个或多个字段组成,消息之间用空行分隔。
SSE 定义了四种标准字段:
| 字段 | 必填 | 说明 | 示例 |
|---|---|---|---|
data |
是 | 消息的数据内容,可以有多行 | data: hello world |
event |
否 | 事件类型,不指定则触发 onmessage |
event: update |
id |
否 | 消息 ID,用于断线重连时告诉服务器从哪继续 | id: 42 |
retry |
否 | 重连间隔(毫秒),默认约 3 秒 | retry: 5000 |
基本示例:
data: 这是第一条消息
data: 这是第二条消息
data: 这条消息有多行数据
event: custom_event
data: 这是一个自定义事件
id: 100
data: 这条消息带有ID
OpenAI 流式输出的 SSE 格式:
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"你好"},"finish_reason":null}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":",我"},"finish_reason":null}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"是"},"finish_reason":null}]}
data: [DONE]
注意几个细节:
- 每条消息以
data:开头 - 消息内容是 JSON 格式(这是 OpenAI 的选择,不是 SSE 规范要求的)
- 最后一条是
data: [DONE],表示流结束 - 每条消息之间有空行分隔
注释行
以冒号 : 开头的行是注释,客户端会忽略。常用于心跳保活:
: 这是一个注释,客户端不会处理
: ping - 2024-01-01 12:00:00
事件类型
不指定 event 字段时,客户端通过 onmessage 处理器接收。指定了 event 后,可以通过 addEventListener 监听特定事件:
event: user_joined
data: {"username": "张三", "time": "12:00"}
event: user_left
data: {"username": "李四", "time": "12:05"}
// 前端可以分别监听不同事件
const source = new EventSource('/api/events');
source.addEventListener('user_joined', (e) => {
const data = JSON.parse(e.data);
console.log(`${data.username} 加入了`);
});
source.addEventListener('user_left', (e) => {
const data = JSON.parse(e.data);
console.log(`${data.username} 离开了`);
});
断线重连
SSE 最让我觉得贴心的设计是自动重连。如果连接意外断开,浏览器会自动重新连接。重连时,浏览器会在请求头中带上 Last-Event-ID,服务器可以根据这个 ID 继续发送断线期间错过的数据。
客户端断开 → 等待 retry 毫秒 → 重新连接 → 带上 Last-Event-ID 头
这个机制对 AI 流式输出场景意义不大(因为 LLM 生成的内容不适合"断点续传"),但在消息推送、日志流等场景下非常有用。
服务端可以通过读取请求头中的 Last-Event-ID 来实现断点续传:
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
@app.get("/api/events")
async def sse_events(request: Request):
# 读取客户端发来的 Last-Event-ID,判断从哪开始续传
last_event_id = request.headers.get("Last-Event-ID")
if last_event_id:
start_from = int(last_event_id) + 1
else:
start_from = 0
async def event_generator():
for i in range(start_from, 100):
yield f"id: {i}\n"
yield f"data: 事件 {i}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
)
2.4 SSE 的底层:HTTP 长连接与分块传输编码
SSE 能实现"服务器持续推送"的关键在于两个 HTTP 机制:长连接(Keep-Alive) 和 分块传输编码(Chunked Transfer Encoding)。
长连接:HTTP/1.1 默认开启 Keep-Alive,TCP 连接在响应完成后不会立即关闭,可以复用。SSE 利用这个特性,让连接一直保持打开状态。
分块传输编码:正常情况下,HTTP 响应需要先知道 Content-Length(响应体有多大)。但 SSE 的数据是动态生成的,服务器不知道总共要发多少数据。分块传输编码解决了这个问题——服务器把响应体分成一块一块发送,每块前面加上该块的大小(十六进制),最后发一个大小为 0 的块表示结束。
HTTP/1.1 200 OK
Content-Type: text/event-stream
Transfer-Encoding: chunked
1a\r\n ← 块大小(26字节)
data: {"content": "你好"}\n\n ← 块数据
\r\n
1f\r\n ← 块大小(31字节)
data: {"content": ",我是"}\n\n ← 块数据
\r\n
0\r\n ← 结束标记
\r\n
不过这些底层细节我们一般不需要手动处理——Web 框架(FastAPI、Starlette 等)会自动处理分块传输编码。了解这些是为了在出问题时知道从哪排查。
关于 HTTP/2:以上讨论基于 HTTP/1.1。在 HTTP/2 下,SSE 的行为有一些值得注意的差异:
- HTTP/2 原生支持多路复用(一个 TCP 连接承载多个请求/响应流),不需要分块传输编码
- 但 HTTP/2 的连接流有并发限制,浏览器通常限制同一域名下约 100 个并发流
- SSE 的长连接在 HTTP/2 下仍然占用一个流,如果页面同时有多个 SSE 连接和其他请求,可能触及并发上限
- 实际开发中,HTTP/2 的这些差异对 SSE 的使用影响不大,FastAPI + uvicorn 在 HTTP/2 下也能正常工作(通过 h2 协议支持)
3. Python 异步生成器与 SSE 服务端实现
3.1 异步生成器:理解 async def + yield
在写 SSE 服务端之前,我得先搞懂 Python 的异步生成器。这个概念一开始让我有点晕——async 和 yield 放在一起是什么意思?
普通生成器(同步):
def count_up_to(n):
i = 1
while i <= n:
yield i # 暂停,返回值,等下次调用再继续
i += 1
for num in count_up_to(3):
print(num) # 输出: 1, 2, 3
普通生成器的特点是:函数执行到 yield 时暂停,把值返回给调用者;调用者再次迭代时,函数从上次暂停的地方继续执行。整个过程是同步的——暂停期间,整个程序都在等。
异步生成器:
import asyncio
async def async_count_up_to(n):
i = 1
while i <= n:
await asyncio.sleep(0.1) # 模拟异步操作
yield i # 暂停,返回值
i += 1
async def main():
async for num in async_count_up_to(3):
print(num) # 输出: 1, 2, 3(每个间隔0.1秒)
asyncio.run(main())
异步生成器的关键区别在于:yield 暂停时,不阻塞事件循环。其他协程可以继续运行。这对于 SSE 服务端至关重要——一个 SSE 连接在 yield 等待数据时,事件循环可以去处理其他连接的请求。
我的理解:
普通生成器:yield → 暂停 → 整个程序等 → 下次迭代 → 继续
异步生成器:yield → 暂停 → 事件循环去干别的事 → 下次迭代 → 继续
这就是为什么 SSE 服务端必须用异步——一个服务端可能要同时维持几十上百个 SSE 连接,如果每个连接都阻塞等待,服务端很快就撑不住了。
3.2 用 FastAPI 实现 SSE 服务端
FastAPI 对 SSE 的支持非常自然——只需要返回一个 StreamingResponse,把异步生成器传进去就行。
# sse_server_fastapi.py
import asyncio
import json
import time
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI
app = FastAPI()
client = OpenAI()
async def openai_stream_generator(messages: list[dict], model: str = "gpt-4o-mini"):
"""异步生成器:逐块产出 OpenAI 的流式响应"""
stream = client.chat.completions.create(
model=model,
messages=messages,
stream=True,
stream_options={"include_usage": True},
)
for chunk in stream:
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if delta.content:
# 构造 SSE 格式的数据
sse_data = json.dumps({
"content": delta.content,
"finish_reason": chunk.choices[0].finish_reason,
}, ensure_ascii=False)
yield f"data: {sse_data}\n\n"
# 最后一个 chunk 包含 usage 信息
if hasattr(chunk, "usage") and chunk.usage:
usage_data = json.dumps({
"usage": {
"prompt_tokens": chunk.usage.prompt_tokens,
"completion_tokens": chunk.usage.completion_tokens,
"total_tokens": chunk.usage.total_tokens,
}
}, ensure_ascii=False)
yield f"data: {usage_data}\n\n"
# 发送结束标记
yield "data: [DONE]\n\n"
@app.post("/api/chat/stream")
async def chat_stream(request: Request):
"""流式对话接口"""
body = await request.json()
messages = body.get("messages", [])
return StreamingResponse(
openai_stream_generator(messages),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # 禁用 Nginx 缓冲
},
)
关键点解析:
StreamingResponse:FastAPI/Starlette 提供的流式响应类,接受一个生成器作为内容源media_type="text/event-stream":告诉浏览器这是 SSE 流X-Accel-Buffering: no:如果用了 Nginx 反向代理,这个头告诉 Nginx 不要缓冲响应,直接透传yield f"data: {sse_data}\n\n":每条消息以data:开头,以\n\n结尾(空行分隔)ensure_ascii=False:确保中文不会被转成\uXXXX格式stream_options={"include_usage": True}:OpenAI 的参数,开启后会在流的最后一个 chunk 中附带 token 用量统计(usage字段)。默认情况下流式请求不返回 usage 信息,开启后可以实时获取 prompt_tokens、completion_tokens 等数据,方便做成本监控。注意这个参数需要 OpenAI Python SDK v1.0+ 版本
3.3 用 Starlette 的 StreamingResponse 实现
如果你的项目用的是 Starlette(或者不想引入 FastAPI),可以直接用 Starlette 的 StreamingResponse。FastAPI 底层就是 Starlette,所以用法几乎一样:
# sse_server_starlette.py
import asyncio
import json
from starlette.applications import Starlette
from starlette.responses import StreamingResponse
from starlette.routing import Route
async def sse_generator():
"""一个简单的 SSE 生成器示例"""
messages = [
"你好!",
"我是 AI 助手。",
"正在为你生成回复...",
"这是最后一条消息。",
]
for i, msg in enumerate(messages):
await asyncio.sleep(0.5) # 模拟生成延迟
data = json.dumps({
"index": i,
"content": msg,
"is_final": i == len(messages) - 1,
}, ensure_ascii=False)
yield f"data: {data}\n\n"
yield "data: [DONE]\n\n"
async def stream_endpoint(request):
return StreamingResponse(
sse_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
app = Starlette(routes=[
Route("/api/stream", stream_endpoint),
])
3.4 处理连接断开与资源清理
SSE 连接可能随时断开——用户关闭了浏览器标签页、网络波动、代理超时等等。如果不处理断开事件,生成器可能还在后台运行,浪费资源。
import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
app = FastAPI()
async def safe_sse_generator(request: Request):
"""带断开检测和资源清理的 SSE 生成器"""
try:
for i in range(100):
# 检查客户端是否已断开
if await request.is_disconnected():
print(f"客户端已断开连接,停止生成")
break
await asyncio.sleep(0.5)
yield f"data: 消息 {i + 1}\n\n"
except asyncio.CancelledError:
# 协程被取消时的清理
print("SSE 连接被取消,执行清理...")
finally:
# 无论如何都会执行的清理逻辑
print("SSE 连接结束,释放资源")
@app.get("/api/safe-stream")
async def safe_stream(request: Request):
return StreamingResponse(
safe_sse_generator(request),
media_type="text/event-stream",
)
关键点:
request.is_disconnected():FastAPI/Starlette 提供的方法,检查客户端是否还在连接asyncio.CancelledError:当协程被取消时(比如客户端断开),会抛出这个异常finally:确保无论如何都会执行清理逻辑
3.5 并发多个 SSE 连接:协程的优势
这是异步生成器真正发光的地方。假设有 100 个用户同时连接 SSE,每个连接都在等待 LLM 生成:
# 同步方式(假想代码,实际不推荐)
# 每个连接占用一个线程,100个连接 = 100个线程
# 线程切换开销大,内存占用高
# 异步方式(实际使用)
# 100个连接共享一个事件循环
# 每个连接在 await 时让出控制权,事件循环去处理下一个连接
# 内存占用低,切换开销小
异步的优势在于:当一个 SSE 连接在 await asyncio.sleep() 或等待 I/O 时,它不占用 CPU,事件循环可以切换到另一个连接继续工作。这就是协程的"协作式多任务"——每个协程主动让出控制权,而不是被操作系统强制切换。
4. 前端 EventSource 与逐字渲染
4.1 EventSource API 详解
浏览器原生支持 SSE,通过 EventSource 对象来接收。这是最简单的使用方式:
// 创建 EventSource 连接
const source = new EventSource('/api/chat/stream');
// 监听默认消息事件
source.onmessage = (event) => {
if (event.data === '[DONE]') {
console.log('流式传输完成');
source.close(); // 主动关闭连接
return;
}
const data = JSON.parse(event.data);
console.log('收到内容:', data.content);
};
// 监听自定义事件
source.addEventListener('error_event', (event) => {
console.error('服务器发来了错误事件:', event.data);
});
// 连接打开时
source.onopen = () => {
console.log('SSE 连接已建立');
};
// 连接出错时(包括断线重连失败)
source.onerror = (event) => {
console.error('SSE 连接出错:', event);
// readyState 可以判断连接状态:
// 0 = CONNECTING(正在连接)
// 1 = OPEN(已连接)
// 2 = CLOSED(已关闭)
if (source.readyState === EventSource.CLOSED) {
console.log('连接已永久关闭');
}
};
// 手动关闭连接
// source.close();
EventSource 的局限性:
- 只支持 GET 请求:不能发 POST,不能自定义请求头(除了 Cookie)
- 不能传 Request Body:对于 AI 对话,messages 通常很长,放在 URL 里不现实
- 自动重连:有时候是好事,有时候是坏事——AI 流式生成断了重连也没意义
因为这些限制,在实际的 AI 对话应用中,我更推荐用 Fetch API + ReadableStream。
4.2 Fetch + ReadableStream:更灵活的方案
Fetch API 的 response.body 是一个 ReadableStream,可以逐块读取响应数据。相比 EventSource,它的优势是:
- 支持 POST 请求,可以传 JSON Body
- 可以自定义请求头(如 Authorization)
- 不会自动重连,行为完全可控
- 可以中途取消(通过
AbortController)
// fetch_stream.js
async function chatWithStream(userMessage, conversationHistory = []) {
const messages = [
{ role: 'system', content: '你是一个有帮助的AI助手。' },
...conversationHistory,
{ role: 'user', content: userMessage },
];
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ messages }),
});
if (!response.ok) {
throw new Error(`HTTP 错误: ${response.status}`);
}
// 获取 ReadableStream
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = ''; // 缓冲区,存放不完整的数据
let fullContent = ''; // 累积的完整内容
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('流式传输完成');
break;
}
// 解码二进制数据
buffer += decoder.decode(value, { stream: true });
// 按行分割,处理完整的 SSE 消息
const lines = buffer.split('\n');
// 最后一行可能不完整,保留在缓冲区
buffer = lines.pop() || '';
for (const line of lines) {
if (line.startsWith('data: ')) {
const dataStr = line.slice(6); // 去掉 "data: " 前缀
if (dataStr === '[DONE]') {
console.log('收到结束标记');
return fullContent;
}
try {
const data = JSON.parse(dataStr);
if (data.content) {
fullContent += data.content;
// 回调:将新内容传给渲染函数
onNewContent(data.content, fullContent);
}
} catch (e) {
// JSON 解析失败,跳过
console.warn('解析 SSE 数据失败:', dataStr);
}
}
}
}
return fullContent;
}
// 使用示例
function onNewContent(delta, fullContent) {
// 将增量内容追加到页面
document.getElementById('output').textContent = fullContent;
}
关键点解析:
response.body.getReader():获取流读取器decoder.decode(value, { stream: true }):stream: true表示这是流式解码,多字节字符(如中文)可能被截断,解码器会保留不完整的字节等下次一起解码- 缓冲区
buffer:因为 TCP 是流式传输,一次read()可能读到不完整的 SSE 消息。用缓冲区保存不完整的行,等下次读取时拼接 lines.pop() || '':JavaScript 中Array.pop()对空数组返回undefined(falsy 值),||运算符会取右侧的''作为默认值。等价于 Python 的lines.pop() if lines else ''。最后一行可能不完整,保留在缓冲区
4.3 逐字渲染的 UI 实现:打字机效果
数据拿到了,接下来是怎么在页面上展示。最简单的做法是直接设置 textContent:
// 简单但不够好——整个文本替换,没有动画感
document.getElementById('output').textContent = fullContent;
更好的做法是模拟打字机效果——新内容逐个字符追加,给用户一种"AI 正在打字"的感觉:
// typewriter.js
class TypewriterRenderer {
constructor(elementId, speed = 30) {
this.element = document.getElementById(elementId);
this.queue = []; // 待渲染的字符队列
this.isRendering = false;
this.speed = speed; // 每个字符的渲染间隔(毫秒)
}
// 追加新内容到队列
append(deltaText) {
for (const char of deltaText) {
this.queue.push(char);
}
this._startRendering();
}
_startRendering() {
if (this.isRendering) return; // 已经在渲染中
this.isRendering = true;
this._renderNext();
}
_renderNext() {
if (this.queue.length === 0) {
this.isRendering = false;
return;
}
const char = this.queue.shift();
this.element.textContent += char;
// 自动滚动到底部
this.element.scrollTop = this.element.scrollHeight;
setTimeout(() => this._renderNext(), this.speed);
}
// 清空
reset() {
this.queue = [];
this.element.textContent = '';
this.isRendering = false;
}
}
// 使用示例
const renderer = new TypewriterRenderer('chat-output', 30);
function onNewContent(delta, fullContent) {
renderer.append(delta);
}
打字机效果的几个细节:
- 速度控制:
speed = 30表示每个字符间隔 30ms,大约每秒 33 个字符。这个速度比人类阅读稍快,不会让用户觉得慢 - 队列机制:LLM 生成速度可能比渲染速度快,用队列缓冲,避免丢字符
- 自动滚动:新内容出现时自动滚动到底部,用户不用手动滚
4.4 Markdown 内容的流式渲染
AI 回复经常包含 Markdown 格式(代码块、列表、加粗等)。流式渲染 Markdown 有个挑战:Markdown 语法需要看到完整的标记才能正确渲染。
比如 **加粗**,如果只收到了 **加,这时候渲染会出问题——开头的 ** 没有对应的结尾 **。
解决方案是:用累积的完整内容重新渲染,而不是只渲染增量:
// markdown_stream_renderer.js
import { marked } from 'marked'; // Markdown 解析库
class MarkdownStreamRenderer {
constructor(elementId) {
this.element = document.getElementById(elementId);
this.fullContent = '';
}
append(deltaText) {
this.fullContent += deltaText;
// 用完整内容重新渲染 Markdown
// 注意:不完整的 Markdown 语法可能导致短暂显示异常
// 但下一次 append 时会被修正
this.element.innerHTML = marked.parse(this.fullContent);
// 自动滚动到底部
this.element.scrollTop = this.element.scrollHeight;
}
reset() {
this.fullContent = '';
this.element.innerHTML = '';
}
}
代码块的流式渲染优化:
代码块是 Markdown 流式渲染中最容易出问题的地方。如果代码块还没闭合(只收到了开头的 ```),整个后续内容都会被当成代码块的一部分,导致页面显示异常。
function safeMarkdownRender(text) {
// 如果代码块未闭合,暂时用 <pre><code> 包裹
const openCodeBlocks = (text.match(/```/g) || []).length;
if (openCodeBlocks % 2 !== 0) {
// 代码块未闭合,补一个闭合标记
text += '\n```';
}
return marked.parse(text);
}
这个方案虽然简单,但有一个局限:它假设所有 ``` 都是代码块标记。如果 Markdown 内容中包含行内代码 ```…```(虽然少见),计数就会出错。更稳健的做法是用状态机逐行解析,但对于学习阶段,上面的方案已经够用了。
另外,除了代码块,还有一些 Markdown 结构在流式渲染时可能出现短暂异常:
- 加粗
**...**:只收到开头的**时,后续文字会短暂显示为加粗样式,收到结尾**后恢复正常 - 表格:表头和分隔行没到齐时,表格无法正确渲染
- 嵌套列表:缩进层级在流式到达时可能暂时错乱
这些问题的本质都一样:Markdown 是"看到完整结构才能正确渲染"的格式。流式场景下的通用解法就是用完整累积内容重新渲染——每次收到新内容,用整个 fullContent 重新调用 Markdown 解析器。虽然看起来"浪费"(重复解析已有内容),但现代 Markdown 解析器(如 marked.js)性能足够好,实际体验没有可感知的延迟。
5. 实战:搭建完整的流式 AI 对话应用
5.1 后端:FastAPI + OpenAI Stream
把前面学的串起来,写一个完整的后端服务:
# app.py - 完整的流式 AI 对话后端
import json
import time
import uuid
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from openai import OpenAI
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="流式 AI 对话应用")
client = OpenAI()
async def chat_stream_generator(messages: list[dict], model: str = "gpt-4o-mini"):
"""核心流式生成器"""
request_id = str(uuid.uuid4())[:8]
logger.info(f"[{request_id}] 开始流式生成, model={model}")
start_time = time.time()
total_content = ""
try:
stream = client.chat.completions.create(
model=model,
messages=messages,
stream=True,
stream_options={"include_usage": True},
)
for chunk in stream:
if chunk.choices and len(chunk.choices) > 0:
delta = chunk.choices[0].delta
finish_reason = chunk.choices[0].finish_reason
if delta.content:
total_content += delta.content
sse_data = json.dumps({
"type": "content",
"content": delta.content,
"finish_reason": finish_reason,
}, ensure_ascii=False)
yield f"data: {sse_data}\n\n"
if hasattr(chunk, "usage") and chunk.usage:
elapsed = time.time() - start_time
sse_data = json.dumps({
"type": "usage",
"usage": {
"prompt_tokens": chunk.usage.prompt_tokens,
"completion_tokens": chunk.usage.completion_tokens,
"total_tokens": chunk.usage.total_tokens,
},
"elapsed_seconds": round(elapsed, 2),
}, ensure_ascii=False)
yield f"data: {sse_data}\n\n"
yield "data: [DONE]\n\n"
elapsed = time.time() - start_time
logger.info(
f"[{request_id}] 流式生成完成, "
f"内容长度={len(total_content)}, 耗时={elapsed:.2f}s"
)
except Exception as e:
logger.error(f"[{request_id}] 流式生成出错: {e}")
error_data = json.dumps({
"type": "error",
"message": str(e),
}, ensure_ascii=False)
yield f"data: {error_data}\n\n"
yield "data: [DONE]\n\n"
@app.post("/api/chat/stream")
async def chat_stream(request: Request):
"""流式对话接口"""
body = await request.json()
messages = body.get("messages", [])
model = body.get("model", "gpt-4o-mini")
return StreamingResponse(
chat_stream_generator(messages, model),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.get("/")
async def index():
"""返回前端页面"""
with open("index.html", "r", encoding="utf-8") as f:
return HTMLResponse(f.read())
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
5.2 前端:原生 HTML/JS 实现
<!-- index.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>流式 AI 对话</title>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
background: #f5f5f5;
display: flex;
justify-content: center;
align-items: center;
min-height: 100vh;
}
.chat-container {
width: 100%;
max-width: 800px;
height: 90vh;
background: white;
border-radius: 12px;
box-shadow: 0 4px 20px rgba(0,0,0,0.1);
display: flex;
flex-direction: column;
overflow: hidden;
}
.chat-header {
padding: 16px 20px;
border-bottom: 1px solid #eee;
font-size: 18px;
font-weight: 600;
color: #333;
}
.chat-messages {
flex: 1;
overflow-y: auto;
padding: 20px;
}
.message {
margin-bottom: 16px;
display: flex;
gap: 12px;
}
.message.user { justify-content: flex-end; }
.message-avatar {
width: 36px;
height: 36px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-size: 16px;
flex-shrink: 0;
}
.message.user .message-avatar { background: #007aff; color: white; }
.message.assistant .message-avatar { background: #34c759; color: white; }
.message-content {
max-width: 70%;
padding: 12px 16px;
border-radius: 12px;
line-height: 1.6;
word-break: break-word;
}
.message.user .message-content {
background: #007aff;
color: white;
border-bottom-right-radius: 4px;
}
.message.assistant .message-content {
background: #f0f0f0;
color: #333;
border-bottom-left-radius: 4px;
}
/* Markdown 内容样式 */
.message-content pre {
background: #1e1e1e;
color: #d4d4d4;
padding: 12px;
border-radius: 8px;
overflow-x: auto;
margin: 8px 0;
}
.message-content code {
font-family: "Fira Code", monospace;
font-size: 14px;
}
.message-content p { margin: 4px 0; }
.message-content ul, .message-content ol { padding-left: 20px; }
.chat-input-area {
padding: 16px 20px;
border-top: 1px solid #eee;
display: flex;
gap: 12px;
}
.chat-input {
flex: 1;
padding: 12px 16px;
border: 1px solid #ddd;
border-radius: 24px;
font-size: 15px;
outline: none;
resize: none;
font-family: inherit;
}
.chat-input:focus { border-color: #007aff; }
.send-btn {
padding: 12px 24px;
background: #007aff;
color: white;
border: none;
border-radius: 24px;
font-size: 15px;
cursor: pointer;
transition: background 0.2s;
}
.send-btn:hover { background: #0056cc; }
.send-btn:disabled { background: #ccc; cursor: not-allowed; }
.typing-indicator {
display: inline-block;
width: 8px;
height: 8px;
background: #999;
border-radius: 50%;
animation: blink 1s infinite;
}
@keyframes blink {
0%, 100% { opacity: 1; }
50% { opacity: 0.3; }
}
</style>
</head>
<body>
<div class="chat-container">
<div class="chat-header">AI 流式对话</div>
<div class="chat-messages" id="chatMessages"></div>
<div class="chat-input-area">
<textarea
class="chat-input"
id="chatInput"
placeholder="输入消息,按 Enter 发送..."
rows="1"
></textarea>
<button class="send-btn" id="sendBtn" onclick="sendMessage()">发送</button>
</div>
</div>
<script>
const chatMessages = document.getElementById('chatMessages');
const chatInput = document.getElementById('chatInput');
const sendBtn = document.getElementById('sendBtn');
const conversationHistory = [];
let isGenerating = false;
// Enter 发送,Shift+Enter 换行
chatInput.addEventListener('keydown', (e) => {
if (e.key === 'Enter' && !e.shiftKey) {
e.preventDefault();
sendMessage();
}
});
// 自动调整输入框高度
chatInput.addEventListener('input', () => {
chatInput.style.height = 'auto';
chatInput.style.height = chatInput.scrollHeight + 'px';
});
function addMessage(role, content) {
const div = document.createElement('div');
div.className = `message ${role}`;
div.innerHTML = `
<div class="message-avatar">${role === 'user' ? 'U' : 'AI'}</div>
<div class="message-content">${content}</div>
`;
chatMessages.appendChild(div);
chatMessages.scrollTop = chatMessages.scrollHeight;
return div;
}
async function sendMessage() {
const userMessage = chatInput.value.trim();
if (!userMessage || isGenerating) return;
// 显示用户消息
addMessage('user', escapeHtml(userMessage));
chatInput.value = '';
chatInput.style.height = 'auto';
sendBtn.disabled = true;
isGenerating = true;
// 创建 AI 消息占位
const aiMessageDiv = addMessage('assistant', '');
const contentDiv = aiMessageDiv.querySelector('.message-content');
contentDiv.innerHTML = '<span class="typing-indicator"></span>';
conversationHistory.push({ role: 'user', content: userMessage });
try {
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
messages: [
{ role: 'system', content: '你是一个有帮助的AI助手。回答简洁清晰。' },
...conversationHistory,
],
}),
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let buffer = '';
let fullContent = '';
let isFirstChunk = true;
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const dataStr = line.slice(6);
if (dataStr === '[DONE]') break;
try {
const data = JSON.parse(dataStr);
if (data.type === 'content' && data.content) {
if (isFirstChunk) {
contentDiv.innerHTML = '';
isFirstChunk = false;
}
fullContent += data.content;
// 简单 Markdown 渲染
contentDiv.innerHTML = renderSimpleMarkdown(fullContent);
chatMessages.scrollTop = chatMessages.scrollHeight;
}
if (data.type === 'usage') {
console.log('Token 使用:', data.usage);
}
if (data.type === 'error') {
contentDiv.innerHTML = `错误: ${escapeHtml(data.message)}`;
}
} catch (e) {
// 忽略解析错误
}
}
}
conversationHistory.push({ role: 'assistant', content: fullContent });
} catch (error) {
contentDiv.innerHTML = `请求失败: ${escapeHtml(error.message)}`;
} finally {
isGenerating = false;
sendBtn.disabled = false;
chatInput.focus();
}
}
function escapeHtml(text) {
const div = document.createElement('div');
div.textContent = text;
return div.innerHTML;
}
function renderSimpleMarkdown(text) {
// 简单的 Markdown 渲染(生产环境建议用 marked.js)
let html = escapeHtml(text);
// 代码块 ```
html = html.replace(/```(\w*)\n([\s\S]*?)```/g, (_, lang, code) => {
return `<pre><code class="language-${lang}">${code.trim()}</code></pre>`;
});
// 行内代码 `
html = html.replace(/`([^`]+)`/g, '<code>$1</code>');
// 加粗 **
html = html.replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>');
// 换行
html = html.replace(/\n/g, '<br>');
return html;
}
</script>
</body>
</html>
5.3 完整流程串联
把整个流程串起来看一遍:
1. 用户在浏览器输入 "用Python写一个快速排序"
│
▼
2. 前端 fetch POST /api/chat/stream
请求体: {"messages": [{"role":"user","content":"用Python写一个快速排序"}]}
│
▼
3. FastAPI 收到请求 → 调用 chat_stream_generator()
│
▼
4. 生成器内部调用 OpenAI API (stream=True)
│
▼
5. OpenAI 逐 token 返回 → 生成器 yield SSE 格式数据
│
▼
6. StreamingResponse 通过 HTTP 分块传输发给浏览器
│
▼
7. 前端 ReadableStream 逐块读取 → 解析 SSE → 提取 content
│
▼
8. 打字机效果逐字渲染到页面
│
▼
9. 收到 [DONE] → 流结束 → 对话历史更新
6. 常见问题与避坑指南
6.1 Nginx 反向代理缓冲问题
如果你用 Nginx 做反向代理,可能会发现流式输出变成了"一次性返回"——Nginx 默认会缓冲响应。解决方法:
location /api/chat/stream {
proxy_pass http://backend:8000;
proxy_buffering off; # 关闭缓冲
proxy_cache off; # 关闭缓存
proxy_set_header Connection '';
proxy_http_version 1.1; # 使用 HTTP/1.1 长连接
chunked_transfer_encoding on; # 开启分块传输
}
6.2 中文乱码问题
SSE 数据中包含中文时,确保:
- 服务端
json.dumps使用ensure_ascii=False - 响应头中设置正确的编码(FastAPI 默认 UTF-8,一般不需要额外设置)
- 前端
TextDecoder使用utf-8
6.3 连接超时
SSE 是长连接,如果 LLM 生成时间很长(比如超过 60 秒),可能会触发各种超时:
- Nginx
proxy_read_timeout:默认 60s,需要调大 - AWS ALB 空闲超时:默认 60s,需要调大
- 浏览器超时:一般不会主动断开 SSE 连接
location /api/chat/stream {
proxy_read_timeout 300s; # 5分钟超时
proxy_send_timeout 300s;
}
6.4 跨域问题
如果前端和后端不在同一个域名下,需要配置 CORS:
from fastapi.middleware.cors import CORSMiddleware
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"], # 前端地址
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
6.5 多个 chunk 同时到达
网络传输中,多个 SSE chunk 可能在同一个 TCP 包中到达。这就是为什么前端需要缓冲区——不能假设一次 read() 恰好对应一条 SSE 消息。
6.6 中途取消生成
用户可能想中途停止 AI 的生成。实现方式:
// 前端:用 AbortController 取消请求
const controller = new AbortController();
const response = await fetch('/api/chat/stream', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
signal: controller.signal, // 关联 AbortController
});
// 用户点击"停止"按钮
document.getElementById('stopBtn').onclick = () => {
controller.abort(); // 取消请求
};
# 后端:检测客户端断开
async def chat_stream_generator(messages, request: Request):
for chunk in stream:
if await request.is_disconnected():
logger.info("客户端已断开,停止生成")
break
# ... 正常处理
6.7 用 curl 测试 SSE 端点
开发调试时,用 curl 可以直接查看 SSE 原始数据,非常方便:
# -N 禁用缓冲,否则 curl 会等数据全部到达才显示
# --no-buffer 效果相同
curl -N http://localhost:8000/api/chat/stream \
-H "Content-Type: application/json" \
-d '{"messages": [{"role": "user", "content": "你好"}]}'
# 输出示例:
# data: {"type":"content","content":"你好","finish_reason":null}
# data: {"type":"content","content":"!","finish_reason":null}
# data: {"type":"content","content":"有什么","finish_reason":null}
# data: [DONE]
-N / --no-buffer 是关键参数——没有它,curl 默认会缓冲输出,流式效果就看不到了。
7. 总结与学习建议
7.1 核心要点回顾
回头看这段学习经历,SSE 的核心知识可以浓缩为以下几点:
- SSE 是单向的服务器推送协议,基于 HTTP,比 WebSocket 简单,非常适合 AI 流式输出场景
- 协议格式很简单:
data: 内容\n\n,空行分隔消息,[DONE]表示结束 - Python 异步生成器(
async def+yield)是实现 SSE 服务端的核心,不阻塞事件循环 - 前端有两种接收方式:EventSource(简单但有局限)和 Fetch + ReadableStream(灵活推荐)
- 逐字渲染用队列 + 定时器实现打字机效果,Markdown 内容用完整内容重新渲染
7.2 学习路径建议
第一步:理解 SSE 协议
→ 搞懂 data/event/id/retry 四个字段
→ 理解 HTTP 长连接和分块传输
→ 用 curl 测试 SSE 接口,看原始数据
第二步:写一个简单的 SSE 服务端
→ 用 FastAPI + StreamingResponse
→ 先不接 OpenAI,手动 yield 几条测试数据
→ 用浏览器 EventSource 接收验证
第三步:接入 OpenAI 流式 API
→ 把 OpenAI stream 的输出转成 SSE 格式
→ 处理 usage 信息、错误信息
第四步:完善前端
→ 用 Fetch + ReadableStream 替代 EventSource
→ 实现打字机效果
→ 处理 Markdown 渲染
第五步:生产化
→ 处理 Nginx 缓冲
→ 处理连接断开和超时
→ 添加日志和监控
7.3 与已有知识体系的关联
- 本文是《OpenAI API 实战指南》第四章的深度展开,建议先阅读实战指南了解基础用法
- SSE 的异步生成器是 Python asyncio 的典型应用场景,后续学习异步编程时可以回来看
- 流式输出是 AI 对话应用的体验核心,后续学习 Agent 开发、RAG 系统时都会用到
7.4 我参考的资料
- SSE 协议规范(W3C):https://html.spec.whatwg.org/multipage/server-sent-events.html
- MDN EventSource 文档:https://developer.mozilla.org/zh-CN/docs/Web/API/EventSource
- FastAPI StreamingResponse 文档:https://fastapi.tiangolo.com/advanced/custom-response/#streamingresponse
- OpenAI Streaming 文档:https://platform.openai.com/docs/api-reference/streaming
- Python asyncio 官方文档:https://docs.python.org/zh-cn/3/library/asyncio.html
文档版本:v1.0 | 更新时间:2026-05-13 | 适用读者:已完成 OpenAI API 基础学习,希望深入理解流式输出原理和实现的 AI 应用开发者
本文为原创技术文档,,部分内容由AI辅助完成。如需转载,请注明出处。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)