用户问了一个问题,AI思考了30秒,然后一次性吐出800字的回答。这30秒里,用户可能在怀疑:系统是不是卡了?网络是不是断了?我是不是白等了?

流式响应,就是解决这个问题的答案。本文将基于豆包大模型API,从零实现SSE流式输出,并深入探讨断点续传、性能监控、生产级错误处理等进阶话题。

一、为什么需要流式响应?

1.1 传统同步模式的三宗罪

问题 表现 用户感知
首字延迟高 等待全部内容生成完毕才返回 长时间白屏,以为系统卡死
内存占用大 长文本响应一次性加载到内存 服务器压力大,OOM风险
体验割裂 无法实现“打字机效果” 缺乏实时反馈,交互生硬

1.2 流式响应的核心指标对比

特性 同步响应 流式响应(SSE)
首字节时间(TTFB) 长(等待完整生成) 短(毫秒级返回首个token)
内存使用 高(一次性加载) 低(分批处理,逐块释放)
用户体验 差(长时间等待) 好(实时显示,类人对话)
错误处理 全部成功或全部失败 部分成功仍可返回
网络要求 高稳定性 容忍临时中断,支持断点续传

结论:流式响应不是锦上添花,而是AI交互场景的刚需。

二、技术原理:SSE与豆包API

2.1 为什么选择SSE而不是WebSocket?

协议 通信方向 协议复杂度 适用场景
SSE 单向(服务器→客户端) 低(基于HTTP) AI流式输出、实时通知
WebSocket 双向 高(需升级协议) 在线聊天、实时对战

AI模型API的典型交互模式是:客户端发送请求 → 服务器持续推送数据 → 结束。SSE天然适合这种一问多答的场景,且无需额外的协议升级,实现更简单。

2.2 豆包API流式响应格式

豆包API兼容OpenAI接口规范,通过stream: true参数启用流式输出。响应采用SSE标准格式:

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"Hello"}}]}

data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":" world"}}]}

data: [DONE]

格式解析

        每个数据块以data:开头

        两个数据块之间用空行分隔

   [DONE]标记流结束

   delta字段包含增量内容

2.3 流式响应vs普通响应的请求差异

// 普通请求(无stream字段或stream:false)
{
  "model": "doubao-pro-4k",
  "messages": [...],
  "max_tokens": 2000
}

// 流式请求
{
  "model": "doubao-pro-4k",
  "messages": [...],
  "max_tokens": 2000,
  "stream": true          // ← 关键参数
}

三、核心实现:从零构建流式客户端

3.1 整体架构设计

┌─────────────────────────────────────────────────────────────┐
│                      客户端(前端/App)                       │
│                   EventSource / fetch stream                 │
└─────────────────────────┬───────────────────────────────────┘
                          │ HTTP/SSE
┌─────────────────────────▼───────────────────────────────────┐
│                  SpringBoot 服务层                           │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐      │
│  │   Controller │→│  DoubaoAdapter│→│ FluxSink     │      │
│  │  (SSE输出)   │  │  (HTTP调用)   │  │  (响应式流)  │      │
│  └──────────────┘  └──────────────┘  └──────────────┘      │
└─────────────────────────┬───────────────────────────────────┘
                          │ HTTPS + SSE
┌─────────────────────────▼───────────────────────────────────┐
│                    豆包大模型API                             │
│              (stream=true, text/event-stream)                │
└─────────────────────────────────────────────────────────────┘

3.2 完整实现代码

@Service
@Slf4j
public class DoubaoStreamAdapter {
    
    @Autowired
    private ObjectMapper objectMapper;
    
