流式输出(SSE)原理与 Python 实现

本文是我学习流式输出过程中的总结与梳理。在写完《OpenAI API 实战指南》之后,我已经能用 stream=True 让 LLM 逐字输出回复了。但用着用着,几个问题开始困扰我:stream=True 背后到底是什么机制?SSE 协议是怎么工作的?如果不用 OpenAI 的 SDK,我自己怎么实现一个 SSE 服务端?前端又是怎么接收和渲染这些流式数据的?这篇文章就是我对这些问题的探索和回答。

声明:本文为作者在学习过程中的总结与梳理,仅供学习参考。由于作者水平有限,文中可能存在表述不准确或遗漏之处,欢迎读者提出指正与交流。


目录

  1. 引言:从 stream=True 到理解 SSE
  2. SSE 协议原理深度解析
  3. Python 异步生成器与 SSE 服务端实现
  4. 前端 EventSource 与逐字渲染
  5. 实战:搭建完整的流式 AI 对话应用
  6. 常见问题与避坑指南
  7. 总结与学习建议

1. 引言:从 stream=True 到理解 SSE

1.1 一个让我好奇的现象

在《OpenAI API 实战指南》中,我写了一个流式客户端,核心代码就几行:

stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "你好"}],
    stream=True,  # 就这一个参数
)

for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)

这段代码跑起来之后,文字真的就一个字一个字往外蹦。当时我觉得很神奇,但没多想——反正 SDK 帮我搞定了。

后来有一次,我想把这个流式输出集成到一个自己写的 Web 应用里。问题来了:OpenAI 的 SDK 是在客户端用的,我的 Web 应用后端怎么把流式数据传给前端浏览器?前端又怎么接收和渲染?

这一连串问题推着我开始深入研究 SSE(Server-Sent Events)协议。下面这张图概括了我从"会用"到"理解"的学习路径:

会用(实战指南阶段)          理解(本文阶段)
      │                              │
      ├─ stream=True 参数     →       ├─ SSE 协议规范
      ├─ for chunk in stream  →       ├─ HTTP 长连接与分块传输
      ├─ print(content)       →       ├─ 异步生成器与事件循环
      └─ SDK 封装好的          →       ├─ 自己实现 SSE 服务端
                                      ├─ 前端 EventSource / Fetch Stream
                                      └─ 逐字渲染的打字机效果

1.2 我学到了什么

经过这段时间的学习和实践,我掌握了以下能力:

  • 理解 SSE 协议的完整规范(数据格式、事件类型、重连机制、错误处理)
  • 用 Python 异步生成器自己实现 SSE 服务端,不依赖任何第三方 SSE 库
  • 用 FastAPI 和 Starlette 搭建支持流式输出的 Web 服务
  • 前端用 EventSource 和 Fetch API 两种方式接收流式数据
  • 实现打字机效果的逐字渲染,包括 Markdown 内容的流式渲染
  • 处理连接断开、并发连接、跨域等生产环境常见问题

2. SSE 协议原理深度解析

2.1 HTTP 协议的限制与 SSE 的诞生

要理解 SSE,得先理解传统 HTTP 的工作方式。

传统的 HTTP 请求-响应模式是这样的:

客户端                          服务器
   │                              │
   ├──── 请求(Request)──────────→│
   │                              │  服务器处理...
   │                              │
   │←──── 响应(Response)─────────┤
   │                              │
   │        连接关闭               │

这是一个"一问一答"的模式。客户端发一个请求,服务器回一个响应,连接就关了。如果客户端想获取新数据,必须再发一个新请求。

这种模式在大多数场景下没问题,但在 AI 对话场景下有个明显的体验问题:LLM 生成一个完整回复可能需要 3-10 秒,用户在这段时间里只能盯着空白屏幕干等。

SSE 的思路很简单:服务器不一次性返回所有数据,而是保持连接打开,有数据就发一点,发完了再关。这样客户端就能边收边显示。

客户端                          服务器
   │                              │
   ├──── 请求 ────────────────────→│
   │                              │
   │←──── data: token1 ───────────┤  (连接保持打开)
   │←──── data: token2 ───────────┤
   │←──── data: token3 ───────────┤
   │←──── data: token4 ───────────┤
   │←──── data: [DONE] ───────────┤
   │                              │
   │        连接关闭               │

2.2 SSE vs WebSocket vs 轮询:三种实时通信方案对比

在学 SSE 之前,我先搞清楚了一个问题:实现"服务器主动推送",不止 SSE 一种方案。常见的三种方案各有适用场景:

方案 通信方向 协议 复杂度 适用场景
短轮询(Polling) 客户端→服务器 HTTP 最低 对实时性要求不高的场景
长轮询(Long Polling) 客户端→服务器 HTTP 中等 兼容老浏览器、简单通知
SSE 服务器→客户端(单向) HTTP 较低 AI 流式输出、股票行情、日志推送
WebSocket 双向 WS/WSS 较高 聊天室、协作编辑、游戏

我的理解

  • 短轮询就是每隔几秒问一次"有新数据吗?"。简单但浪费资源,大部分请求都是空返回。
  • 长轮询是问一次之后,服务器hold住连接,有数据再返回。比短轮询好,但每次都要重新建立连接。
  • SSE 是建立一个长连接,服务器可以持续推送数据。单向(服务器到客户端),基于 HTTP,实现简单。
  • WebSocket 是全双工的,客户端和服务器可以互相发消息。功能最强但也最复杂,需要升级协议。

