你的 AI 应用生成速度很快,但用户盯着空白屏幕等了 8 秒才看到第一个字——这不是模型慢,是你没开流式输出。这篇文章把流式接入的坑全踩一遍,让你少走弯路。

为什么要用流式输出?

对话模型生成文本是逐 token 输出的,默认的非流式接口会等模型把整段回复都生成完,才一次性返回给你。生成 500 个 token 的回复,你就要等 5-10 秒的白屏。

流式输出(Streaming)的改变是:模型生成一个 token,立刻发给你一个,前端看到文字一个字一个字地蹦出来,这就是 ChatGPT 那种打字机效果。

核心收益:

  • 首 token 延迟从 5-10 秒降到 0.5-1 秒:用户几乎立刻看到响应,心理等待感大幅降低
  • 长文本体验完全不同:2000 字的文章,流式是"看着它写出来",非流式是"等 20 秒然后突然全出来"
  • 提前中止:用户可以看到方向不对就立刻停止,不用等模型把无用内容全部生成完

SSE 协议基础

流式输出底层用的是 SSE(Server-Sent Events),这是一个基于 HTTP 的单向推送协议,比 WebSocket 轻得多,用普通 GET/POST 请求就能建立持久连接。

响应格式长这样:

HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你"},"index":0}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"好"},"index":0}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"},"index":0}]}

data: [DONE]

几个关键点:

  • data: 前缀:每一行数据都以 data: 开头(注意有个空格)
  • 空行分隔:每条消息后面跟一个空行(\n\n
  • [DONE] 结束标记:流结束时服务端发一条 data: [DONE],客户端收到就知道该收工了
  • Content-Type: text/event-stream:响应头必须是这个,否则浏览器不会识别为 SSE 流

Python 流式调用

openai 库,只需加一个 stream=True

from openai import OpenAI

client = OpenAI(
    base_url="https://api.therouter.ai/v1",
    api_key="你的 API Key",
)

stream = client.chat.completions.create(
    model="anthropic/claude-sonnet-4",
    messages=[
        {"role": "user", "content": "写一篇 500 字的短文,介绍量子计算的基本原理"}
    ],
    max_tokens=1024,
    stream=True,  # 就这一行
)

for chunk in stream:
    delta = chunk.choices[0].delta
    if delta.content:
        print(delta.content, end="", flush=True)

print()  # 最后换行

flush=True 很重要——Python 的 stdout 有缓冲,不加这个,你会看到文字攒够一批才一起输出,流式效果打折。

带异常处理的完整版

from openai import OpenAI, APIStatusError, APIConnectionError
import time

client = OpenAI(
    base_url="https://api.therouter.ai/v1",
    api_key="你的 API Key",
)

def stream_with_retry(messages, model="anthropic/claude-sonnet-4", max_retries=3):
    for attempt in range(max_retries):
        try:
            stream = client.chat.completions.create(
                model=model,
                messages=messages,
                max_tokens=2048,
                stream=True,
            )
            full_content = ""
            for chunk in stream:
                delta = chunk.choices[0].delta
                if delta.content:
                    print(delta.content, end="", flush=True)
                    full_content += delta.content
            print()
            return full_content

        except APIConnectionError as e:
            # 网络断了,等一下重试
            wait = 2 ** attempt
            print(f"\n[连接中断,{wait}s 后重试 ({attempt+1}/{max_retries})]")
            time.sleep(wait)

        except APIStatusError as e:
            if e.status_code in (429, 500, 502, 503):
                # 限流或服务端临时错误,退避重试
                wait = 2 ** attempt
                print(f"\n[{e.status_code} 错误,{wait}s 后重试]")
                time.sleep(wait)
            else:
                # 4xx 客户端错误,不重试
                raise

    raise RuntimeError(f"超过最大重试次数 {max_retries}")

stream_with_retry([{"role": "user", "content": "介绍一下 Rust 的所有权模型"}])

Node.js 流式调用

import OpenAI from "openai";

const client = new OpenAI({
  baseURL: "https://api.therouter.ai/v1",
  apiKey: "你的 API Key",
});

async function streamChat(prompt: string) {
  const stream = await client.chat.completions.create({
    model: "anthropic/claude-sonnet-4",
    messages: [{ role: "user", content: prompt }],
    max_tokens: 1024,
    stream: true,
  });

  let fullContent = "";

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content ?? "";
    if (content) {
      process.stdout.write(content);
      fullContent += content;
    }

    // 检查结束原因
    const finishReason = chunk.choices[0]?.finish_reason;
    if (finishReason === "length") {
      console.warn("\n[警告] 输出因 max_tokens 限制被截断");
    }
  }

  console.log();
  return fullContent;
}