    /**
     * 流式调用豆包API
     * @return Flux<String> 每个元素是一个内容块
     */
    public Flux<String> callApiStream(ModelConfig config,
                                      List<Message> messages,
                                      Integer maxTokens) {
        return Flux.create(sink -> {
            String requestId = generateRequestId();
            log.info("开始流式调用 [{}], 模型: {}", requestId, config.getName());
            
            HttpURLConnection connection = null;
            try {
                // 1. 构建流式请求体(stream=true)
                Map<String, Object> requestBody = buildStreamRequest(config, messages, maxTokens);
                
                // 2. 创建支持流式读取的HTTP连接
                connection = createStreamingConnection(config);
                
                // 3. 发送请求
                try (OutputStream os = connection.getOutputStream()) {
                    os.write(objectMapper.writeValueAsBytes(requestBody));
                    os.flush();
                }
                
                // 4. 流式读取响应(关键)
                readSSEResponse(connection, sink, requestId);
                
                sink.complete();
                log.info("流式调用完成 [{}]", requestId);
                
            } catch (Exception e) {
                log.error("流式调用异常 [{}]", requestId, e);
                sink.error(e);
            } finally {
                if (connection != null) connection.disconnect();
            }
        });
    }
    
    /**
     * 创建支持流式读取的HTTP连接
     */
    private HttpURLConnection createStreamingConnection(ModelConfig config) throws IOException {
        URL url = new URL(buildApiUrl(config.getApiUrl()));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        
        // 关键超时配置
        conn.setConnectTimeout(30000);   // 30秒连接超时
        conn.setReadTimeout(300000);     // 5分钟读取超时(长文本场景)
        
        // 关键请求头
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setRequestProperty("Authorization", "Bearer " + config.getApiKey());
        conn.setRequestProperty("Accept", "text/event-stream");  // 声明接受SSE
        conn.setRequestProperty("Cache-Control", "no-cache");
        
        return conn;
    }
    
    /**
     * 读取并解析SSE格式响应
     */
    private void readSSEResponse(HttpURLConnection conn,
                                 FluxSink<String> sink,
                                 String requestId) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
            
            String line;
            while ((line = reader.readLine()) != null && !sink.isCancelled()) {
                // SSE格式:data: {json}
                if (line.startsWith("data: ")) {
                    String data = line.substring(6);
                    
                    // 流结束标记
                    if ("[DONE]".equals(data.trim())) {
                        log.debug("收到结束标记 [{}]", requestId);
                        break;
                    }
                    
                    // 解析并提取内容块
                    String content = extractContentFromChunk(data);
                    if (content != null && !content.isEmpty()) {
                        sink.next(content);
                        log.trace("发送数据块 [{}]: {}", requestId, content);
                    }
                }
            }
        }
    }
    
    /**
     * 从SSE数据块中提取增量内容
     */
    private String extractContentFromChunk(String chunkData) {
        try {
            JsonNode node = objectMapper.readTree(chunkData);
            JsonNode delta = node.path("choices").get(0).path("delta");
            return delta.path("content").asText(null);
        } catch (Exception e) {
            log.warn("解析数据块失败: {}", chunkData, e);
            return null;
        }
    }
    
    /**
     * 构建流式请求体
     */
    private Map<String, Object> buildStreamRequest(ModelConfig config,
                                                   List<Message> messages,
                                                   Integer maxTokens) {
        Map<String, Object> body = new HashMap<>();
        body.put("model", config.getName());
        body.put("messages", messages);
        body.put("stream", true);           // 关键:开启流式
        body.put("max_tokens", maxTokens);
        body.put("temperature", 0.7);
        return body;
    }
    
    private String generateRequestId() {
        return UUID.randomUUID().toString().substring(0, 8);
    }
}

3.3 Controller层暴露SSE接口

@RestController
@RequestMapping("/api/doubao")
public class DoubaoController {
    
    @Autowired
    private DoubaoStreamAdapter doubaoAdapter;
    
    @PostMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<String>> streamChat(@RequestBody ChatRequest request) {
        return doubaoAdapter.callApiStream(
                request.getModelConfig(),
                request.getMessages(),
                request.getMaxTokens()
            )
            .map(chunk -> ServerSentEvent.<String>builder()
                .data(chunk)
                .event("message")
                .build())
            .doOnSubscribe(sub -> log.info("客户端已订阅"))
            .doOnError(err -> log.error("流异常", err))
            .onErrorResume(e -> Flux.just(
                ServerSentEvent.<String>builder()
                    .event("error")
                    .data("服务异常: " + e.getMessage())
                    .build()
            ));
    }
}

四、进阶特性

4.1 会话状态管理与断点续传

@Component
public class StreamingSessionManager {
    
    private final Map<String, StreamingSession> sessions = new ConcurrentHashMap<>();
    