对于 AI 流式输出这个场景,SSE 是最合适的选择:

  1. 单向就够了:AI 生成回复是服务器推给客户端,不需要客户端频繁发消息给服务器
  2. 基于 HTTP:不需要特殊的协议升级,现有的 HTTP 基础设施(代理、负载均衡、认证)都能直接用
  3. 实现简单:服务端只需要按特定格式写数据,客户端有原生的 EventSource API
  4. 自动重连:SSE 规范内置了重连机制,连接断了客户端会自动重连

2.3 SSE 协议规范详解

SSE 的协议规范其实很简单,核心就几条规则。我第一次看 W3C 的规范文档时觉得挺吓人,但提炼出来就下面这些。

Content-Type

服务器返回的 HTTP 响应头必须设置为 text/event-stream

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
  • text/event-stream:告诉浏览器这是 SSE 流,不是普通网页
  • no-cache:禁止缓存,确保每次都是实时数据
  • keep-alive:保持连接不关闭
数据格式

SSE 的数据格式是纯文本,由多个字段组成,每个字段占一行,格式为 field: value。一条消息由一个或多个字段组成,消息之间用空行分隔。

SSE 定义了四种标准字段:

字段 必填 说明 示例
data 消息的数据内容,可以有多行 data: hello world
event 事件类型,不指定则触发 onmessage event: update
id 消息 ID,用于断线重连时告诉服务器从哪继续 id: 42
retry 重连间隔(毫秒),默认约 3 秒 retry: 5000

基本示例

data: 这是第一条消息

data: 这是第二条消息
data: 这条消息有多行数据

event: custom_event
data: 这是一个自定义事件

id: 100
data: 这条消息带有ID

OpenAI 流式输出的 SSE 格式

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"你好"},"finish_reason":null}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":",我"},"finish_reason":null}]}

data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1700000000,"model":"gpt-4o-mini","choices":[{"index":0,"delta":{"content":"是"},"finish_reason":null}]}

data: [DONE]

注意几个细节:

  • 每条消息以 data: 开头
  • 消息内容是 JSON 格式(这是 OpenAI 的选择,不是 SSE 规范要求的)
  • 最后一条是 data: [DONE],表示流结束
  • 每条消息之间有空行分隔
注释行

以冒号 : 开头的行是注释,客户端会忽略。常用于心跳保活:

: 这是一个注释,客户端不会处理
: ping - 2024-01-01 12:00:00
事件类型

不指定 event 字段时,客户端通过 onmessage 处理器接收。指定了 event 后,可以通过 addEventListener 监听特定事件:

event: user_joined
data: {"username": "张三", "time": "12:00"}

event: user_left
data: {"username": "李四", "time": "12:05"}
// 前端可以分别监听不同事件
const source = new EventSource('/api/events');

source.addEventListener('user_joined', (e) => {
    const data = JSON.parse(e.data);
    console.log(`${data.username} 加入了`);
});

source.addEventListener('user_left', (e) => {
    const data = JSON.parse(e.data);
    console.log(`${data.username} 离开了`);
});
断线重连

SSE 最让我觉得贴心的设计是自动重连。如果连接意外断开,浏览器会自动重新连接。重连时,浏览器会在请求头中带上 Last-Event-ID,服务器可以根据这个 ID 继续发送断线期间错过的数据。

客户端断开 → 等待 retry 毫秒 → 重新连接 → 带上 Last-Event-ID 头

这个机制对 AI 流式输出场景意义不大(因为 LLM 生成的内容不适合"断点续传"),但在消息推送、日志流等场景下非常有用。

服务端可以通过读取请求头中的 Last-Event-ID 来实现断点续传:

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

@app.get("/api/events")
async def sse_events(request: Request):
    # 读取客户端发来的 Last-Event-ID,判断从哪开始续传
    last_event_id = request.headers.get("Last-Event-ID")
    if last_event_id:
        start_from = int(last_event_id) + 1
    else:
        start_from = 0

    async def event_generator():
        for i in range(start_from, 100):
            yield f"id: {i}\n"
            yield f"data: 事件 {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
    )

2.4 SSE 的底层:HTTP 长连接与分块传输编码

SSE 能实现"服务器持续推送"的关键在于两个 HTTP 机制:长连接(Keep-Alive)分块传输编码(Chunked Transfer Encoding)

长连接:HTTP/1.1 默认开启 Keep-Alive,TCP 连接在响应完成后不会立即关闭,可以复用。SSE 利用这个特性,让连接一直保持打开状态。

分块传输编码:正常情况下,HTTP 响应需要先知道 Content-Length(响应体有多大)。但 SSE 的数据是动态生成的,服务器不知道总共要发多少数据。分块传输编码解决了这个问题——服务器把响应体分成一块一块发送,每块前面加上该块的大小(十六进制),最后发一个大小为 0 的块表示结束。

HTTP/1.1 200 OK
Content-Type: text/event-stream
Transfer-Encoding: chunked

1a\r\n                          ← 块大小(26字节)
data: {"content": "你好"}\n\n   ← 块数据
\r\n
1f\r\n                          ← 块大小(31字节)
data: {"content": ",我是"}\n\n ← 块数据
\r\n
0\r\n                           ← 结束标记
\r\n

不过这些底层细节我们一般不需要手动处理——Web 框架(FastAPI、Starlette 等)会自动处理分块传输编码。了解这些是为了在出问题时知道从哪排查。

关于 HTTP/2:以上讨论基于 HTTP/1.1。在 HTTP/2 下,SSE 的行为有一些值得注意的差异:

  • HTTP/2 原生支持多路复用(一个 TCP 连接承载多个请求/响应流),不需要分块传输编码
  • 但 HTTP/2 的连接流有并发限制,浏览器通常限制同一域名下约 100 个并发流
  • SSE 的长连接在 HTTP/2 下仍然占用一个流,如果页面同时有多个 SSE 连接和其他请求,可能触及并发上限
  • 实际开发中,HTTP/2 的这些差异对 SSE 的使用影响不大,FastAPI + uvicorn 在 HTTP/2 下也能正常工作(通过 h2 协议支持)

