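Introduction

Users of AI applications expect ever-faster responses. With traditional non-streaming output, the server waits until the large language model (LLM) has generated the complete answer before returning anything. This often causes a noticeable delay and hurts the user experience. Streaming output changes this: users watch the answer appear while the model is "thinking" and "typing", which feels much closer to a natural conversation.

This article explains the core principles of LLM streaming output and compares the mainstream implementation options: SSE, WebSocket, and HTTP streaming. It then provides complete Spring Boot code to help Java developers master the technique quickly.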

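1. Core Principles of Streaming Output

1.1 What Is Streaming Output

Streaming output means the model returns tokens to the client one by one as it generates them, before the full answer is complete. Its core advantage is easiest to see side by side: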

```text
Non-streaming:
User question → [wait 30 s] → complete answer returned at once

Streaming:
User question → [wait 0.5 s] → "今" → "今天" → "今天天气" → ... → complete answer
```

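1.2 Tokens and Streaming Output

LLMs inherently generate output one token at a time. Taking GPT-4 as an example:

  • Text is split into a sequence of tokens. For example, "今天天气很好" ("the weather is nice today") might be split into the tokens "今", "天", "天", "气", "很", "好". The exact split depends on the tokenizer.
  • Each decoding step of the model produces one token
  • Each generated token is sent to the client over the stream immediately
  • The client renders each token as it arrives, without waiting for the complete answer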

```text
// Token stream example
Token 1: "今"
Token 2: "天"
Token 3: "天"
Token 4: "气"
Token 5: "很"
Token 6: "好"
Token 7: "。"  // done=true marks the end of generation
```
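Later sections use a callback style in which a `Consumer<String>` is invoked once per token. That contract can be sketched in plain Java. `FakeTokenSource` is a hypothetical stand-in for an LLM that emits pre-split tokens. The point is that a streaming caller handles each token as it arrives, while a non-streaming caller sees nothing until the whole answer has been assembled.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an LLM that produces one token per decoding step. */
final class FakeTokenSource {
    private final List<String> tokens;

    FakeTokenSource(List<String> tokens) {
        this.tokens = tokens;
    }

    /** Streaming style: hand each token to the callback as soon as it is "generated". */
    void stream(Consumer<String> onToken) {
        for (String token : tokens) {
            onToken.accept(token);
        }
    }

    /** Non-streaming style: the caller gets nothing until the full answer is assembled. */
    String complete() {
        StringBuilder sb = new StringBuilder();
        stream(sb::append);
        return sb.toString();
    }
}
```

For example, `new FakeTokenSource(List.of("今", "天", "天", "气", "很", "好", "。")).stream(System.out::print)` prints the tokens one at a time, in order.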

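1.3 The Core Value of Streaming

| Dimension | Non-streaming | Streaming |
|---|---|---|
| **First-token latency** | Full generation time (can exceed 30 s for long answers) | Often under 500 ms |
| **User experience** | Anxious waiting | Immediate feedback |
| **Perceived interaction** | Mechanical | Smooth and natural |
| **Technical complexity** | Simple | Moderate |
| **Transfer pattern** | One complete response | Incremental delivery |
| **Typical use** | Simple API calls | Real-time interaction |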
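Back-of-the-envelope arithmetic shows why first-token latency and total time diverge so sharply. The inputs below are illustrative assumptions, not measurements: a prefill time of 0.4 s and a decode speed of 20 tokens/s. A streaming client sees output after roughly prefill + 1/rate. A non-streaming client waits prefill + N/rate for an N-token answer.

```java
/** Back-of-the-envelope latency model; all inputs are illustrative assumptions. */
final class LatencyModel {

    /** Seconds until the first token can be shown (streaming). */
    static double timeToFirstToken(double prefillSeconds, double tokensPerSecond) {
        return prefillSeconds + 1.0 / tokensPerSecond;
    }

    /** Seconds until the full answer exists (what a non-streaming client waits for). */
    static double timeToFullAnswer(double prefillSeconds, double tokensPerSecond, int outputTokens) {
        return prefillSeconds + outputTokens / tokensPerSecond;
    }
}
```

With these assumptions and a 600-token answer, the streaming user sees text after about 0.45 s. The non-streaming user waits about 30.4 s, which matches the orders of magnitude in the table.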

2. Comparison of Streaming Techniques

2.1 Server-Sent Events (SSE)

SSE is an HTTP-based server push technology: the client opens a request, and the server keeps the response open and pushes data to the client over it.

![SSE workflow](./sse_workflow.png)

How SSE works

```text
┌─────────┐                    ┌─────────┐                    ┌─────────┐
│  Client │                    │ Gateway │                    │ LLM Svc │
└────┬────┘                    └────┬────┘                    └────┬────┘
     │                              │                              │
     │──── GET /stream ────────────>│                              │
     │                              │──── HTTP Stream ────────────>│
     │                              │                              │
     │                              │<─── Token 1 ────────────────│
     │<─ data:{"t":"今"} ──────────│                              │
     │                              │<─── Token 2 ────────────────│
     │<─ data:{"t":"天"} ──────────│                              │
     │                              │                              │
     │                              │<─── Token N ────────────────│
     │<─ data:{"t":"好"} ──────────│                              │
     │<─ event:done ────────────────│                              │
```

SSE format in detail

```text
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"token": "今", "index": 0}

data: {"token": "天", "index": 1}

data: {"token": "气", "index": 2}

data: {"token": "好", "done": true}
```
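The sketch below shows how a client turns this wire format back into events. It follows the event-stream parsing rules of the WHATWG HTML Standard:

  • Lines starting with `:` are comments, which servers often use as heartbeats.
  • In a `field:value` line, at most one leading space is stripped from the value. Spring's `SseEmitter`, for instance, writes `data:` with no space.
  • Several `data` lines are joined with `\n`.
  • A blank line dispatches the event.

`SseEvent` and `SseParser` are hypothetical names, and `id`/`retry` handling is omitted.

```java
import java.util.ArrayList;
import java.util.List;

/** One dispatched server-sent event (id/retry fields omitted in this sketch). */
record SseEvent(String event, String data) {}

final class SseParser {

    /** Parses a complete text/event-stream body into events. */
    static List<SseEvent> parse(String body) {
        List<SseEvent> events = new ArrayList<>();
        String eventType = "";
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        // Normalize CRLF and CR line endings to LF, keeping trailing empty lines
        for (String line : body.replace("\r\n", "\n").replace('\r', '\n').split("\n", -1)) {
            if (line.isEmpty()) {
                // Blank line: dispatch if any data was collected, then reset the buffers
                if (hasData) {
                    events.add(new SseEvent(eventType.isEmpty() ? "message" : eventType, data.toString()));
                }
                eventType = "";
                data.setLength(0);
                hasData = false;
                continue;
            }
            if (line.startsWith(":")) {
                continue; // comment line, e.g. a heartbeat
            }
            int colon = line.indexOf(':');
            String field = colon < 0 ? line : line.substring(0, colon);
            String value = colon < 0 ? "" : line.substring(colon + 1);
            if (value.startsWith(" ")) {
                value = value.substring(1); // strip at most one leading space
            }
            if (field.equals("event")) {
                eventType = value;
            } else if (field.equals("data")) {
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with LF
                }
                data.append(value);
                hasData = true;
            }
        }
        return events;
    }
}
```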

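SSE code example

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/api/llm")
public class SSEController {

    private final StreamingChatService chatService; // see section 3.4

    public SSEController(StreamingChatService chatService) {
        this.chatService = chatService;
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChat(@RequestParam String message) {
        // -1L disables the async timeout (servlet containers treat a value <= 0 as "no timeout")
        SseEmitter emitter = new SseEmitter(-1L);

        // Register callbacks before generation starts
        emitter.onTimeout(() -> System.out.println("SSE connection timed out"));
        emitter.onCompletion(() -> System.out.println("SSE transfer completed"));

        // Generate on another thread so the request thread is released immediately.
        // Without an explicit executor, runAsync uses the common ForkJoinPool; a dedicated
        // executor is preferable for blocking work like this.
        CompletableFuture.runAsync(() -> {
            try {
                chatService.streamChat(message, token -> {
                    try {
                        // Let Jackson serialize the payload so quotes/newlines in tokens are escaped
                        emitter.send(SseEmitter.event()
                            .name("message")
                            .data(Map.of("token", token), MediaType.APPLICATION_JSON));
                    } catch (IOException e) {
                        // Client went away: abort generation instead of sending into a dead connection
                        throw new UncheckedIOException(e);
                    }
                });
                // Completion signal as a named "done" event
                emitter.send(SseEmitter.event()
                    .name("done")
                    .data(Map.of("done", true), MediaType.APPLICATION_JSON));
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });

        return emitter;
    }
}
```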

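2.2 WebSocket Full-Duplex Communication

WebSocket provides true full-duplex communication and suits scenarios with frequent two-way interaction.

WebSocket advantages

```text
┌─────────────────────────────────────────────────────────────────┐
│                            WebSocket                            │
├─────────────────────────────────────────────────────────────────┤
│  ✓ Full duplex: client and server can send at the same time     │
│  ✓ Low latency: after the one-time upgrade handshake, messages  │
│    flow without per-request HTTP overhead                       │
│  ✓ Binary frames: images, audio and other binary data           │
│  ✓ Suited to: real-time chat, collaboration, games              │
│  ✗ Higher implementation complexity                             │
│  ✗ Higher server resource usage                                 │
└─────────────────────────────────────────────────────────────────┘
```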

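WebSocket code example

```java
// ---- WebSocketConfig.java ----
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.*;
import org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final ChatWebSocketHandler chatWebSocketHandler;

    public WebSocketConfig(ChatWebSocketHandler chatWebSocketHandler) {
        this.chatWebSocketHandler = chatWebSocketHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatWebSocketHandler, "/ws/chat")
            .setAllowedOrigins("*") // demo only: restrict to your own origins in production
            .addInterceptors(new HttpSessionHandshakeInterceptor());
    }
}

// ---- ChatWebSocketHandler.java ----
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;
import org.springframework.web.socket.handler.TextWebSocketHandler;

@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {

    private final StreamingChatService chatService;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    public ChatWebSocketHandler(StreamingChatService chatService) {
        this.chatService = chatService;
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.put(session.getId(), session);
        System.out.println("WebSocket connected: " + session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        try {
            // Parse the client message, e.g. {"message":"..."}
            ChatRequest request = objectMapper.readValue(message.getPayload(), ChatRequest.class);

            // Blocks this handler thread until generation ends; each token is sent as it arrives
            chatService.streamChat(request.getMessage(), token -> {
                try {
                    session.sendMessage(new TextMessage(
                        objectMapper.writeValueAsString(Map.of("token", token))));
                } catch (IOException e) {
                    throw new UncheckedIOException(e); // socket gone: stop generating
                }
            });

            // Completion signal
            session.sendMessage(new TextMessage("{\"done\":true}"));

        } catch (Exception e) {
            try {
                session.sendMessage(new TextMessage(objectMapper.writeValueAsString(
                    Map.of("error", String.valueOf(e.getMessage())))));
            } catch (IOException ex) {
                session.close(CloseStatus.SERVER_ERROR);
            }
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessions.remove(session.getId());
        System.out.println("WebSocket closed: " + session.getId());
    }
}
```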
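Tokens frequently contain quotes, backslashes, or newlines, especially in code answers. JSON built by plain string concatenation, such as `"{\"token\":\"" + token + "\"}"`, is therefore easily invalid. In Spring applications Jackson's `ObjectMapper` handles this. The dependency-free sketch below only makes explicit which characters RFC 8259 requires to be escaped in a JSON string. `JsonText` is a hypothetical helper name and not a replacement for a real JSON library.

```java
/** Minimal JSON string-value escaper following RFC 8259 (sketch, not a full JSON library). */
final class JsonText {

    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                case '\b' -> sb.append("\\b");
                case '\f' -> sb.append("\\f");
                default -> {
                    if (c < 0x20) {
                        // Remaining control characters use the six-character hex escape form
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
                }
            }
        }
        return sb.toString();
    }

    /** Builds {"token":"..."} for one token with the value safely escaped. */
    static String tokenMessage(String token) {
        return "{\"token\":\"" + escape(token) + "\"}";
    }
}
```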

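2.3 HTTP Chunked Transfer

HTTP/1.1 chunked transfer encoding is another way to stream output. The response carries no Content-Length, and the body is sent as a series of chunks, each prefixed with its size in bytes (hexadecimal).

How chunked transfer works

```http
HTTP/1.1 200 OK
Content-Type: application/octet-stream
Transfer-Encoding: chunked

1A\r\n
{"token": "今", "idx": 0}\r\n
1A\r\n
{"token": "天", "idx": 1}\r\n
1D\r\n
{"token": "天气", "idx": 2}\r\n
0\r\n
\r\n
```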
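The chunk-size line is the length of the chunk data in bytes, written in hexadecimal, as defined in RFC 9112. Multibyte UTF-8 characters such as `今` therefore count as three bytes, which is why the first chunk above is `1A` (26 bytes) rather than its 24-character length. The hypothetical helper below only illustrates the wire format. In a servlet application, the container applies chunked encoding itself when no Content-Length is set, so application code should not write these size lines by hand.

```java
import java.nio.charset.StandardCharsets;

/** Illustrates HTTP/1.1 chunk framing; servlet containers normally do this for you. */
final class ChunkFraming {

    /** Frames one chunk: hex byte length, CRLF, data, CRLF. */
    static String frame(String data) {
        int byteLength = data.getBytes(StandardCharsets.UTF_8).length; // bytes, not chars
        return Integer.toHexString(byteLength) + "\r\n" + data + "\r\n";
    }

    /** The terminating zero-length chunk (no trailers). */
    static String lastChunk() {
        return "0\r\n\r\n";
    }
}
```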

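Java implementation

```java
// Inside a @RestController with fields chatService (StreamingChatService) and objectMapper (ObjectMapper)
@GetMapping("/http-stream")
public ResponseEntity<StreamingResponseBody> httpStream(@RequestParam String message) {

    StreamingResponseBody responseBody = outputStream -> {
        Writer writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);

        chatService.streamChat(message, token -> {
            try {
                // One JSON object per line (NDJSON)
                writer.write(objectMapper.writeValueAsString(Map.of("token", token)) + "\n");
                writer.flush(); // push the token out now instead of waiting for a full buffer
            } catch (IOException e) {
                throw new UncheckedIOException(e); // client disconnected: stop generating
            }
        });

        // Final line marks the end of the stream
        writer.write("{\"done\":true}\n");
        writer.flush();
    };

    // No Content-Length is set, so the servlet container switches to Transfer-Encoding: chunked
    // and writes the chunk-size lines itself; writing them by hand would corrupt the body.
    return ResponseEntity.ok()
        .contentType(MediaType.APPLICATION_NDJSON)
        .body(responseBody);
}
```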

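2.4 Choosing an Approach

![Comparison of streaming implementation options](./stream_comparison.png)

| Feature | SSE | WebSocket | HTTP Chunked |
|---|---|---|---|
| **Browser support** | Native | Native | Native |
| **Implementation complexity** | Low | High | not rated |
| **Bidirectional** | No (one-way) | Yes | No (one-way) |
| **Server overhead** | not rated | Higher | not rated |
| **Reconnection** | Automatic | Manual | Manual |
| **Typical use** | Q&A, news push | Real-time chat, collaboration | File download, log push |
| **Recommendation** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |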

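Conclusion: for most LLM applications, SSE is the best choice. It is simple to implement, natively supported by browsers, and has a built-in automatic reconnection mechanism.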
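Automatic reconnection comes with one caveat. When the connection drops, the browser re-issues the same request. If the server sent `id:` fields, the browser includes the last seen id in a `Last-Event-ID` request header. Unless the server uses that header, a reconnect simply starts a new generation from scratch.

The sketch below keeps sent events in memory so a reconnecting client can resume. It assumes a hypothetical `ReplayBuffer` held per generation. Storage limits, eviction, and multi-instance deployments are out of scope.

```java
import java.util.ArrayList;
import java.util.List;

/** Per-generation buffer of sent events so a reconnecting client can resume (sketch). */
final class ReplayBuffer {
    private final List<String> events = new ArrayList<>();

    /** Stores an event payload and returns its id, the value to send in the SSE "id:" field. */
    synchronized long append(String data) {
        events.add(data);
        return events.size(); // ids start at 1
    }

    /** Payloads with id greater than lastEventId; null or unparsable means "from the start". */
    synchronized List<String> since(String lastEventId) {
        long last = 0;
        if (lastEventId != null) {
            try {
                last = Long.parseLong(lastEventId.trim());
            } catch (NumberFormatException e) {
                last = 0;
            }
        }
        int from = (int) Math.max(0, Math.min(last, events.size()));
        return new ArrayList<>(events.subList(from, events.size()));
    }
}
```

In Spring MVC, the header can be read with `@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId`. Ids are attached with `SseEmitter.event().id(...)`.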

3. Hands-On: Streaming Output with Spring Boot

3.1 Project Structure

```text
src/main/java/com/example/llmstreaming/
├── config/
│   ├── AsyncConfig.java          # async thread pool configuration
│   ├── WebSocketConfig.java      # WebSocket configuration
│   └── CorsConfig.java           # CORS configuration
├── controller/
│   ├── SSEController.java        # SSE streaming endpoints
│   └── WebSocketController.java  # WebSocket endpoint
├── service/
│   ├── StreamingChatService.java # core streaming service
│   └── LLMClient.java            # LLM client wrapper
├── model/
│   ├── ChatRequest.java
│   ├── ChatResponse.java
│   └── StreamToken.java
└── LlmStreamingApplication.java
```
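The tree lists `ChatRequest`, `ChatResponse`, and `StreamToken` without showing them, and the code also uses a `ChatMessage` type. The sketch below guesses at their shape, inferred only from how the examples call them (`getMessage()`, `getRole()`, `getContent()`). All other fields, and the `StreamToken` record itself, are assumptions. `ChatResponse` is omitted.

JavaBean-style classes with no-arg constructors are used for the request types because the examples call `getXxx()` getters.

```java
/** Incoming request body, e.g. {"message":"..."}; field inferred from request.getMessage(). */
class ChatRequest {
    private String message;

    public ChatRequest() {} // for JSON deserialization

    public ChatRequest(String message) { this.message = message; }

    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}

/** One turn of conversation history; role is e.g. "user" or "assistant". */
class ChatMessage {
    private String role;
    private String content;

    public ChatMessage() {}

    public ChatMessage(String role, String content) {
        this.role = role;
        this.content = content;
    }

    public String getRole() { return role; }
    public String getContent() { return content; }
    public void setRole(String role) { this.role = role; }
    public void setContent(String content) { this.content = content; }
}

/** One streamed token with its position; done marks the final event (hypothetical shape). */
record StreamToken(String token, int index, boolean done) {}
```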

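3.2 Async Thread Pool Configuration

Streaming must be handled asynchronously, and each active stream occupies a pool thread for the entire generation, so size the pool carefully:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Bean(name = "streamingExecutor")
    public Executor streamingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);     // threads kept alive permanently
        executor.setMaxPoolSize(50);      // extra threads are created only once the queue is full
        executor.setQueueCapacity(100);   // streams waiting for a thread
        executor.setThreadNamePrefix("streaming-");
        // When pool and queue are full, CallerRunsPolicy runs the task on the submitting thread
        // (a Tomcat request thread here). That request then blocks until generation ends, and the
        // client receives the buffered answer all at once instead of streaming.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setKeepAliveSeconds(60); // idle non-core threads are reclaimed after 60 s
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        return streamingExecutor(); // also used for @Async methods
    }
}
```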
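One property of this configuration is easy to miss. `ThreadPoolTaskExecutor` is backed by a `java.util.concurrent.ThreadPoolExecutor`, which grows beyond the core size only after the queue is full. With core 10, queue 100, and max 50, the 11th to 110th concurrent streams are queued rather than given new threads. Because each streaming task holds its thread for the whole generation, a queued user sees no first token until a running stream finishes.

The scaled-down demo below (core 1, queue 1, max 2) makes this order visible. It records the pool size right after each submission, with -1 for a rejection. The class name is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Scaled-down model of the streaming pool: core 1, queue 1, max 2, AbortPolicy. */
final class PoolGrowthDemo {

    /** Submits four long-running tasks; returns the pool size after each (-1 = rejected). */
    static int[] run() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        // Each task simulates a stream that holds its thread until released
        Runnable longStream = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] sizes = new int[4];
        try {
            for (int i = 0; i < sizes.length; i++) {
                try {
                    pool.execute(longStream);
                    sizes[i] = pool.getPoolSize();
                } catch (RejectedExecutionException e) {
                    sizes[i] = -1;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sizes;
    }
}
```

The pool stays at one thread while the second task waits in the queue. A second thread appears only for the third task, and the fourth task is rejected.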

3.3 LLM Client Wrapper

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.stream.Stream;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

@Service
public class LLMClient {

    // Example provider: the OpenAI Chat Completions API; other providers differ
    private static final String API_URL = "https://api.openai.com/v1/chat/completions";

    // A pooled WebClient bean (section 5.1) could be injected here instead
    private final WebClient webClient = WebClient.create();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Calls the LLM API in streaming mode.
     * Blocks the calling thread until the stream ends and invokes onToken for each token,
     * which is the synchronous contract the controllers rely on before sending "done".
     * @param prompt  prompt text
     * @param onToken callback invoked once per generated token
     */
    public void streamChat(String prompt, Consumer<String> onToken) {
        Map<String, Object> requestBody = Map.of(
            "model", "gpt-4",
            "messages", List.of(Map.of("role", "user", "content", prompt)),
            "stream", true);  // key setting: enable streaming output

        // For a text/event-stream response, WebClient decodes each SSE event itself,
        // so we receive the "data" payload (JSON or [DONE]) rather than raw "data: ..." lines.
        try (Stream<ServerSentEvent<String>> events = webClient.post()
                .uri(API_URL)
                .headers(h -> h.setBearerAuth(System.getenv("OPENAI_API_KEY")))
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .bodyValue(requestBody)
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {})
                .toStream()) {  // consume on the calling thread; closing the Stream cancels the request
            events.map(ServerSentEvent::data)
                  .filter(Objects::nonNull)
                  .takeWhile(data -> !"[DONE]".equals(data.trim()))  // OpenAI's end-of-stream marker
                  .map(this::parseToken)
                  .filter(token -> token != null && !token.isEmpty())
                  .forEach(onToken);
        }
        // HTTP and network errors propagate to the caller, which can complete the emitter with an error
    }

    /**
     * Extracts choices[0].delta.content from one chunk, e.g.
     * {"choices":[{"delta":{"content":"xxx"}}]}
     */
    private String parseToken(String json) {
        try {
            JsonNode content = objectMapper.readTree(json)
                .path("choices").path(0).path("delta").path("content");
            return content.isTextual() ? content.asText() : null;
        } catch (Exception e) {
            return null; // ignore chunks that are not valid JSON
        }
    }
}
```
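For projects that do not use WebFlux, the JDK's own `java.net.http.HttpClient` (Java 11+) can consume the same stream line by line via `HttpResponse.BodyHandlers.ofLines()`. In this sketch the request JSON is passed in pre-built. Since the JDK has no JSON parser, parsing each chunk is delegated to a caller-supplied `extractContent` function, for example one backed by Jackson. `OpenAiLineStreamer` and its method names are hypothetical. The line handling is separated out so it can be exercised without a network call.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

final class OpenAiLineStreamer {

    /** Feeds "data:" payloads to onToken until "[DONE]"; returns the number of tokens delivered. */
    static int consumeLines(Stream<String> lines, Function<String, String> extractContent,
                            Consumer<String> onToken) {
        int count = 0;
        for (String line : (Iterable<String>) lines::iterator) {
            if (!line.startsWith("data:")) {
                continue; // blank separators, comments and other fields
            }
            String payload = line.substring(5).trim();
            if (payload.equals("[DONE]")) {
                break; // end-of-stream marker
            }
            String content = extractContent.apply(payload);
            if (content != null && !content.isEmpty()) {
                onToken.accept(content);
                count++;
            }
        }
        return count;
    }

    /** Sends the request and streams the response; blocks the calling thread until the end. */
    static int stream(HttpClient client, String apiKey, String requestJson,
                      Function<String, String> extractContent, Consumer<String> onToken)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create("https://api.openai.com/v1/chat/completions"))
                .header("Authorization", "Bearer " + apiKey)
                .header("Content-Type", "application/json")
                .header("Accept", "text/event-stream")
                .POST(HttpRequest.BodyPublishers.ofString(requestJson))
                .build();
        HttpResponse<Stream<String>> response = client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            if (response.statusCode() != 200) {
                throw new IOException("HTTP " + response.statusCode());
            }
            return consumeLines(lines, extractContent, onToken);
        }
    }
}
```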

3.4 Streaming Chat Service

```java
import java.util.List;
import java.util.function.Consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class StreamingChatService {

    private final LLMClient llmClient;

    public StreamingChatService(LLMClient llmClient) {
        this.llmClient = llmClient;
    }

    /**
     * Main streaming chat method; returns after the whole answer has been streamed.
     */
    public void streamChat(String userMessage, Consumer<String> onToken) {
        log.info("Start streaming for user message: {}", userMessage);

        // Build the prompt
        String prompt = buildPrompt(userMessage);

        // Call the streaming LLM API; each token goes straight to the caller's callback
        llmClient.streamChat(prompt, onToken);
    }

    /**
     * Streaming chat with conversation context
     */
    public void streamChatWithContext(
            String userMessage,
            List<ChatMessage> history,
            Consumer<String> onToken) {

        String prompt = buildContextPrompt(userMessage, history);
        llmClient.streamChat(prompt, onToken);
    }

    private String buildPrompt(String userMessage) {
        // Text block (Java 15+)
        return String.format("""
            You are a professional technical assistant. Please answer the user's question.

            User: %s
            """, userMessage);
    }

    private String buildContextPrompt(String userMessage, List<ChatMessage> history) {
        StringBuilder sb = new StringBuilder();
        sb.append("You are a professional technical assistant. Here is the previous conversation:\n\n");

        for (ChatMessage msg : history) {
            sb.append(msg.getRole()).append(": ").append(msg.getContent()).append("\n");
        }

        sb.append("\nUser's new question: ").append(userMessage);
        return sb.toString();
    }
}
```

3.5 SSE Controller

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@RestController
@RequestMapping("/api/chat")
@Slf4j
public class SSEController {

    private final StreamingChatService chatService;
    private final Executor streamingExecutor;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public SSEController(StreamingChatService chatService,
                         @Qualifier("streamingExecutor") Executor streamingExecutor) {
        this.chatService = chatService;
        this.streamingExecutor = streamingExecutor; // the pool from section 3.2
    }

    /**
     * SSE streaming chat endpoint, called from the browser via EventSource
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChat(
            @RequestParam String message,
            @RequestParam(required = false) String sessionId) {

        log.info("Streaming chat request: {}, sessionId: {}", message, sessionId);

        SseEmitter emitter = new SseEmitter(-1L); // -1: no async timeout

        // Register callbacks before generation starts
        emitter.onTimeout(() -> log.warn("SSE connection timed out"));
        emitter.onCompletion(() -> log.info("SSE transfer completed"));
        emitter.onError(e -> log.error("SSE error", e));

        CompletableFuture.runAsync(() -> {
            try {
                StringBuilder fullResponse = new StringBuilder();

                chatService.streamChat(message, token -> {
                    fullResponse.append(token);
                    // token: the new piece; content: everything so far (convenient for the UI,
                    // but the payload grows with the answer, so long answers cost more bandwidth)
                    send(emitter, Map.of("token", token, "content", fullResponse.toString()));
                });

                // Completion signal
                send(emitter, Map.of("done", true, "fullContent", fullResponse.toString()));
                emitter.complete();
                log.info("Streaming chat finished, {} characters sent", fullResponse.length());

            } catch (Exception e) {
                log.error("Streaming chat failed", e);
                emitter.completeWithError(e);
            }
        }, streamingExecutor); // dedicated pool, no artificial start delay

        return emitter;
    }

    /**
     * Streaming chat with conversation context.
     * Each historyJson element is one ChatMessage as JSON; long histories can exceed URL length
     * limits, so a POST endpoint consumed via fetch is a better fit in practice.
     */
    @GetMapping(value = "/stream/context", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChatWithContext(
            @RequestParam String message,
            @RequestParam List<String> historyJson) {

        List<ChatMessage> history = parseHistory(historyJson);
        SseEmitter emitter = new SseEmitter(-1L);

        CompletableFuture.runAsync(() -> {
            try {
                chatService.streamChatWithContext(message, history,
                    token -> send(emitter, Map.of("token", token)));

                send(emitter, Map.of("done", true));
                emitter.complete();

            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        }, streamingExecutor);

        return emitter;
    }

    /**
     * Sends one unnamed SSE event. SseEmitter adds the "data:" prefix and the terminating
     * blank line itself, so the payload must not contain them; Jackson handles JSON escaping.
     */
    private void send(SseEmitter emitter, Object payload) {
        try {
            emitter.send(SseEmitter.event().data(payload, MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            // Usually the client disconnected: abort generation instead of continuing
            throw new UncheckedIOException(e);
        }
    }

    private List<ChatMessage> parseHistory(List<String> historyJson) {
        return historyJson.stream()
            .map(json -> {
                try {
                    return objectMapper.readValue(json, ChatMessage.class);
                } catch (Exception e) {
                    return null; // skip malformed entries
                }
            })
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
    }
}
```

3.6 Frontend Examples

Using EventSource (SSE)

```javascript
class SSEClient {
    constructor(baseUrl) {
        this.baseUrl = baseUrl;
        this.eventSource = null;
        this.onMessage = null;
        this.onDone = null;
        this.onError = null;
    }

    /**
     * Starts a streaming chat request
     */
    connect(message, sessionId = null) {
        // Note: EventSource only supports GET and cannot send a JSON body or custom headers
        let url = `${this.baseUrl}/api/chat/stream?message=${encodeURIComponent(message)}`;
        if (sessionId) {
            url += `&sessionId=${encodeURIComponent(sessionId)}`;
        }

        this.eventSource = new EventSource(url);

        this.eventSource.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                if (data.done) {
                    this.onDone && this.onDone(data.fullContent);
                    // Close explicitly: otherwise EventSource treats the server closing the stream
                    // as an error and reconnects, which would send the same question again
                    this.close();
                } else if (this.onMessage) {
                    this.onMessage(data.token, data.content);
                }
            } catch (e) {
                console.error('Failed to parse SSE data:', e);
            }
        };

        this.eventSource.onerror = (error) => {
            console.error('SSE connection error:', error);
            this.onError && this.onError(error);
            this.close(); // also stops EventSource's automatic reconnect
        };
    }

    close() {
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }
}

// Usage
const client = new SSEClient('http://localhost:8080');

const messages = [];
let fullResponse = '';

client.onMessage = (token, content) => {
    fullResponse = content;
    // Update the UI for a typewriter effect
    document.getElementById('response').innerText = content;
};

client.onDone = (fullContent) => {
    console.log('Response complete:', fullContent);
    messages.push({ role: 'assistant', content: fullContent });
};

client.onError = (error) => {
    console.error('Request failed:', error);
};

client.connect('Please introduce the Java Stream API');
```

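Using the Fetch API + ReadableStream

```javascript
class FetchStreamClient {
    async streamChat(message) {
        // fetch has no "params" option: the query string goes into the URL
        const response = await fetch(`/api/chat/stream?message=${encodeURIComponent(message)}`, {
            method: 'GET',
            headers: { 'Accept': 'text/event-stream' },
        });
        if (!response.ok) {
            throw new Error(`HTTP ${response.status}`);
        }

        const reader = response.body.getReader();
        const decoder = new TextDecoder('utf-8');
        let buffer = '';      // holds an incomplete line between reads
        let fullContent = '';

        while (true) {
            const { done, value } = await reader.read();
            if (done) break;

            // stream: true keeps multi-byte UTF-8 characters intact across chunk boundaries
            buffer += decoder.decode(value, { stream: true });
            const lines = buffer.split('\n');
            buffer = lines.pop(); // the last element may be a partial line

            for (const rawLine of lines) {
                const line = rawLine.replace(/\r$/, '');
                // Parse SSE data lines; servers may write "data:" with or without a space
                if (!line.startsWith('data:')) continue;
                const data = line.slice(5).replace(/^ /, '');

                let parsed;
                try {
                    parsed = JSON.parse(data);
                } catch (e) {
                    continue; // ignore lines that are not JSON
                }
                if (parsed.done) {
                    await reader.cancel();
                    return fullContent;
                }
                if (parsed.token) {
                    fullContent += parsed.token;
                    this.updateDisplay(fullContent); // update the UI
                }
            }
        }

        return fullContent;
    }

    updateDisplay(content) {
        document.getElementById('response').innerText = content;
    }
}
```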

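4. Key Production Configuration

4.1 Nginx Configuration

By default Nginx buffers proxied responses, which holds tokens back. Disable buffering for streaming endpoints:

```nginx
server {
    listen 80;
    server_name your-domain.com;

    # Disable proxy buffering so tokens reach the client as soon as they are produced
    location /api/chat/stream {
        proxy_pass http://127.0.0.1:8080;
        proxy_http_version 1.1;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;

        # Key settings: no buffering, no caching
        # (alternatively, the application can send the response header "X-Accel-Buffering: no")
        proxy_buffering off;
        proxy_cache off;
        chunked_transfer_encoding on;

        # Long timeouts so slow generations are not cut off (proxy_read_timeout defaults to 60s)
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}
```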

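4.2 Spring Boot Configuration

```yaml
# application.yml
server:
  tomcat:
    threads:
      max: 200
      min-spare: 10
    connection-timeout: 20000  # milliseconds

spring:
  mvc:
    async:
      request-timeout: -1  # default async timeout; -1 disables it (new SseEmitter(timeout) overrides it per request)

# Application-specific keys (custom properties, not read by Spring itself)
chat:
  stream:
    buffer-size: 8192
    timeout-seconds: 3600
```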

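4.3 Error Handling and Retry

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

import org.springframework.stereotype.Component;

@Component
public class StreamErrorHandler {

    /**
     * Whether the error is transient and worth retrying
     */
    public boolean isRetryableError(Throwable error) {
        if (error instanceof TimeoutException) {
            return true;
        }
        if (error instanceof IOException) {
            String message = error.getMessage();
            return message != null && (
                message.contains("Connection reset") ||
                message.contains("Broken pipe")
            );
        }
        return false;
    }

    /**
     * Retry delay with exponential backoff: 1 s, 2 s, 4 s, 8 s, 16 s, then capped at 30 s
     */
    public long getRetryDelayMs(int attempt) {
        // Capping the shift keeps large attempt numbers from overflowing
        return Math.min(1000L << Math.min(attempt, 5), 30_000L);
    }
}
```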
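The sketch below shows one way to combine a retryable-error check with backoff delays. One assumption matters: a stream is retried only if it failed before any token reached the client. Otherwise the retried generation would be appended to a partial answer.

`RetryingStreamer`, the `Attempt` interface, the attempt limit, and the injected `sleeper` are hypothetical. The sleeper exists so the delays can be checked without actually sleeping; real code would call `Thread.sleep`. The delays are the same as those of `getRetryDelayMs` above.

```java
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.function.Predicate;

final class RetryingStreamer {

    /** One attempt at streaming: pushes tokens into onToken and may throw. */
    interface Attempt {
        void run(Consumer<String> onToken) throws Exception;
    }

    /** 1 s, 2 s, 4 s, 8 s, 16 s, then capped at 30 s. */
    static long retryDelayMs(int attempt) {
        return Math.min(1000L << Math.min(attempt, 5), 30_000L);
    }

    /**
     * Runs the attempt, retrying with exponential backoff only while no token has been
     * delivered yet; rethrows the last error when giving up.
     */
    static void streamWithRetry(Attempt attempt, Consumer<String> onToken,
                                Predicate<Throwable> retryable, int maxAttempts,
                                LongConsumer sleeper) throws Exception {
        boolean[] emitted = {false};
        for (int i = 0; ; i++) {
            try {
                attempt.run(token -> {
                    emitted[0] = true; // from now on a retry would duplicate output
                    onToken.accept(token);
                });
                return;
            } catch (Exception e) {
                boolean canRetry = !emitted[0] && i + 1 < maxAttempts && retryable.test(e);
                if (!canRetry) {
                    throw e;
                }
                sleeper.accept(retryDelayMs(i)); // e.g. Thread.sleep(ms) in real code
            }
        }
    }
}
```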

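4.4 Rate Limiting and Circuit Breaking

```java
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.util.concurrent.RateLimiter;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Component
public class StreamRateLimiter {

    // Guava RateLimiter: 100 requests per second across all users
    private final RateLimiter globalLimiter = RateLimiter.create(100.0);

    // One limiter per user; 1 request/s is an illustrative value, and entries are never evicted here
    private final ConcurrentHashMap<String, RateLimiter> userLimiters = new ConcurrentHashMap<>();

    /**
     * Tries to take a permit for this user without waiting.
     * (Guava has no public "estimated wait time" method; tryAcquire with a timeout can wait briefly.)
     */
    public boolean tryAcquire(String userId) {
        RateLimiter userLimiter = userLimiters.computeIfAbsent(userId, id -> RateLimiter.create(1.0));
        return userLimiter.tryAcquire() && globalLimiter.tryAcquire();
    }
}

@RestController
public class StreamController {

    private final StreamRateLimiter rateLimiter;

    public StreamController(StreamRateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> streamChat(
            @RequestParam String message,
            @RequestHeader(value = "X-User-Id", defaultValue = "anonymous") String userId) { // hypothetical header
        // Rate-limit check
        if (!rateLimiter.tryAcquire(userId)) {
            return ResponseEntity.status(HttpStatus.TOO_MANY_REQUESTS).build();
        }

        SseEmitter emitter = new SseEmitter(-1L);
        // ... subsequent processing
        return ResponseEntity.ok(emitter);
    }
}
```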
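A Guava `RateLimiter` instance is one shared bucket, so per-user limits need one bucket per user. The dependency-free sketch below implements per-user token buckets. It assumes an injectable nanosecond clock, so tests need not sleep, and it never evicts idle users. `UserTokenBuckets` is a hypothetical name. With several application instances, a shared store such as Redis would be needed instead.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Per-user token buckets: at most `capacity` tokens, refilled at `ratePerSecond` (sketch). */
final class UserTokenBuckets {

    private static final class Bucket {
        double tokens;
        long lastRefillNanos;

        Bucket(double tokens, long now) {
            this.tokens = tokens;
            this.lastRefillNanos = now;
        }
    }

    private final ConcurrentHashMap<String, Bucket> buckets = new ConcurrentHashMap<>();
    private final double capacity;
    private final double ratePerSecond;
    private final LongSupplier nanoClock;

    UserTokenBuckets(double capacity, double ratePerSecond, LongSupplier nanoClock) {
        this.capacity = capacity;
        this.ratePerSecond = ratePerSecond;
        this.nanoClock = nanoClock;
    }

    /** Takes one token for the user if available; false means the user is over the limit. */
    boolean tryAcquire(String userId) {
        Bucket bucket = buckets.computeIfAbsent(userId, id -> new Bucket(capacity, nanoClock.getAsLong()));
        synchronized (bucket) {
            long now = nanoClock.getAsLong();
            // Refill in proportion to the time elapsed since the last call, up to capacity
            double elapsedSeconds = Math.max(0L, now - bucket.lastRefillNanos) / 1e9;
            bucket.tokens = Math.min(capacity, bucket.tokens + elapsedSeconds * ratePerSecond);
            bucket.lastRefillNanos = now;
            if (bucket.tokens >= 1.0) {
                bucket.tokens -= 1.0;
                return true;
            }
            return false;
        }
    }
}
```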

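5. Performance Optimization and Monitoring

5.1 Connection Pool Tuning

```java
import java.time.Duration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

@Configuration
public class WebClientConfig {

    // The pool only takes effect if LLMClient injects this bean instead of calling WebClient.create()
    @Bean
    public WebClient webClient() {
        // Reactor Netty connection pool shared by all requests made through this WebClient
        ConnectionProvider provider = ConnectionProvider.builder("llm-pool")
            .maxConnections(500)                            // max concurrent connections
            .pendingAcquireTimeout(Duration.ofSeconds(60))  // max wait for a free connection
            .evictInBackground(Duration.ofSeconds(120))     // periodic eviction check (pairs with maxIdleTime/maxLifeTime)
            .build();

        return WebClient.builder()
            .clientConnector(new ReactorClientHttpConnector(HttpClient.create(provider)))
            .build();
    }
}
```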

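5.2 Monitoring Metrics

```java
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
// PrometheusMeterRegistry/PrometheusConfig: package io.micrometer.prometheusmetrics in
// Micrometer 1.13+, io.micrometer.prometheus in older versions
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

// With spring-boot-starter-actuator and micrometer-registry-prometheus on the classpath,
// Spring Boot already creates a PrometheusMeterRegistry; declare one yourself only without Actuator.
@Configuration
public class StreamMetricsConfig {

    @Bean
    public PrometheusMeterRegistry prometheusMeterRegistry() {
        return new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    }
    // StreamMetrics is not declared here as well: it is already a @Component,
    // and a second @Bean would register two beans of the same type.
}

@Component
public class StreamMetrics {

    private final Counter requestCounter;
    private final Counter tokenCounter;
    private final Timer latencyTimer;

    public StreamMetrics(MeterRegistry registry) {
        this.requestCounter = Counter.builder("llm.stream.requests")
            .description("Total streaming requests")
            .register(registry);

        this.tokenCounter = Counter.builder("llm.stream.tokens")
            .description("Total tokens sent")
            .register(registry);

        this.latencyTimer = Timer.builder("llm.stream.latency")
            .description("Streaming response latency")
            .register(registry);
    }

    public void recordRequest() {
        requestCounter.increment(); // once per incoming streaming request
    }

    public void recordToken() {
        tokenCounter.increment();
    }

    public Timer.Sample startTimer() {
        return Timer.start();
    }

    public void recordLatency(Timer.Sample sample) {
        sample.stop(latencyTimer);
    }
}
```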
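The `llm.stream.latency` timer above does not specify which latency it measures. For streaming, time to first token (TTFT) and decode throughput are usually more informative than total duration. The plain-Java sketch below does the per-request bookkeeping, assuming an injectable nanosecond clock. The resulting values could then be recorded into Micrometer timers or distribution summaries. `StreamStopwatch` is a hypothetical name.

```java
import java.util.function.LongSupplier;

/** Per-request timing for one stream: call onToken() for every token, then read the figures. */
final class StreamStopwatch {
    private final LongSupplier nanoClock;
    private final long startNanos;
    private long firstTokenNanos = -1;
    private long lastTokenNanos = -1;
    private long tokens;

    StreamStopwatch(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.startNanos = nanoClock.getAsLong(); // request start
    }

    void onToken() {
        long now = nanoClock.getAsLong();
        if (firstTokenNanos < 0) {
            firstTokenNanos = now;
        }
        lastTokenNanos = now;
        tokens++;
    }

    /** Milliseconds from request start to the first token, or -1 if no token arrived. */
    long timeToFirstTokenMs() {
        return firstTokenNanos < 0 ? -1 : (firstTokenNanos - startNanos) / 1_000_000;
    }

    /** Tokens per second after the first token (decode speed); 0 if it cannot be computed. */
    double tokensPerSecond() {
        if (tokens < 2 || lastTokenNanos == firstTokenNanos) {
            return 0.0;
        }
        return (tokens - 1) / ((lastTokenNanos - firstTokenNanos) / 1e9);
    }

    long tokenCount() {
        return tokens;
    }
}
```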

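5.3 Memory Management

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import jakarta.annotation.PostConstruct;   // javax.annotation.* on Spring Boot 2
import jakarta.annotation.PreDestroy;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

@Service
public class StreamSessionManager {

    private static final long MAX_IDLE_MS = 30 * 60 * 1000L; // sessions idle for 30 min expire

    private final ConcurrentHashMap<String, StreamSession> sessions = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void init() {
        // Every 5 minutes, remove expired sessions and complete their emitters so connections are released
        cleaner.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            sessions.entrySet().removeIf(entry -> {
                boolean expired = now - entry.getValue().getLastActiveTime() > MAX_IDLE_MS;
                if (expired) {
                    entry.getValue().getEmitter().complete();
                }
                return expired;
            });
        }, 5, 5, TimeUnit.MINUTES);
    }

    @PreDestroy
    public void shutdown() {
        cleaner.shutdownNow(); // stop the cleanup thread when the application context closes
    }

    public void addSession(String sessionId, SseEmitter emitter) {
        sessions.put(sessionId, new StreamSession(emitter));
    }

    /** Call whenever a token is sent, so active streams are not expired. */
    public void touch(String sessionId) {
        StreamSession session = sessions.get(sessionId);
        if (session != null) {
            session.setLastActiveTime(System.currentTimeMillis());
            session.setTokenCount(session.getTokenCount() + 1);
        }
    }

    public void removeSession(String sessionId) {
        sessions.remove(sessionId);
    }
}

@Data
@AllArgsConstructor
class StreamSession {
    private SseEmitter emitter;
    private long lastActiveTime;
    private long tokenCount;

    public StreamSession(SseEmitter emitter) {
        this.emitter = emitter;
        this.lastActiveTime = System.currentTimeMillis();
        this.tokenCount = 0;
    }
}
```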

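6. Common Problems and Solutions

6.1 High First-Token Latency

Problem: after the connection is established, the first token takes too long to arrive.

Troubleshooting steps

  1. Check the LLM provider's API latency
  2. Check the network path (DNS, proxies)
  3. Check the server-side thread pool configuration

Optimization

```java
import java.time.Duration;

import jakarta.annotation.PostConstruct;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;

// Connection warm-up: open a (pooled) TLS connection to the LLM API at startup, so the first user
// request does not pay for DNS + TCP + TLS setup. Only useful if LLMClient uses this same WebClient.
@Component
public class ConnectionWarmup {

    private final WebClient webClient;

    public ConnectionWarmup(WebClient webClient) {
        this.webClient = webClient;
    }

    @PostConstruct
    public void warmup() {
        try {
            webClient.get().uri("https://api.openai.com")
                .exchangeToMono(response -> response.releaseBody()) // any HTTP status is fine here
                .block(Duration.ofSeconds(5));
        } catch (Exception e) {
            // A failed warm-up must not prevent the application from starting
        }
    }
}
```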

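6.2 SSE Connections Dropping in Some Browsers

Problem: in Chrome, the connection drops after about 50 seconds. Cuts like this usually come from an idle timeout somewhere on the path, such as a proxy, load balancer, or gateway, rather than from the browser itself. Nginx's `proxy_read_timeout`, for example, defaults to 60 s. The fix is to keep data flowing.

Solution

```java
// Shared scheduler for heartbeats: one thread for all connections, not one per request
private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();

@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat() {
    // The SseEmitter timeout limits the *total* response duration and is not reset by heartbeats,
    // so it must exceed the longest expected generation (a 60 s timeout would end the stream anyway)
    SseEmitter emitter = new SseEmitter(30 * 60 * 1000L); // 30 minutes

    // Heartbeat every 30 s keeps idle proxies from closing the connection. An SSE comment line
    // (": ping") is ignored by EventSource, so clients need no special handling.
    ScheduledFuture<?> heartbeat = heartbeatScheduler.scheduleAtFixedRate(() -> {
        try {
            emitter.send(SseEmitter.event().comment("ping"));
        } catch (Exception e) {
            // Throwing from a periodic task suppresses all later runs, which stops the heartbeat
            throw new IllegalStateException("heartbeat stopped", e);
        }
    }, 30, 30, TimeUnit.SECONDS);

    emitter.onCompletion(() -> heartbeat.cancel(false));
    emitter.onTimeout(() -> heartbeat.cancel(false));
    emitter.onError(e -> heartbeat.cancel(false));

    return emitter;
}
```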

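6.3 Out-of-Order Tokens

Problem: when several tokens are sent at the same time, the client can receive them out of order. Within one SSE connection TCP preserves byte order, so this can only happen when several threads write to the same emitter concurrently.

Solution

```java
// Number every token. The counter and lock are local variables, so they are per request;
// fields on a singleton controller would be shared by all concurrent requests.
AtomicInteger tokenIndex = new AtomicInteger();
Object lock = new Object();

chatService.streamChat(message, token -> {
    // The lock makes index assignment and sending one atomic step, so index order matches send order
    synchronized (lock) {
        int currentIndex = tokenIndex.getAndIncrement();
        try {
            emitter.send(SseEmitter.event()
                .id(String.valueOf(currentIndex))  // the browser reports this as Last-Event-ID on reconnect
                .data(Map.of("token", token, "index", currentIndex), MediaType.APPLICATION_JSON));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
});
```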
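If tokens really can arrive out of order, for example when several producers feed one connection, the `index` field lets the receiver restore the order. The receiver-side sketch below assumes indices start at 0 and are unique. `ReorderBuffer` is a hypothetical name, and gaps that never fill are not timed out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

/** Releases tokens strictly in index order, holding back any that arrive early. */
final class ReorderBuffer {
    private final Map<Integer, String> pending = new HashMap<>();
    private final Consumer<String> downstream;
    private int next = 0; // next index expected by the downstream consumer

    ReorderBuffer(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    void accept(int index, String token) {
        if (index < next) {
            return; // already delivered, e.g. a duplicate after a reconnect
        }
        pending.put(index, token);
        // Flush the contiguous run starting at the next expected index
        String ready;
        while ((ready = pending.remove(next)) != null) {
            downstream.accept(ready);
            next++;
        }
    }

    int waiting() {
        return pending.size();
    }
}
```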

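7. Summary

This article walked through a complete technical solution for LLM streaming output:

Key takeaways

  1. Streaming principle: tokens are generated one by one, transmitted immediately, and rendered in real time
  2. Choosing a technique: SSE fits most scenarios (simple to implement, native browser support)
  3. Spring Boot implementation: SseEmitter + async thread pool + callbacks
  4. Production configuration: disabled Nginx buffering, timeout settings, rate limiting and circuit breaking
  5. Performance optimization: connection pooling and warm-up, monitoring metrics, memory management

Technology selection guide

| Scenario | Recommended approach |
|---|---|
| Simple Q&A | SSE |
| Real-time chat | WebSocket |
| Content generation | SSE |
| Multi-user collaboration | WebSocket |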

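Further extensions

  • Combine streaming with conversation memory for multi-turn dialogue
  • Integrate multiple LLM providers (OpenAI, Azure, Claude, local models)
  • Implement token counting, cost tracking, and cost control

Mastering streaming output can bring a qualitative improvement to the user experience of your AI applications!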

---

Author: 付雷刚 (洛水石)

Original content; please credit the source when reposting.
