📚 本文属于《AI开发实战》系列第6篇

  • ✅ 已完成:系列一第1-5篇
  • 🔄 进行中:系列一第6篇 ← 当前
  • 📋 待开始:系列一第7-8篇

📌 前置知识:建议先阅读 第1篇第2篇第3篇第4篇第5篇


一、前言:为什么流式响应很重要

你有没有这种感觉:

问AI一个问题,等了5秒钟,屏幕一片空白;然后突然"唰"一下,整段回复同时出现。

这5秒钟的等待,用户体验是崩溃式的。

大语言模型生成内容需要时间——一个500字的回答,生成可能需要3-5秒。

流式响应(Streaming) 解决的就是这个问题:不是等AI全部生成完再显示,而是一个字一个字/一个词一个词的实时输出,让用户"看到AI在思考"。

模式 用户感受
非流式 等5秒 → 突然看到完整回复
流式 等0.5秒 → 开始看到回复 → 实时看到内容生成

流式响应不只是"快",是改变了用户体验的感知方式


二、流式响应原理

2.1 传统HTTP vs 流式响应

传统HTTP请求

客户端发送请求 → 服务器处理完成 → 返回完整响应
(等所有数据准备好,才开始传输)

流式响应(SSE/Streaming)

客户端发送请求 → 服务器开始处理 → 一边处理一边返回数据
(Server-Sent Events / HTTP Chunked Transfer)

2.2 AI API的流式原理

OpenAI/Claude等API的流式输出,基于HTTP chunked transfer encoding

服务器收到AI生成的一个词 → 立即通过HTTP chunk发送
↓  ↓  ↓  ↓  ↓  ↓  ↓
客户端逐步接收 → 逐步显示到界面

当你在ChatGPT里看到文字"一个一个出现",就是这种机制在起作用。

2.3 OpenAI流式 vs 非流式对比

# 非流式:等全部生成完才返回
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "写一篇100字介绍AI的文章"}]
)
print(response.choices[0].message.content)
# 等3-5秒后,一次性输出全部内容
# 流式:逐步返回
stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "写一篇100字介绍AI的文章"}],
    stream=True  # 开启流式
)
for chunk in stream:
    print(chunk.choices[0].delta.content, end="", flush=True)
# 实时显示每个词/字

三、OpenAI流式响应实战

3.1 基础流式调用