3. Python 异步生成器与 SSE 服务端实现

3.1 异步生成器:理解 async def + yield

在写 SSE 服务端之前,我得先搞懂 Python 的异步生成器。这个概念一开始让我有点晕——asyncyield 放在一起是什么意思?

普通生成器(同步):

def count_up_to(n):
    i = 1
    while i <= n:
        yield i      # 暂停,返回值,等下次调用再继续
        i += 1

for num in count_up_to(3):
    print(num)       # 输出: 1, 2, 3

普通生成器的特点是:函数执行到 yield 时暂停,把值返回给调用者;调用者再次迭代时,函数从上次暂停的地方继续执行。整个过程是同步的——暂停期间,整个程序都在等。

异步生成器

import asyncio

async def async_count_up_to(n):
    i = 1
    while i <= n:
        await asyncio.sleep(0.1)  # 模拟异步操作
        yield i                    # 暂停,返回值
        i += 1

async def main():
    async for num in async_count_up_to(3):
        print(num)                 # 输出: 1, 2, 3(每个间隔0.1秒)

asyncio.run(main())

异步生成器的关键区别在于:yield 暂停时,不阻塞事件循环。其他协程可以继续运行。这对于 SSE 服务端至关重要——一个 SSE 连接在 yield 等待数据时,事件循环可以去处理其他连接的请求。

我的理解

普通生成器:yield → 暂停 → 整个程序等 → 下次迭代 → 继续
异步生成器:yield → 暂停 → 事件循环去干别的事 → 下次迭代 → 继续

这就是为什么 SSE 服务端必须用异步——一个服务端可能要同时维持几十上百个 SSE 连接,如果每个连接都阻塞等待,服务端很快就撑不住了。

3.2 用 FastAPI 实现 SSE 服务端

FastAPI 对 SSE 的支持非常自然——只需要返回一个 StreamingResponse,把异步生成器传进去就行。

# sse_server_fastapi.py
import asyncio
import json
import time
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()


async def openai_stream_generator(messages: list[dict], model: str = "gpt-4o-mini"):
    """异步生成器:逐块产出 OpenAI 的流式响应"""
    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        stream_options={"include_usage": True},
    )

    for chunk in stream:
        if chunk.choices and len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:
                # 构造 SSE 格式的数据
                sse_data = json.dumps({
                    "content": delta.content,
                    "finish_reason": chunk.choices[0].finish_reason,
                }, ensure_ascii=False)
                yield f"data: {sse_data}\n\n"

        # 最后一个 chunk 包含 usage 信息
        if hasattr(chunk, "usage") and chunk.usage:
            usage_data = json.dumps({
                "usage": {
                    "prompt_tokens": chunk.usage.prompt_tokens,
                    "completion_tokens": chunk.usage.completion_tokens,
                    "total_tokens": chunk.usage.total_tokens,
                }
            }, ensure_ascii=False)
            yield f"data: {usage_data}\n\n"

    # 发送结束标记
    yield "data: [DONE]\n\n"


@app.post("/api/chat/stream")
async def chat_stream(request: Request):
    """流式对话接口"""
    body = await request.json()
    messages = body.get("messages", [])

    return StreamingResponse(
        openai_stream_generator(messages),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # 禁用 Nginx 缓冲
        },
    )

关键点解析

  1. StreamingResponse:FastAPI/Starlette 提供的流式响应类,接受一个生成器作为内容源
  2. media_type="text/event-stream":告诉浏览器这是 SSE 流
  3. X-Accel-Buffering: no:如果用了 Nginx 反向代理,这个头告诉 Nginx 不要缓冲响应,直接透传
  4. yield f"data: {sse_data}\n\n":每条消息以 data: 开头,以 \n\n 结尾(空行分隔)
  5. ensure_ascii=False:确保中文不会被转成 \uXXXX 格式
  6. stream_options={"include_usage": True}:OpenAI 的参数,开启后会在流的最后一个 chunk 中附带 token 用量统计(usage 字段)。默认情况下流式请求不返回 usage 信息,开启后可以实时获取 prompt_tokens、completion_tokens 等数据,方便做成本监控。注意这个参数需要 OpenAI Python SDK v1.0+ 版本

3.3 用 Starlette 的 StreamingResponse 实现

如果你的项目用的是 Starlette(或者不想引入 FastAPI),可以直接用 Starlette 的 StreamingResponse。FastAPI 底层就是 Starlette,所以用法几乎一样:

# sse_server_starlette.py
import asyncio
import json
from starlette.applications import Starlette
from starlette.responses import StreamingResponse
from starlette.routing import Route


async def sse_generator():
    """一个简单的 SSE 生成器示例"""
    messages = [
        "你好!",
        "我是 AI 助手。",
        "正在为你生成回复...",
        "这是最后一条消息。",
    ]

    for i, msg in enumerate(messages):
        await asyncio.sleep(0.5)  # 模拟生成延迟
        data = json.dumps({
            "index": i,
            "content": msg,
            "is_final": i == len(messages) - 1,
        }, ensure_ascii=False)
        yield f"data: {data}\n\n"

    yield "data: [DONE]\n\n"


async def stream_endpoint(request):
    return StreamingResponse(
        sse_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )


app = Starlette(routes=[
    Route("/api/stream", stream_endpoint),
])

3.4 处理连接断开与资源清理

