流式响应 (SSE) 的坑与最佳实践:处理断流、重试和错误码
你的 AI 应用生成速度很快,但用户盯着空白屏幕等了 8 秒才看到第一个字——这不是模型慢,是你没开流式输出。这篇文章把流式接入的坑全踩一遍,让你少走弯路。
为什么要用流式输出?
对话模型生成文本是逐 token 输出的,默认的非流式接口会等模型把整段回复都生成完,才一次性返回给你。生成 500 个 token 的回复,你就要等 5-10 秒的白屏。
流式输出(Streaming)的改变是:模型生成一个 token,立刻发给你一个,前端看到文字一个字一个字地蹦出来,这就是 ChatGPT 那种打字机效果。
核心收益:
- 首 token 延迟从 5-10 秒降到 0.5-1 秒:用户几乎立刻看到响应,心理等待感大幅降低
- 长文本体验完全不同:2000 字的文章,流式是"看着它写出来",非流式是"等 20 秒然后突然全出来"
- 提前中止:用户可以看到方向不对就立刻停止,不用等模型把无用内容全部生成完
SSE 协议基础
流式输出底层用的是 SSE(Server-Sent Events),这是一个基于 HTTP 的单向推送协议,比 WebSocket 轻得多,用普通 GET/POST 请求就能建立持久连接。
响应格式长这样:
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"},"index":0}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"},"index":0}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"},"index":0}]}
data: [DONE]
几个关键点:
data:前缀:每一行数据都以data:开头(注意有个空格)- 空行分隔:每条消息后面跟一个空行(
\n\n) [DONE]结束标记:流结束时服务端发一条data: [DONE],客户端收到就知道该收工了Content-Type: text/event-stream:响应头必须是这个,否则浏览器不会识别为 SSE 流
Python 流式调用
用 openai 库,只需加一个 stream=True:
from openai import OpenAI
client = OpenAI(
base_url="https://api.therouter.ai/v1",
api_key="你的 API Key",
)
stream = client.chat.completions.create(
model="anthropic/claude-sonnet-4",
messages=[
{"role": "user", "content": "写一篇 500 字的短文,介绍量子计算的基本原理"}
],
max_tokens=1024,
stream=True, # 就这一行
)
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
print() # 最后换行
flush=True 很重要——Python 的 stdout 有缓冲,不加这个,你会看到文字攒够一批才一起输出,流式效果打折。
带异常处理的完整版
from openai import OpenAI, APIStatusError, APIConnectionError
import time
client = OpenAI(
base_url="https://api.therouter.ai/v1",
api_key="你的 API Key",
)
def stream_with_retry(messages, model="anthropic/claude-sonnet-4", max_retries=3):
for attempt in range(max_retries):
try:
stream = client.chat.completions.create(
model=model,
messages=messages,
max_tokens=2048,
stream=True,
)
full_content = ""
for chunk in stream:
delta = chunk.choices[0].delta
if delta.content:
print(delta.content, end="", flush=True)
full_content += delta.content
print()
return full_content
except APIConnectionError as e:
# 网络断了,等一下重试
wait = 2 ** attempt
print(f"\n[连接中断,{wait}s 后重试 ({attempt+1}/{max_retries})]")
time.sleep(wait)
except APIStatusError as e:
if e.status_code in (429, 500, 502, 503):
# 限流或服务端临时错误,退避重试
wait = 2 ** attempt
print(f"\n[{e.status_code} 错误,{wait}s 后重试]")
time.sleep(wait)
else:
# 4xx 客户端错误,不重试
raise
raise RuntimeError(f"超过最大重试次数 {max_retries}")
stream_with_retry([{"role": "user", "content": "介绍一下 Rust 的所有权模型"}])
Node.js 流式调用
import OpenAI from "openai";
const client = new OpenAI({
baseURL: "https://api.therouter.ai/v1",
apiKey: "你的 API Key",
});
async function streamChat(prompt: string) {
const stream = await client.chat.completions.create({
model: "anthropic/claude-sonnet-4",
messages: [{ role: "user", content: prompt }],
max_tokens: 1024,
stream: true,
});
let fullContent = "";
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content ?? "";
if (content) {
process.stdout.write(content);
fullContent += content;
}
// 检查结束原因
const finishReason = chunk.choices[0]?.finish_reason;
if (finishReason === "length") {
console.warn("\n[警告] 输出因 max_tokens 限制被截断");
}
}
console.log();
return fullContent;
}
streamChat("用 Node.js 实现一个简单的事件总线");
前端接收 SSE:fetch + ReadableStream
浏览器端不要用 EventSource——它只支持 GET 请求,而 AI API 用 POST。用 fetch + ReadableStream 来手动解析:
async function streamToUI(prompt: string, onToken: (token: string) => void) {
const response = await fetch("https://api.therouter.ai/v1/chat/completions", {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${API_KEY}`,
},
body: JSON.stringify({
model: "anthropic/claude-sonnet-4",
messages: [{ role: "user", content: prompt }],
stream: true,
}),
});
if (!response.ok) {
const err = await response.json();
throw new Error(`API 错误 ${response.status}: ${err.error?.message}`);
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
// 将新数据追加到缓冲区(chunk 可能在消息边界被截断)
buffer += decoder.decode(value, { stream: true });
// 按行解析
const lines = buffer.split("\n");
// 最后一行可能不完整,留到下次处理
buffer = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed || !trimmed.startsWith("data:")) continue;
const data = trimmed.slice(5).trim();
if (data === "[DONE]") return;
try {
const parsed = JSON.parse(data);
const token = parsed.choices?.[0]?.delta?.content;
if (token) onToken(token);
} catch {
// 忽略解析失败的行(心跳包等)
}
}
}
}
// React 中使用:
// streamToUI(prompt, (token) => setContent(prev => prev + token))
这里有两个细节值得注意:
- buffer 机制:网络传输不保证一条 SSE 消息整块到达,
chunk可能在data:和 JSON 之间被截断。必须用 buffer 拼接后再按\n切割。 TextDecoder的stream: true:告诉解码器这不是最后一块,遇到多字节 UTF-8 字符(比如中文)在边界被截断时不会乱码。
常见问题排查
问题一:断流——生成到一半就停了
现象:模型开始输出了,但在某个位置突然没了,[DONE] 也没来。
原因:
- 代理/VPN 超时:很多代理默认 60 秒超时,AI 生成长文本可能超过这个时间
- Nginx 反代超时:默认
proxy_read_timeout 60s,流式请求必须调大 - 客户端代码没处理
done: true:reader.read()正常结束但你的循环没退出
Nginx 配置修复:
location /v1/ {
proxy_pass http://backend;
proxy_read_timeout 300s; # 调大到 5 分钟
proxy_buffering off; # 关闭缓冲,否则不是真流式
proxy_cache off;
# SSE 必需的响应头
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;
}
问题二:HTTP 200 开始,但中途收到错误
现象:流正常开始,但某个 chunk 里收到的不是 token 而是错误信息。
这是流式接口的一个特殊情况:SSE 连接建立成功(200),但在流传输过程中模型服务出错了,错误会被编码进流里发过来,而不是通过 HTTP 状态码。
典型格式:
data: {"error":{"message":"Rate limit exceeded","type":"rate_limit_error","code":429}}
data: [DONE]
处理方式:
for chunk in stream:
# 检查 chunk 里是否有错误字段
if hasattr(chunk, 'error') and chunk.error:
raise Exception(f"流中错误: {chunk.error}")
delta = chunk.choices[0].delta
# 正常处理...
前端版本:
const parsed = JSON.parse(data);
// 先检查是否是错误包
if (parsed.error) {
throw new Error(`流错误: ${parsed.error.message}`);
}
const token = parsed.choices?.[0]?.delta?.content;
问题三:Nginx / Cloudflare 缓冲,“一次性输出”
现象:后端明明在流式输出,前端却看到等了很久然后全部内容一次性出现——和非流式效果一样。
根因:中间层缓冲了响应,等缓冲区满了才一次性刷出。
Nginx 端:如上面配置,proxy_buffering off 是关键。
Cloudflare 端:在 Cloudflare 控制台把对应路由的 Response Buffering 关掉;或者在响应头加 X-Accel-Buffering: no(Nginx 识别这个头也会关闭缓冲)。
在代码里设置响应头:
# FastAPI/后端示例
from fastapi.responses import StreamingResponse
async def stream_endpoint():
return StreamingResponse(
generate_stream(),
media_type="text/event-stream",
headers={
"X-Accel-Buffering": "no", # 告诉 Nginx 不缓冲
"Cache-Control": "no-cache",
}
)
问题四:finish_reason: length,内容被截断
模型还没写完,但达到了 max_tokens 上限,流就结束了,最后一个 chunk 的 finish_reason 是 "length" 而不是 "stop"。
务必在代码里检测并提示用户,或者自动增加 max_tokens 重试。
重试策略
流式请求的重试比普通请求复杂,因为你不知道已经接收了多少内容:
简单场景:全量重试
适合生成内容较短(< 500 tokens)或不需要保留已生成内容时:
def retry_stream(messages, max_retries=3):
for i in range(max_retries):
try:
return do_stream(messages)
except (ConnectionError, TimeoutError) as e:
if i == max_retries - 1:
raise
time.sleep(2 ** i) # 指数退避:1s, 2s, 4s
复杂场景:断点续传思路
如果已经流出了一半内容,重试时把已收到的内容作为上下文传回去,让模型从断点继续:
def stream_with_resume(user_message, max_retries=3):
accumulated = ""
for attempt in range(max_retries):
try:
if accumulated:
# 把已有内容作为 assistant 消息,让模型续写
messages = [
{"role": "user", "content": user_message},
{"role": "assistant", "content": accumulated + "(以下继续)\n"},
{"role": "user", "content": "请继续"},
]
else:
messages = [{"role": "user", "content": user_message}]
stream = client.chat.completions.create(
model="anthropic/claude-sonnet-4",
messages=messages,
stream=True,
)
for chunk in stream:
content = chunk.choices[0].delta.content or ""
accumulated += content
yield content
return # 正常结束
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
else:
raise
注意:续写效果依赖模型理解上下文,不是所有场景都适用。对于需要完整性的内容(代码、JSON),续写后可能需要人工拼接验证。
TheRouter 的流式稳定性机制
直接打 Anthropic / OpenAI 官方 API,流式响应有几个不稳定点:网络抖动断流、上游过载导致流中断、长文本生成超时。
TheRouter 在网关层做了几件事来保证稳定性:
- 连接保活:网关和上游之间维持长连接,减少建立连接的开销
- 上游切换:当主用上游不稳定时,自动路由到备用通道,对客户端透明
- 超时宽松:流式请求的超时配置远高于普通请求,避免长文本生成被误杀
- 心跳注释行:对于生成速度慢的模型,网关每隔 15 秒发一个
: keepalive注释行,防止代理层因为无数据而关闭连接(注释行不会被 SSE 解析器当作数据处理)
调用方式和普通请求完全一样,加 stream=True 即可:
client = OpenAI(
base_url="https://api.therouter.ai/v1",
api_key="你的 TheRouter Key",
)
stream = client.chat.completions.create(
model="openai/gpt-4o", # 换模型也是一行的事
messages=[...],
stream=True,
)
小结
| 问题 | 原因 | 修复 |
|---|---|---|
| 断流 | 代理/Nginx 超时 | proxy_read_timeout 300s |
| 一次性输出 | 响应缓冲 | proxy_buffering off + X-Accel-Buffering: no |
| 流中错误 | 200 后出错 | 解析每个 chunk 检查 error 字段 |
| 内容截断 | max_tokens 不够 | 检查 finish_reason === "length" |
| chunk 解析乱码 | 多字节字符被截断 | TextDecoder 加 { stream: true } |
流式输出看起来只是加了一个参数,但落地到生产环境,中间件配置、错误处理、重试逻辑一个都不能少。希望这篇文章能帮你把这些坑一次性都填掉。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)