    @Data
    public static class StreamingSession {
        private String sessionId;
        private List<String> receivedChunks = new ArrayList<>();
        private volatile boolean paused = false;
        private volatile boolean completed = false;
        private Instant lastActiveTime;
        
        public void addChunk(String chunk) {
            if (!paused) {
                receivedChunks.add(chunk);
                lastActiveTime = Instant.now();
            }
        }
        
        public String getFullContent() {
            return String.join("", receivedChunks);
        }
        
        public StreamingSession snapshot() {
            StreamingSession snapshot = new StreamingSession();
            snapshot.sessionId = this.sessionId;
            snapshot.receivedChunks = new ArrayList<>(this.receivedChunks);
            snapshot.completed = this.completed;
            return snapshot;
        }
        
        public void pause() { this.paused = true; }
        public void resume() { this.paused = false; }
    }
    
    public StreamingSession getOrCreate(String sessionId) {
        return sessions.computeIfAbsent(sessionId, id -> {
            StreamingSession session = new StreamingSession();
            session.setSessionId(id);
            return session;
        });
    }
}

4.2 智能重试与退避策略

public Flux<String> callWithRetry(ModelConfig config,
                                  List<Message> messages,
                                  int maxTokens) {
    return Flux.defer(() -> doubaoAdapter.callApiStream(config, messages, maxTokens))
        .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofSeconds(10))
            .jitter(0.5)
            .filter(this::isRetryableError)
            .doBeforeRetry(rs -> 
                log.warn("流式调用重试,第{}次", rs.totalRetries() + 1))
        );
}

private boolean isRetryableError(Throwable error) {
    return error instanceof SocketTimeoutException 
        || error instanceof ConnectException
        || (error instanceof HttpRetryException 
            && ((HttpRetryException) error).responseCode() >= 500);
}

4.3 实时监控指标

指标名 类型 含义 告警阈值
stream.request.count Counter 流式请求总数
stream.first_token.latency Timer 首token延迟 > 3秒
stream.token.latency Timer 单token平均延迟 > 200ms
stream.throughput Meter tokens/秒吞吐量 < 20
stream.error.rate Gauge 错误率 > 5%

实现示例:

@Component
public class StreamMetrics {
    private final Timer firstTokenTimer;
    private final Timer tokenTimer;
    private final Counter errorCounter;
    
    public StreamMetrics(MeterRegistry registry) {
        this.firstTokenTimer = Timer.builder("stream.first_token.latency")
            .description("首token延迟")
            .register(registry);
        this.tokenTimer = Timer.builder("stream.token.latency")
            .register(registry);
        this.errorCounter = Counter.builder("stream.error.count")
            .register(registry);
    }
    
    public void recordFirstToken(long latencyMs) {
        firstTokenTimer.record(latencyMs, TimeUnit.MILLISECONDS);
    }
}

五、最佳实践与避坑指南

5.1 超时配置建议

参数 推荐值 说明
connectTimeout 30秒 建立连接的超时时间
readTimeout 300秒 读取数据的超时时间(长文本场景需更大)
前端SSE超时 无限制 使用Heartbeat保持连接

5.2 常见问题排查

问题 可能原因 解决方案
首token延迟高 网络延迟 / 模型推理慢 检查网络链路,联系API提供商
流中断 读取超时 / 代理缓冲 调大readTimeout,禁用代理缓冲
数据解析失败 SSE格式异常 增加容错逻辑,跳过无效行
内存泄漏 Flux订阅未正确关闭 确保使用doFinally释放资源

5.3 安全注意事项

        API Key保护:仅在服务端存储和调用,严禁暴露给客户端

        输入过滤:对用户输入进行敏感词过滤

        输出审计:记录流式输出的内容,便于问题追溯

        速率限制:实现客户端级别的限流,防止滥用

六、总结

豆包大模型的流式响应改造,核心在于三点:

层次 关键动作
协议层 请求中设置stream: true,响应按SSE格式解析
传输层 使用HttpURLConnection + 长readTimeout,逐行读取
应用层 采用响应式编程(Flux/SSE),边读边推

流式响应不再是高级特性,而是AI应用的标配能力。掌握SSE + 响应式编程,是每一个后端开发者在AI时代的必修课。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