SSE 连接可能随时断开——用户关闭了浏览器标签页、网络波动、代理超时等等。如果不处理断开事件,生成器可能还在后台运行,浪费资源。

import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse

app = FastAPI()


async def safe_sse_generator(request: Request):
    """带断开检测和资源清理的 SSE 生成器"""
    try:
        for i in range(100):
            # 检查客户端是否已断开
            if await request.is_disconnected():
                print(f"客户端已断开连接,停止生成")
                break

            await asyncio.sleep(0.5)
            yield f"data: 消息 {i + 1}\n\n"

    except asyncio.CancelledError:
        # 协程被取消时的清理
        print("SSE 连接被取消,执行清理...")

    finally:
        # 无论如何都会执行的清理逻辑
        print("SSE 连接结束,释放资源")


@app.get("/api/safe-stream")
async def safe_stream(request: Request):
    return StreamingResponse(
        safe_sse_generator(request),
        media_type="text/event-stream",
    )

关键点

  • request.is_disconnected():FastAPI/Starlette 提供的方法,检查客户端是否还在连接
  • asyncio.CancelledError:当协程被取消时(比如客户端断开),会抛出这个异常
  • finally:确保无论如何都会执行清理逻辑

3.5 并发多个 SSE 连接:协程的优势

这是异步生成器真正发光的地方。假设有 100 个用户同时连接 SSE,每个连接都在等待 LLM 生成:

# 同步方式(假想代码,实际不推荐)
# 每个连接占用一个线程,100个连接 = 100个线程
# 线程切换开销大,内存占用高

# 异步方式(实际使用)
# 100个连接共享一个事件循环
# 每个连接在 await 时让出控制权,事件循环去处理下一个连接
# 内存占用低,切换开销小

异步的优势在于:当一个 SSE 连接在 await asyncio.sleep() 或等待 I/O 时,它不占用 CPU,事件循环可以切换到另一个连接继续工作。这就是协程的"协作式多任务"——每个协程主动让出控制权,而不是被操作系统强制切换。


4. 前端 EventSource 与逐字渲染

4.1 EventSource API 详解

浏览器原生支持 SSE,通过 EventSource 对象来接收。这是最简单的使用方式:

// 创建 EventSource 连接
const source = new EventSource('/api/chat/stream');

// 监听默认消息事件
source.onmessage = (event) => {
    if (event.data === '[DONE]') {
        console.log('流式传输完成');
        source.close();  // 主动关闭连接
        return;
    }

    const data = JSON.parse(event.data);
    console.log('收到内容:', data.content);
};

// 监听自定义事件
source.addEventListener('error_event', (event) => {
    console.error('服务器发来了错误事件:', event.data);
});

// 连接打开时
source.onopen = () => {
    console.log('SSE 连接已建立');
};

// 连接出错时(包括断线重连失败)
source.onerror = (event) => {
    console.error('SSE 连接出错:', event);
    // readyState 可以判断连接状态:
    // 0 = CONNECTING(正在连接)
    // 1 = OPEN(已连接)
    // 2 = CLOSED(已关闭)
    if (source.readyState === EventSource.CLOSED) {
        console.log('连接已永久关闭');
    }
};

// 手动关闭连接
// source.close();

EventSource 的局限性

  1. 只支持 GET 请求:不能发 POST,不能自定义请求头(除了 Cookie)
  2. 不能传 Request Body:对于 AI 对话,messages 通常很长,放在 URL 里不现实
  3. 自动重连:有时候是好事,有时候是坏事——AI 流式生成断了重连也没意义

因为这些限制,在实际的 AI 对话应用中,我更推荐用 Fetch API + ReadableStream。

4.2 Fetch + ReadableStream:更灵活的方案

Fetch API 的 response.body 是一个 ReadableStream,可以逐块读取响应数据。相比 EventSource,它的优势是:

  • 支持 POST 请求,可以传 JSON Body
  • 可以自定义请求头(如 Authorization)
  • 不会自动重连,行为完全可控
  • 可以中途取消(通过 AbortController
// fetch_stream.js
async function chatWithStream(userMessage, conversationHistory = []) {
    const messages = [
        { role: 'system', content: '你是一个有帮助的AI助手。' },
        ...conversationHistory,
        { role: 'user', content: userMessage },
    ];

    const response = await fetch('/api/chat/stream', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
        },
        body: JSON.stringify({ messages }),
    });

    if (!response.ok) {
        throw new Error(`HTTP 错误: ${response.status}`);
    }

    // 获取 ReadableStream
    const reader = response.body.getReader();
    const decoder = new TextDecoder('utf-8');
    let buffer = '';       // 缓冲区,存放不完整的数据
    let fullContent = '';  // 累积的完整内容

    while (true) {
        const { done, value } = await reader.read();

        if (done) {
            console.log('流式传输完成');
            break;
        }

        // 解码二进制数据
        buffer += decoder.decode(value, { stream: true });

        // 按行分割,处理完整的 SSE 消息
        const lines = buffer.split('\n');
        // 最后一行可能不完整,保留在缓冲区
        buffer = lines.pop() || '';

        for (const line of lines) {
            if (line.startsWith('data: ')) {
                const dataStr = line.slice(6);  // 去掉 "data: " 前缀

                if (dataStr === '[DONE]') {
                    console.log('收到结束标记');
                    return fullContent;
                }

                try {
                    const data = JSON.parse(dataStr);
                    if (data.content) {
                        fullContent += data.content;
                        // 回调:将新内容传给渲染函数
                        onNewContent(data.content, fullContent);
                    }
                } catch (e) {
                    // JSON 解析失败,跳过
                    console.warn('解析 SSE 数据失败:', dataStr);
                }
            }
        }
    }

    return fullContent;
}