streamChat("用 Node.js 实现一个简单的事件总线");

前端接收 SSE:fetch + ReadableStream

浏览器端不要用 EventSource——它只支持 GET 请求,而 AI API 用 POST。用 fetch + ReadableStream 来手动解析:

async function streamToUI(prompt: string, onToken: (token: string) => void) {
  const response = await fetch("https://api.therouter.ai/v1/chat/completions", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${API_KEY}`,
    },
    body: JSON.stringify({
      model: "anthropic/claude-sonnet-4",
      messages: [{ role: "user", content: prompt }],
      stream: true,
    }),
  });

  if (!response.ok) {
    const err = await response.json();
    throw new Error(`API 错误 ${response.status}: ${err.error?.message}`);
  }

  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    // 将新数据追加到缓冲区(chunk 可能在消息边界被截断)
    buffer += decoder.decode(value, { stream: true });

    // 按行解析
    const lines = buffer.split("\n");
    // 最后一行可能不完整,留到下次处理
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed || !trimmed.startsWith("data:")) continue;

      const data = trimmed.slice(5).trim();
      if (data === "[DONE]") return;

      try {
        const parsed = JSON.parse(data);
        const token = parsed.choices?.[0]?.delta?.content;
        if (token) onToken(token);
      } catch {
        // 忽略解析失败的行(心跳包等)
      }
    }
  }
}

// React 中使用:
// streamToUI(prompt, (token) => setContent(prev => prev + token))

这里有两个细节值得注意:

  1. buffer 机制:网络传输不保证一条 SSE 消息整块到达,chunk 可能在 data: 和 JSON 之间被截断。必须用 buffer 拼接后再按 \n 切割。
  2. TextDecoderstream: true:告诉解码器这不是最后一块,遇到多字节 UTF-8 字符(比如中文)在边界被截断时不会乱码。

常见问题排查

问题一:断流——生成到一半就停了

现象:模型开始输出了,但在某个位置突然没了,[DONE] 也没来。

原因

  • 代理/VPN 超时:很多代理默认 60 秒超时,AI 生成长文本可能超过这个时间
  • Nginx 反代超时:默认 proxy_read_timeout 60s,流式请求必须调大
  • 客户端代码没处理 done: truereader.read() 正常结束但你的循环没退出

Nginx 配置修复

location /v1/ {
    proxy_pass http://backend;
    proxy_read_timeout 300s;      # 调大到 5 分钟
    proxy_buffering off;          # 关闭缓冲,否则不是真流式
    proxy_cache off;
    # SSE 必需的响应头
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
}

问题二:HTTP 200 开始,但中途收到错误

现象:流正常开始,但某个 chunk 里收到的不是 token 而是错误信息。

这是流式接口的一个特殊情况:SSE 连接建立成功(200),但在流传输过程中模型服务出错了,错误会被编码进流里发过来,而不是通过 HTTP 状态码。

典型格式:

data: {"error":{"message":"Rate limit exceeded","type":"rate_limit_error","code":429}}

data: [DONE]

处理方式:

for chunk in stream:
    # 检查 chunk 里是否有错误字段
    if hasattr(chunk, 'error') and chunk.error:
        raise Exception(f"流中错误: {chunk.error}")

    delta = chunk.choices[0].delta
    # 正常处理...

前端版本:

const parsed = JSON.parse(data);

// 先检查是否是错误包
if (parsed.error) {
  throw new Error(`流错误: ${parsed.error.message}`);
}

const token = parsed.choices?.[0]?.delta?.content;

问题三:Nginx / Cloudflare 缓冲,“一次性输出”

现象:后端明明在流式输出,前端却看到等了很久然后全部内容一次性出现——和非流式效果一样。

根因:中间层缓冲了响应,等缓冲区满了才一次性刷出。

Nginx 端:如上面配置,proxy_buffering off 是关键。

Cloudflare 端:在 Cloudflare 控制台把对应路由的 Response Buffering 关掉;或者在响应头加 X-Accel-Buffering: no(Nginx 识别这个头也会关闭缓冲)。

在代码里设置响应头:

# FastAPI/后端示例
from fastapi.responses import StreamingResponse

async def stream_endpoint():
    return StreamingResponse(
        generate_stream(),
        media_type="text/event-stream",
        headers={
            "X-Accel-Buffering": "no",   # 告诉 Nginx 不缓冲
            "Cache-Control": "no-cache",
        }
    )

问题四:finish_reason: length,内容被截断

模型还没写完,但达到了 max_tokens 上限,流就结束了,最后一个 chunk 的 finish_reason"length" 而不是 "stop"

务必在代码里检测并提示用户,或者自动增加 max_tokens 重试。


重试策略

流式请求的重试比普通请求复杂,因为你不知道已经接收了多少内容:

简单场景:全量重试

适合生成内容较短(< 500 tokens)或不需要保留已生成内容时:

def retry_stream(messages, max_retries=3):
    for i in range(max_retries):
        try:
            return do_stream(messages)
        except (ConnectionError, TimeoutError) as e:
            if i == max_retries - 1:
                raise
            time.sleep(2 ** i)  # 指数退避:1s, 2s, 4s

复杂场景:断点续传思路

如果已经流出了一半内容,重试时把已收到的内容作为上下文传回去,让模型从断点继续:

def stream_with_resume(user_message, max_retries=3):
    accumulated = ""

    for attempt in range(max_retries):
        try:
            if accumulated:
                # 把已有内容作为 assistant 消息,让模型续写
                messages = [
                    {"role": "user", "content": user_message},
                    {"role": "assistant", "content": accumulated + "(以下继续)\n"},
                    {"role": "user", "content": "请继续"},
                ]
            else:
                messages = [{"role": "user", "content": user_message}]

            stream = client.chat.completions.create(
                model="anthropic/claude-sonnet-4",
                messages=messages,
                stream=True,
            )

            for chunk in stream:
                content = chunk.choices[0].delta.content or ""
                accumulated += content
                yield content

            return  # 正常结束

        except Exception as e:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise

注意:续写效果依赖模型理解上下文,不是所有场景都适用。对于需要完整性的内容(代码、JSON),续写后可能需要人工拼接验证。


TheRouter 的流式稳定性机制

直接打 Anthropic / OpenAI 官方 API,流式响应有几个不稳定点:网络抖动断流、上游过载导致流中断、长文本生成超时。

TheRouter 在网关层做了几件事来保证稳定性:

  • 连接保活:网关和上游之间维持长连接,减少建立连接的开销
  • 上游切换:当主用上游不稳定时,自动路由到备用通道,对客户端透明
  • 超时宽松:流式请求的超时配置远高于普通请求,避免长文本生成被误杀
  • 心跳注释行:对于生成速度慢的模型,网关每隔 15 秒发一个 : keepalive 注释行,防止代理层因为无数据而关闭连接(注释行不会被 SSE 解析器当作数据处理)

调用方式和普通请求完全一样,加 stream=True 即可:

client = OpenAI(
    base_url="https://api.therouter.ai/v1",
    api_key="你的 TheRouter Key",
)

stream = client.chat.completions.create(
    model="openai/gpt-4o",   # 换模型也是一行的事
    messages=[...],
    stream=True,
)

小结

问题 原因 修复
断流 代理/Nginx 超时 proxy_read_timeout 300s
一次性输出 响应缓冲 proxy_buffering off + X-Accel-Buffering: no
流中错误 200 后出错 解析每个 chunk 检查 error 字段
内容截断 max_tokens 不够 检查 finish_reason === "length"
chunk 解析乱码 多字节字符被截断 TextDecoder{ stream: true }

流式输出看起来只是加了一个参数,但落地到生产环境,中间件配置、错误处理、重试逻辑一个都不能少。希望这篇文章能帮你把这些坑一次性都填掉。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