import os
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def stream_chat(user_message: str):
    """流式对话"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是一个有帮助的AI助手。"},
            {"role": "user", "content": user_message}
        ],
        stream=True  # 开启流式
    )

    print("AI: ", end="", flush=True)
    full_response = ""

    for chunk in stream:
        if chunk.choices[0].delta.content:
            content = chunk.choices[0].delta.content
            print(content, end="", flush=True)
            full_response += content

    print()  # 换行
    return full_response

# 测试
stream_chat("用一句话解释什么是人工智能")

输出效果

AI: 人工智能(AI)是指...
     (文字逐步出现,不是等完整后才显示)

3.2 带打字机效果的流式封装

import time
import os
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

class StreamingChat:
    def __init__(self):
        self.full_response = ""

    def stream_print(self, text: str, delay: float = 0.02):
        """模拟打字机效果(可选)"""
        for char in text:
            print(char, end="", flush=True)
            time.sleep(delay)
        print()

    def chat(self, user_message: str, use_stream: bool = True):
        """支持流式/非流式切换"""
        messages = [
            {"role": "system", "content": "你是一个简洁的AI助手。"},
            {"role": "user", "content": user_message}
        ]

        if use_stream:
            return self._stream_chat(messages)
        else:
            return self._normal_chat(messages)

    def _stream_chat(self, messages: list):
        """流式对话"""
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            stream=True
        )

        self.full_response = ""
        print("AI: ", end="", flush=True)

        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                print(content, end="", flush=True)
                self.full_response += content

        print()
        return self.full_response

    def _normal_chat(self, messages: list):
        """非流式对话"""
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages
        )
        result = response.choices[0].message.content
        print(f"AI: {result}")
        return result

# 使用
chat = StreamingChat()
chat.chat("为什么天空是蓝色的?")

3.3 保存流式响应到变量

有时候流式显示给用户看,但程序本身也需要拿到完整回复:

def stream_and_save(user_message: str) -> str:
    """流式输出,同时保存完整响应"""
    stream = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": user_message}],
        stream=True
    )

    full_response = ""
    print("AI: ", end="", flush=True)

    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)
            full_response += content

    print()
    return full_response  # 程序可以继续使用这个变量

# 保存后可用于后续处理
response_text = stream_and_save("解释一下什么是机器学习")
print(f"\n[已保存] 响应长度: {len(response_text)} 字")
# 后续:可以保存到数据库、传给下一个函数等

四、Claude流式响应实战

4.1 Claude流式调用

from anthropic import Anthropic

client = Anthropic()

def stream_claude(user_message: str):
    """Claude流式对话"""
    with client.messages.stream(
        model="claude-sonnet-4-20250514",
        max_tokens=1024,
        messages=[{"role": "user", "content": user_message}]
    ) as stream:
        for text in stream.text_stream:
            print(text, end="", flush=True)

# 测试
stream_claude("用三句话解释什么是深度学习")

4.2 Claude vs OpenAI流式对比

# OpenAI
stream = client.chat.completions.create(model="gpt-4o", messages=messages, stream=True)
for chunk in stream:
    content = chunk.choices[0].delta.content

# Claude(语法略有不同)
with client.messages.stream(model="claude-sonnet-4-20250514", messages=messages) as stream:
    for text in stream.text_stream:
        # text 是已解析的文本片段
        print(text, end="", flush=True)

五、前端实现:给用户看到"打字"效果

后端流式返回,前端也需要正确处理才能展示给用户。

5.1 fetch + ReadableStream(原生JS)

async function streamChat(message) {
    const response = await fetch('/api/chat', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({message: message})
    });

    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    const output = document.getElementById('output');

    while (true) {
        const {done, value} = await reader.read();
        if (done) break;

        const chunk = decoder.decode(value);
        output.textContent += chunk;  // 逐步显示
    }
}

5.2 Python后端 + Flask流式API

from flask import Flask, Response, request, jsonify
from openai import OpenAI
import os

app = Flask(__name__)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

@app.route('/api/chat', methods=['POST'])
def chat():
    data = request.json
    user_message = data.get('message', '')

    def generate():
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": user_message}],
            stream=True
        )

        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield f"data: {content}\n\n"  # SSE格式

    return Response(
        generate(),
        mimetype='text/event-stream',  # SSE MIME类型
        headers={
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
            'X-Accel-Buffering': 'no'  # Nginx关闭缓冲
        }
    )

5.3 前端Vue3示例

// ChatComponent.vue
<template>
  <div class="chat">
    <div ref="outputRef" class="output">{{ displayedText }}</div>
  </div>
</template>

<script setup>
import { ref, watch } from 'vue'

const outputRef = ref(null)
const displayedText = ref('')

async function sendMessage(message) {
  displayedText.value = 'AI: '
  const response = await fetch('/api/chat', {
    method: 'POST',
    headers: {'Content-Type': 'application/json'},
    body: JSON.stringify({message})
  })

  const reader = response.body.getReader()
  const decoder = new TextDecoder()

  while (true) {
    const {done, value} = await reader.read()
    if (done) break
    const chunk = decoder.decode(value)
    displayedText.value += chunk
  }
}
</script>

<style scoped>
.output {
  font-family: monospace;
  white-space: pre-wrap;
  padding: 1rem;
  background: #f5f5f5;
  border-radius: 8px;
  min-height: 100px;
}
</style>

六、常见问题与处理

6.1 网络中断:只收到部分内容

def safe_stream_chat(user_message: str) -> tuple[str, bool]:
    """
    流式对话,处理网络中断
    返回:(完整文本, 是否完整)
    """
    full_response = ""

    try:
        stream = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": user_message}],
            stream=True
        )

        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                print(content, end="", flush=True)
                full_response += content

        return full_response, True  # 完整接收

    except Exception as e:
        print(f"\n[警告] 流式中断: {e}")
        print(f"[已接收] {len(full_response)} 字")
        return full_response, False  # 不完整

6.2 流式响应被缓存

后端配置需要防止Nginx/代理层缓存流式响应:

# Flask端
return Response(
    generate(),
    mimetype='text/event-stream',
    headers={
        'Cache-Control': 'no-cache, no-store, must-revalidate',
        'Pragma': 'no-cache',
        'X-Accel-Buffering': 'no',  # Nginx关闭缓冲
    }
)

6.3 控制台输出乱序

使用 end=""flush=True 确保逐字输出:

# ❌ 错误:所有内容会等循环结束后才一起输出
for chunk in stream:
    print(chunk)  # print默认换行,且可能被缓冲

# ✅ 正确:逐字实时显示
for chunk in stream:
    print(chunk, end="", flush=True)  # 不换行,立即输出

6.4 判断流式是否结束

full_response = ""
chunk_count = 0

for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:
        full_response += content
        chunk_count += 1

print(f"\n[完成] 共 {chunk_count} 个片段,{len(full_response)} 字")

七、性能对比:流式 vs 非流式

指标 非流式 流式
用户感知等待时间 3-5秒(白屏) <1秒(开始显示)
完整内容展示时间 相同 相同
用户体验评分 ⭐⭐ ⭐⭐⭐⭐⭐
代码复杂度 ⭐⭐⭐
API调用成本 相同 相同

结论:流式响应不节省时间,不节省成本,但用户体验提升显著。


八、总结

技能 关键代码
OpenAI流式 stream=True + for chunk in stream
Claude流式 with client.messages.stream() + for text in stream.text_stream
打字机效果 print(char, end="", flush=True)
保存完整响应 拼接 delta.content 到变量
Flask流式API Response(generate(), mimetype='text/event-stream')

更多内容

如果你对AI开发、Agent实战感兴趣,欢迎关注我的公众号【码头码农】:

  • 每日AI热点解读
  • 实战项目复盘
  • 技术成长心得

前一篇《大模型成本优化:从0.1元到0.01元的优化之路》


本文为《AI开发实战》系列课程 · 系列一:大模型应用开发入门 · 第6篇

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