// 使用示例
function onNewContent(delta, fullContent) {
    // 将增量内容追加到页面
    document.getElementById('output').textContent = fullContent;
}

关键点解析

  1. response.body.getReader():获取流读取器
  2. decoder.decode(value, { stream: true })stream: true 表示这是流式解码,多字节字符(如中文)可能被截断,解码器会保留不完整的字节等下次一起解码
  3. 缓冲区 buffer:因为 TCP 是流式传输,一次 read() 可能读到不完整的 SSE 消息。用缓冲区保存不完整的行,等下次读取时拼接
  4. lines.pop() || '':JavaScript 中 Array.pop() 对空数组返回 undefined(falsy 值),|| 运算符会取右侧的 '' 作为默认值。等价于 Python 的 lines.pop() if lines else ''。最后一行可能不完整,保留在缓冲区

4.3 逐字渲染的 UI 实现:打字机效果

数据拿到了,接下来是怎么在页面上展示。最简单的做法是直接设置 textContent

// 简单但不够好——整个文本替换,没有动画感
document.getElementById('output').textContent = fullContent;

更好的做法是模拟打字机效果——新内容逐个字符追加,给用户一种"AI 正在打字"的感觉:

// typewriter.js
class TypewriterRenderer {
    constructor(elementId, speed = 30) {
        this.element = document.getElementById(elementId);
        this.queue = [];       // 待渲染的字符队列
        this.isRendering = false;
        this.speed = speed;    // 每个字符的渲染间隔(毫秒)
    }

    // 追加新内容到队列
    append(deltaText) {
        for (const char of deltaText) {
            this.queue.push(char);
        }
        this._startRendering();
    }

    _startRendering() {
        if (this.isRendering) return;  // 已经在渲染中
        this.isRendering = true;
        this._renderNext();
    }

    _renderNext() {
        if (this.queue.length === 0) {
            this.isRendering = false;
            return;
        }

        const char = this.queue.shift();
        this.element.textContent += char;

        // 自动滚动到底部
        this.element.scrollTop = this.element.scrollHeight;

        setTimeout(() => this._renderNext(), this.speed);
    }

    // 清空
    reset() {
        this.queue = [];
        this.element.textContent = '';
        this.isRendering = false;
    }
}

// 使用示例
const renderer = new TypewriterRenderer('chat-output', 30);

function onNewContent(delta, fullContent) {
    renderer.append(delta);
}

打字机效果的几个细节

  1. 速度控制speed = 30 表示每个字符间隔 30ms,大约每秒 33 个字符。这个速度比人类阅读稍快,不会让用户觉得慢
  2. 队列机制:LLM 生成速度可能比渲染速度快,用队列缓冲,避免丢字符
  3. 自动滚动:新内容出现时自动滚动到底部,用户不用手动滚

4.4 Markdown 内容的流式渲染

AI 回复经常包含 Markdown 格式(代码块、列表、加粗等)。流式渲染 Markdown 有个挑战:Markdown 语法需要看到完整的标记才能正确渲染。

比如 **加粗**,如果只收到了 **加,这时候渲染会出问题——开头的 ** 没有对应的结尾 **

解决方案是:用累积的完整内容重新渲染,而不是只渲染增量:

// markdown_stream_renderer.js
import { marked } from 'marked';  // Markdown 解析库

class MarkdownStreamRenderer {
    constructor(elementId) {
        this.element = document.getElementById(elementId);
        this.fullContent = '';
    }

    append(deltaText) {
        this.fullContent += deltaText;

        // 用完整内容重新渲染 Markdown
        // 注意:不完整的 Markdown 语法可能导致短暂显示异常
        // 但下一次 append 时会被修正
        this.element.innerHTML = marked.parse(this.fullContent);

        // 自动滚动到底部
        this.element.scrollTop = this.element.scrollHeight;
    }

    reset() {
        this.fullContent = '';
        this.element.innerHTML = '';
    }
}

代码块的流式渲染优化

代码块是 Markdown 流式渲染中最容易出问题的地方。如果代码块还没闭合(只收到了开头的 ```),整个后续内容都会被当成代码块的一部分,导致页面显示异常。

function safeMarkdownRender(text) {
    // 如果代码块未闭合,暂时用 <pre><code> 包裹
    const openCodeBlocks = (text.match(/```/g) || []).length;
    if (openCodeBlocks % 2 !== 0) {
        // 代码块未闭合,补一个闭合标记
        text += '\n```';
    }
    return marked.parse(text);
}

这个方案虽然简单,但有一个局限:它假设所有 ``` 都是代码块标记。如果 Markdown 内容中包含行内代码 ```…```(虽然少见),计数就会出错。更稳健的做法是用状态机逐行解析,但对于学习阶段,上面的方案已经够用了。

另外,除了代码块,还有一些 Markdown 结构在流式渲染时可能出现短暂异常:

  • 加粗 **...**:只收到开头的 ** 时,后续文字会短暂显示为加粗样式,收到结尾 ** 后恢复正常
  • 表格:表头和分隔行没到齐时,表格无法正确渲染
  • 嵌套列表:缩进层级在流式到达时可能暂时错乱

这些问题的本质都一样:Markdown 是"看到完整结构才能正确渲染"的格式。流式场景下的通用解法就是用完整累积内容重新渲染——每次收到新内容,用整个 fullContent 重新调用 Markdown 解析器。虽然看起来"浪费"(重复解析已有内容),但现代 Markdown 解析器(如 marked.js)性能足够好,实际体验没有可感知的延迟。


5. 实战:搭建完整的流式 AI 对话应用

5.1 后端:FastAPI + OpenAI Stream

把前面学的串起来,写一个完整的后端服务:

# app.py - 完整的流式 AI 对话后端
import json
import time
import uuid
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from openai import OpenAI
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="流式 AI 对话应用")

client = OpenAI()


async def chat_stream_generator(messages: list[dict], model: str = "gpt-4o-mini"):
    """核心流式生成器"""
    request_id = str(uuid.uuid4())[:8]
    logger.info(f"[{request_id}] 开始流式生成, model={model}")

    start_time = time.time()
    total_content = ""

    try:
        stream = client.chat.completions.create(
            model=model,
            messages=messages,
            stream=True,
            stream_options={"include_usage": True},
        )

        for chunk in stream:
            if chunk.choices and len(chunk.choices) > 0:
                delta = chunk.choices[0].delta
                finish_reason = chunk.choices[0].finish_reason

                if delta.content:
                    total_content += delta.content
                    sse_data = json.dumps({
                        "type": "content",
                        "content": delta.content,
                        "finish_reason": finish_reason,
                    }, ensure_ascii=False)
                    yield f"data: {sse_data}\n\n"

            if hasattr(chunk, "usage") and chunk.usage:
                elapsed = time.time() - start_time
                sse_data = json.dumps({
                    "type": "usage",
                    "usage": {
                        "prompt_tokens": chunk.usage.prompt_tokens,
                        "completion_tokens": chunk.usage.completion_tokens,
                        "total_tokens": chunk.usage.total_tokens,
                    },
                    "elapsed_seconds": round(elapsed, 2),
                }, ensure_ascii=False)
                yield f"data: {sse_data}\n\n"

        yield "data: [DONE]\n\n"
        elapsed = time.time() - start_time
        logger.info(
            f"[{request_id}] 流式生成完成, "
            f"内容长度={len(total_content)}, 耗时={elapsed:.2f}s"
        )

    except Exception as e:
        logger.error(f"[{request_id}] 流式生成出错: {e}")
        error_data = json.dumps({
            "type": "error",
            "message": str(e),
        }, ensure_ascii=False)
        yield f"data: {error_data}\n\n"
        yield "data: [DONE]\n\n"


@app.post("/api/chat/stream")
async def chat_stream(request: Request):
    """流式对话接口"""
    body = await request.json()
    messages = body.get("messages", [])
    model = body.get("model", "gpt-4o-mini")

    return StreamingResponse(
        chat_stream_generator(messages, model),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
        },
    )


@app.get("/")
async def index():
    """返回前端页面"""
    with open("index.html", "r", encoding="utf-8") as f:
        return HTMLResponse(f.read())


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

5.2 前端:原生 HTML/JS 实现

<!-- index.html -->
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>流式 AI 对话</title>
    <style>
        * { margin: 0; padding: 0; box-sizing: border-box; }
        body {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", sans-serif;
            background: #f5f5f5;
            display: flex;
            justify-content: center;
            align-items: center;
            min-height: 100vh;
        }
        .chat-container {
            width: 100%;
            max-width: 800px;
            height: 90vh;
            background: white;
            border-radius: 12px;
            box-shadow: 0 4px 20px rgba(0,0,0,0.1);
            display: flex;
            flex-direction: column;
            overflow: hidden;
        }
        .chat-header {
            padding: 16px 20px;
            border-bottom: 1px solid #eee;
            font-size: 18px;
            font-weight: 600;
            color: #333;
        }
        .chat-messages {
            flex: 1;
            overflow-y: auto;
            padding: 20px;
        }
        .message {
            margin-bottom: 16px;
            display: flex;
            gap: 12px;
        }
        .message.user { justify-content: flex-end; }
        .message-avatar {
            width: 36px;
            height: 36px;
            border-radius: 50%;
            display: flex;
            align-items: center;
            justify-content: center;
            font-size: 16px;
            flex-shrink: 0;
        }
        .message.user .message-avatar { background: #007aff; color: white; }
        .message.assistant .message-avatar { background: #34c759; color: white; }
        .message-content {
            max-width: 70%;
            padding: 12px 16px;
            border-radius: 12px;
            line-height: 1.6;
            word-break: break-word;
        }
        .message.user .message-content {
            background: #007aff;
            color: white;
            border-bottom-right-radius: 4px;
        }
        .message.assistant .message-content {
            background: #f0f0f0;
            color: #333;
            border-bottom-left-radius: 4px;
        }
        /* Markdown 内容样式 */
        .message-content pre {
            background: #1e1e1e;
            color: #d4d4d4;
            padding: 12px;
            border-radius: 8px;
            overflow-x: auto;
            margin: 8px 0;
        }
        .message-content code {
            font-family: "Fira Code", monospace;
            font-size: 14px;
        }
        .message-content p { margin: 4px 0; }
        .message-content ul, .message-content ol { padding-left: 20px; }
        .chat-input-area {
            padding: 16px 20px;
            border-top: 1px solid #eee;
            display: flex;
            gap: 12px;
        }
        .chat-input {
            flex: 1;
            padding: 12px 16px;
            border: 1px solid #ddd;
            border-radius: 24px;
            font-size: 15px;
            outline: none;
            resize: none;
            font-family: inherit;
        }
        .chat-input:focus { border-color: #007aff; }
        .send-btn {
            padding: 12px 24px;
            background: #007aff;
            color: white;
            border: none;
            border-radius: 24px;
            font-size: 15px;
            cursor: pointer;
            transition: background 0.2s;
        }
        .send-btn:hover { background: #0056cc; }
        .send-btn:disabled { background: #ccc; cursor: not-allowed; }
        .typing-indicator {
            display: inline-block;
            width: 8px;
            height: 8px;
            background: #999;
            border-radius: 50%;
            animation: blink 1s infinite;
        }
        @keyframes blink {
            0%, 100% { opacity: 1; }
            50% { opacity: 0.3; }
        }
    </style>
</head>
<body>
    <div class="chat-container">
        <div class="chat-header">AI 流式对话</div>
        <div class="chat-messages" id="chatMessages"></div>
        <div class="chat-input-area">
            <textarea
                class="chat-input"
                id="chatInput"
                placeholder="输入消息,按 Enter 发送..."
                rows="1"
            ></textarea>
            <button class="send-btn" id="sendBtn" onclick="sendMessage()">发送</button>
        </div>
    </div>

    <script>
        const chatMessages = document.getElementById('chatMessages');
        const chatInput = document.getElementById('chatInput');
        const sendBtn = document.getElementById('sendBtn');
        const conversationHistory = [];
        let isGenerating = false;

        // Enter 发送,Shift+Enter 换行
        chatInput.addEventListener('keydown', (e) => {
            if (e.key === 'Enter' && !e.shiftKey) {
                e.preventDefault();
                sendMessage();
            }
        });

        // 自动调整输入框高度
        chatInput.addEventListener('input', () => {
            chatInput.style.height = 'auto';
            chatInput.style.height = chatInput.scrollHeight + 'px';
        });

        function addMessage(role, content) {
            const div = document.createElement('div');
            div.className = `message ${role}`;
            div.innerHTML = `
                <div class="message-avatar">${role === 'user' ? 'U' : 'AI'}</div>
                <div class="message-content">${content}</div>
            `;
            chatMessages.appendChild(div);
            chatMessages.scrollTop = chatMessages.scrollHeight;
            return div;
        }

        async function sendMessage() {
            const userMessage = chatInput.value.trim();
            if (!userMessage || isGenerating) return;

            // 显示用户消息
            addMessage('user', escapeHtml(userMessage));
            chatInput.value = '';
            chatInput.style.height = 'auto';
            sendBtn.disabled = true;
            isGenerating = true;

            // 创建 AI 消息占位
            const aiMessageDiv = addMessage('assistant', '');
            const contentDiv = aiMessageDiv.querySelector('.message-content');
            contentDiv.innerHTML = '<span class="typing-indicator"></span>';

            conversationHistory.push({ role: 'user', content: userMessage });

            try {
                const response = await fetch('/api/chat/stream', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({
                        messages: [
                            { role: 'system', content: '你是一个有帮助的AI助手。回答简洁清晰。' },
                            ...conversationHistory,
                        ],
                    }),
                });

                if (!response.ok) {
                    throw new Error(`HTTP ${response.status}`);
                }

                const reader = response.body.getReader();
                const decoder = new TextDecoder('utf-8');
                let buffer = '';
                let fullContent = '';
                let isFirstChunk = true;

                while (true) {
                    const { done, value } = await reader.read();
                    if (done) break;

                    buffer += decoder.decode(value, { stream: true });
                    const lines = buffer.split('\n');
                    buffer = lines.pop() || '';

                    for (const line of lines) {
                        if (!line.startsWith('data: ')) continue;
                        const dataStr = line.slice(6);

                        if (dataStr === '[DONE]') break;

                        try {
                            const data = JSON.parse(dataStr);

                            if (data.type === 'content' && data.content) {
                                if (isFirstChunk) {
                                    contentDiv.innerHTML = '';
                                    isFirstChunk = false;
                                }
                                fullContent += data.content;
                                // 简单 Markdown 渲染
                                contentDiv.innerHTML = renderSimpleMarkdown(fullContent);
                                chatMessages.scrollTop = chatMessages.scrollHeight;
                            }

                            if (data.type === 'usage') {
                                console.log('Token 使用:', data.usage);
                            }

                            if (data.type === 'error') {
                                contentDiv.innerHTML = `错误: ${escapeHtml(data.message)}`;
                            }
                        } catch (e) {
                            // 忽略解析错误
                        }
                    }
                }

                conversationHistory.push({ role: 'assistant', content: fullContent });

            } catch (error) {
                contentDiv.innerHTML = `请求失败: ${escapeHtml(error.message)}`;
            } finally {
                isGenerating = false;
                sendBtn.disabled = false;
                chatInput.focus();
            }
        }

        function escapeHtml(text) {
            const div = document.createElement('div');
            div.textContent = text;
            return div.innerHTML;
        }

        function renderSimpleMarkdown(text) {
            // 简单的 Markdown 渲染(生产环境建议用 marked.js)
            let html = escapeHtml(text);

            // 代码块 ```
            html = html.replace(/```(\w*)\n([\s\S]*?)```/g, (_, lang, code) => {
                return `<pre><code class="language-${lang}">${code.trim()}</code></pre>`;
            });

            // 行内代码 `
            html = html.replace(/`([^`]+)`/g, '<code>$1</code>');

            // 加粗 **
            html = html.replace(/\*\*([^*]+)\*\*/g, '<strong>$1</strong>');

            // 换行
            html = html.replace(/\n/g, '<br>');

            return html;
        }
    </script>
</body>
</html>

5.3 完整流程串联

把整个流程串起来看一遍:

1. 用户在浏览器输入 "用Python写一个快速排序"
      │
      ▼
2. 前端 fetch POST /api/chat/stream
   请求体: {"messages": [{"role":"user","content":"用Python写一个快速排序"}]}
      │
      ▼
3. FastAPI 收到请求 → 调用 chat_stream_generator()
      │
      ▼
4. 生成器内部调用 OpenAI API (stream=True)
      │
      ▼
5. OpenAI 逐 token 返回 → 生成器 yield SSE 格式数据
      │
      ▼
6. StreamingResponse 通过 HTTP 分块传输发给浏览器
      │
      ▼
7. 前端 ReadableStream 逐块读取 → 解析 SSE → 提取 content
      │
      ▼
8. 打字机效果逐字渲染到页面
      │
      ▼
9. 收到 [DONE] → 流结束 → 对话历史更新

6. 常见问题与避坑指南

6.1 Nginx 反向代理缓冲问题

如果你用 Nginx 做反向代理,可能会发现流式输出变成了"一次性返回"——Nginx 默认会缓冲响应。解决方法:

location /api/chat/stream {
    proxy_pass http://backend:8000;
    proxy_buffering off;              # 关闭缓冲
    proxy_cache off;                  # 关闭缓存
    proxy_set_header Connection '';
    proxy_http_version 1.1;           # 使用 HTTP/1.1 长连接
    chunked_transfer_encoding on;     # 开启分块传输
}

6.2 中文乱码问题

SSE 数据中包含中文时,确保:

  1. 服务端 json.dumps 使用 ensure_ascii=False
  2. 响应头中设置正确的编码(FastAPI 默认 UTF-8,一般不需要额外设置)
  3. 前端 TextDecoder 使用 utf-8

6.3 连接超时

SSE 是长连接,如果 LLM 生成时间很长(比如超过 60 秒),可能会触发各种超时:

  • Nginx proxy_read_timeout:默认 60s,需要调大
  • AWS ALB 空闲超时:默认 60s,需要调大
  • 浏览器超时:一般不会主动断开 SSE 连接
location /api/chat/stream {
    proxy_read_timeout 300s;  # 5分钟超时
    proxy_send_timeout 300s;
}

6.4 跨域问题

如果前端和后端不在同一个域名下,需要配置 CORS:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],  # 前端地址
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

6.5 多个 chunk 同时到达

网络传输中,多个 SSE chunk 可能在同一个 TCP 包中到达。这就是为什么前端需要缓冲区——不能假设一次 read() 恰好对应一条 SSE 消息。

6.6 中途取消生成

用户可能想中途停止 AI 的生成。实现方式:

// 前端:用 AbortController 取消请求
const controller = new AbortController();

const response = await fetch('/api/chat/stream', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ messages }),
    signal: controller.signal,  // 关联 AbortController
});

// 用户点击"停止"按钮
document.getElementById('stopBtn').onclick = () => {
    controller.abort();  // 取消请求
};
# 后端:检测客户端断开
async def chat_stream_generator(messages, request: Request):
    for chunk in stream:
        if await request.is_disconnected():
            logger.info("客户端已断开,停止生成")
            break
        # ... 正常处理

6.7 用 curl 测试 SSE 端点

开发调试时,用 curl 可以直接查看 SSE 原始数据,非常方便:

# -N 禁用缓冲,否则 curl 会等数据全部到达才显示
# --no-buffer 效果相同
curl -N http://localhost:8000/api/chat/stream \
  -H "Content-Type: application/json" \
  -d '{"messages": [{"role": "user", "content": "你好"}]}'

# 输出示例:
# data: {"type":"content","content":"你好","finish_reason":null}
# data: {"type":"content","content":"!","finish_reason":null}
# data: {"type":"content","content":"有什么","finish_reason":null}
# data: [DONE]

-N / --no-buffer 是关键参数——没有它,curl 默认会缓冲输出,流式效果就看不到了。


7. 总结与学习建议

7.1 核心要点回顾

回头看这段学习经历,SSE 的核心知识可以浓缩为以下几点:

  1. SSE 是单向的服务器推送协议,基于 HTTP,比 WebSocket 简单,非常适合 AI 流式输出场景
  2. 协议格式很简单data: 内容\n\n,空行分隔消息,[DONE] 表示结束
  3. Python 异步生成器async def + yield)是实现 SSE 服务端的核心,不阻塞事件循环
  4. 前端有两种接收方式:EventSource(简单但有局限)和 Fetch + ReadableStream(灵活推荐)
  5. 逐字渲染用队列 + 定时器实现打字机效果,Markdown 内容用完整内容重新渲染

7.2 学习路径建议

第一步:理解 SSE 协议
  → 搞懂 data/event/id/retry 四个字段
  → 理解 HTTP 长连接和分块传输
  → 用 curl 测试 SSE 接口,看原始数据

第二步:写一个简单的 SSE 服务端
  → 用 FastAPI + StreamingResponse
  → 先不接 OpenAI,手动 yield 几条测试数据
  → 用浏览器 EventSource 接收验证

第三步:接入 OpenAI 流式 API
  → 把 OpenAI stream 的输出转成 SSE 格式
  → 处理 usage 信息、错误信息

第四步:完善前端
  → 用 Fetch + ReadableStream 替代 EventSource
  → 实现打字机效果
  → 处理 Markdown 渲染

第五步:生产化
  → 处理 Nginx 缓冲
  → 处理连接断开和超时
  → 添加日志和监控

7.3 与已有知识体系的关联

  • 本文是《OpenAI API 实战指南》第四章的深度展开,建议先阅读实战指南了解基础用法
  • SSE 的异步生成器是 Python asyncio 的典型应用场景,后续学习异步编程时可以回来看
  • 流式输出是 AI 对话应用的体验核心,后续学习 Agent 开发、RAG 系统时都会用到

7.4 我参考的资料


文档版本:v1.0 | 更新时间:2026-05-13 | 适用读者:已完成 OpenAI API 基础学习,希望深入理解流式输出原理和实现的 AI 应用开发者

本文为原创技术文档,,部分内容由AI辅助完成。如需转载,请注明出处。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