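Java Programmer, Stage 14: Learning LLM Streaming Output and Building a Real-Time Chat Interface
Introduction
Users of AI applications expect fast responses. With traditional non-streaming output, the client has to wait until the large language model (LLM) has generated the complete answer before anything is returned, which usually causes a noticeable delay and hurts the user experience. Streaming output changes this: the user watches the answer appear as the model generates it, much like watching someone type, and the interaction feels far more natural.
This article explains the core principles of LLM streaming output, compares the mainstream implementation options (SSE, WebSocket, and HTTP streaming), and provides Spring Boot code to help Java programmers get up to speed with this key technique.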
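1. Core Principles of Streaming Output
1.1 What Is Streaming Output
Streaming output means the model starts sending already-generated tokens to the client before the complete answer is finished. The core advantage is a much shorter wait before the first visible output (the timings below are illustrative):
Non-streaming output:
User asks → [wait 30 s] → complete answer returned in one piece
Streaming output:
User asks → [wait 0.5 s] → "今" → "今天" → "今天天气" → ... → complete answer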
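1.2 Tokens and Streaming Output
An LLM produces its output one token at a time. Taking GPT-4 as an example:
- Text is split into a sequence of tokens (for example, "今天天气很好", "the weather is nice today", might be split into "今", "天", "天", "气", "很", "好"; the actual split depends on the tokenizer)
- Each inference step of the model generates one token
- Each generated token is sent to the client immediately as part of the stream
- The client renders each token as soon as it arrives, without waiting for the complete answer
// Example token stream
Token 1: "今"
Token 2: "天"
Token 3: "天"
Token 4: "气"
Token 5: "很"
Token 6: "好"
Token 7: "。" // done=true: generation is complete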
[java]
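The token-by-token flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: `TokenStreamDemo`, `generate`, and `renderSteps` are hypothetical names, and a fixed list stands in for a real model. It shows the callback pattern the rest of this article relies on, where each token is handed to a `Consumer<String>` as soon as it exists and the client-side text grows step by step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a model that emits its answer token by token. */
final class TokenStreamDemo {

    /** Pushes each token to the callback as soon as it is "generated". */
    static void generate(List<String> tokens, Consumer<String> onToken) {
        for (String token : tokens) {
            onToken.accept(token); // streaming: deliver immediately, do not wait for the rest
        }
    }

    /** Returns the text a client would display after each token arrives. */
    static List<String> renderSteps(List<String> tokens) {
        StringBuilder shown = new StringBuilder();
        List<String> frames = new ArrayList<>();
        generate(tokens, token -> {
            shown.append(token);          // the client appends the new token
            frames.add(shown.toString()); // and re-renders the text so far
        });
        return frames;
    }
}
```

Calling `TokenStreamDemo.renderSteps(List.of("今", "天", "天", "气"))` yields the four intermediate strings a user would see, ending with "今天天气".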
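1.3 The Core Value of Streaming
| Dimension | Non-streaming | Streaming |
| --- | --- | --- |
| **Time to first token** | 30 s or more (the full generation time) | Under 500 ms |
| **User experience** | Anxious waiting | Immediate feedback |
| **Perceived interaction** | Mechanical | Smooth and natural |
| **Technical complexity** | Simple | Moderate |
| **Network transfer** | One complete response at the end | Incremental, as tokens are generated |
| **Typical use** | Simple API calls | Real-time interaction |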
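2. Comparing Streaming Techniques

2.1 Server-Sent Events (SSE)
SSE is an HTTP-based server push technique: the client opens one long-lived HTTP response and the server writes events to it as they become available.

How SSE Works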
┌─────────┐                  ┌─────────┐                  ┌─────────┐
│ Client  │                  │ Gateway │                  │ LLM Svc │
└────┬────┘                  └────┬────┘                  └────┬────┘
     │                            │                            │
     │──── GET /stream ──────────>│                            │
     │                            │──── HTTP Stream ──────────>│
     │                            │                            │
     │                            │<─── Token 1 ───────────────│
     │<─ data:{"t":"今"} ─────────│                            │
     │                            │<─── Token 2 ───────────────│
     │<─ data:{"t":"天"} ─────────│                            │
     │                            │                            │
     │                            │<─── Token N ───────────────│
     │<─ data:{"t":"好"} ─────────│                            │
     │<─ event:done ──────────────│                            │
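SSE Format in Detail
The response carries the headers below. Each event consists of one or more "data:" lines and is terminated by a blank line:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

data: {"token": "今", "index": 0}

data: {"token": "天", "index": 1}

data: {"token": "气", "index": 2}

data: {"token": "好", "done": true}
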
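To make the wire format concrete, here is a minimal sketch of building and reading SSE frames by hand. It is not a library API: `SseFrames`, `format`, and `parseData` are hypothetical helpers, and the parser assumes "\n" line endings and ignores the `id:` and `retry:` fields. In a Spring application `SseEmitter` produces this framing for you.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal SSE frame builder and parser (illustrative helper, not a library API). */
final class SseFrames {

    /** Formats one event: optional "event:" line, one "data:" line per payload line, blank-line terminator. */
    static String format(String eventName, String data) {
        StringBuilder sb = new StringBuilder();
        if (eventName != null) {
            sb.append("event: ").append(eventName).append('\n');
        }
        // A payload containing newlines must be split across several data: lines.
        for (String line : data.split("\n", -1)) {
            sb.append("data: ").append(line).append('\n');
        }
        return sb.append('\n').toString();
    }

    /** Extracts the data payload of every complete event in a stream of text. */
    static List<String> parseData(String stream) {
        List<String> payloads = new ArrayList<>();
        for (String event : stream.split("\n\n")) {
            List<String> dataLines = new ArrayList<>();
            for (String line : event.split("\n")) {
                if (line.startsWith("data:")) {
                    String value = line.substring(5);
                    // A single leading space after the colon is not part of the payload.
                    dataLines.add(value.startsWith(" ") ? value.substring(1) : value);
                }
            }
            if (!dataLines.isEmpty()) {
                payloads.add(String.join("\n", dataLines));
            }
        }
        return payloads;
    }
}
```

For example, `SseFrames.format("done", "{\"done\":true}")` produces an `event: done` line, a `data:` line, and the terminating blank line.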
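SSE Code Example
@RestController
@RequestMapping("/api/llm")
public class SSEController {

    // ChatService is this article's own service; streamTokens is assumed to block until the stream ends
    private final ChatService chatService;

    public SSEController(ChatService chatService) {
        this.chatService = chatService;
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChat(@RequestParam String message) {
        // Create the SseEmitter; -1L means no timeout
        SseEmitter emitter = new SseEmitter(-1L);

        // Register the callbacks before the asynchronous work starts
        emitter.onTimeout(() -> System.out.println("SSE connection timed out"));
        emitter.onCompletion(() -> System.out.println("SSE transfer completed"));

        // Produce the stream on another thread so the request thread is released
        CompletableFuture.runAsync(() -> {
            try {
                chatService.streamTokens(message, token -> {
                    try {
                        // Send each token; the Map is serialized to JSON, so the token is escaped correctly
                        emitter.send(SseEmitter.event()
                                .name("message")
                                .data(Map.of("token", token), MediaType.APPLICATION_JSON));
                    } catch (IOException e) {
                        // Abort the stream; the outer catch completes the emitter with this error
                        throw new UncheckedIOException(e);
                    }
                });
                // Send the completion signal
                emitter.send(SseEmitter.event()
                        .name("done")
                        .data(Map.of("done", true), MediaType.APPLICATION_JSON));
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }
}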
[java]
2.2 WebSocket Full-Duplex Communication
WebSocket provides true full-duplex communication and suits scenarios that need frequent two-way interaction.
WebSocket Strengths and Weaknesses
✓ Full duplex: client and server can send data at the same time
✓ Low latency: after the initial HTTP upgrade handshake, messages carry no per-request HTTP overhead
✓ Binary support: can carry images, audio, and other binary data
✓ Good fit for: real-time chat, multi-user collaboration, games
✗ Higher implementation complexity
✗ Higher server resource usage
WebSocket Code Example
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final ChatWebSocketHandler chatWebSocketHandler;

    public WebSocketConfig(ChatWebSocketHandler chatWebSocketHandler) {
        this.chatWebSocketHandler = chatWebSocketHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatWebSocketHandler, "/ws/chat")
                .setAllowedOrigins("*") // fine for local testing; restrict to known origins in production
                .addInterceptors(new HttpSessionHandshakeInterceptor());
    }
}

@Component
public class ChatWebSocketHandler extends TextWebSocketHandler {

    private final ChatService chatService;
    private final ObjectMapper objectMapper;
    private final ConcurrentHashMap<String, WebSocketSession> sessions = new ConcurrentHashMap<>();

    public ChatWebSocketHandler(ChatService chatService, ObjectMapper objectMapper) {
        this.chatService = chatService;
        this.objectMapper = objectMapper;
    }

    @Override
    public void afterConnectionEstablished(WebSocketSession session) {
        sessions.put(session.getId(), session);
        System.out.println("WebSocket connection established: " + session.getId());
    }

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) {
        try {
            // Parse the client message
            ChatRequest request = objectMapper.readValue(message.getPayload(), ChatRequest.class);
            // Stream tokens back; streamChat is assumed to block until the stream ends
            chatService.streamChat(request.getMessage(),
                    token -> sendJson(session, Map.of("token", token)));
            // Send the completion signal
            sendJson(session, Map.of("done", true));
        } catch (Exception e) {
            // Report the failure to the client, then close the session
            try {
                sendJson(session, Map.of("error", String.valueOf(e.getMessage())));
                session.close(CloseStatus.SERVER_ERROR);
            } catch (Exception ignored) {
                // The connection is already unusable
            }
        }
    }

    // Serializes the payload with Jackson (so tokens are escaped correctly) and sends it
    private void sendJson(WebSocketSession session, Map<String, ?> payload) {
        try {
            session.sendMessage(new TextMessage(objectMapper.writeValueAsString(payload)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
        sessions.remove(session.getId());
        System.out.println("WebSocket connection closed: " + session.getId());
    }
}
[java]
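2.3 HTTP Chunked Transfer
HTTP chunked transfer encoding is another way to implement streaming output.
How Chunked Transfer Works
Each chunk is prefixed with its size in bytes, written in hexadecimal, and a zero-length chunk ends the body. The size counts UTF-8 bytes, so "今" contributes 3 bytes:
HTTP/1.1 200 OK
Content-Type: application/octet-stream
Transfer-Encoding: chunked

1A\r\n
{"token": "今", "idx": 0}\r\n
1A\r\n
{"token": "天", "idx": 1}\r\n
1D\r\n
{"token": "天气", "idx": 2}\r\n
0\r\n
\r\n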
[http]
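The framing above can be reproduced with a few lines of plain Java. This is a sketch for understanding the format only: `ChunkedEncoder` is a hypothetical helper, and in a servlet application the container applies this framing itself, so application code should not write it by hand.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Illustrative encoder for HTTP/1.1 chunked framing (servlet containers normally do this for you). */
final class ChunkedEncoder {

    /** Frames one payload as: hex byte length, CRLF, payload bytes, CRLF. */
    static byte[] encodeChunk(String payload) {
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // The size counts bytes, not characters: "今" is 1 char but 3 UTF-8 bytes.
        String sizeLine = Integer.toHexString(body.length).toUpperCase() + "\r\n";
        out.writeBytes(sizeLine.getBytes(StandardCharsets.US_ASCII));
        out.writeBytes(body);
        out.writeBytes("\r\n".getBytes(StandardCharsets.US_ASCII));
        return out.toByteArray();
    }

    /** The zero-length chunk that terminates the body. */
    static byte[] lastChunk() {
        return "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
    }
}
```

Computing the size from `String.length()` instead of the encoded byte length would under-report any chunk that contains non-ASCII text, which is easy to get wrong with Chinese tokens.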
Java Implementation
@GetMapping("/http-stream")
public ResponseEntity<StreamingResponseBody> httpStream(
        @RequestParam String message) {
    StreamingResponseBody responseBody = outputStream -> {
        PrintWriter writer = new PrintWriter(
                new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
        chatService.streamTokens(message, token -> {
            // Write one JSON object per line. The servlet container applies the chunked
            // framing (chunk sizes, terminating chunk) itself, so it is never written by hand.
            // Note: the token must be JSON-escaped in real code.
            writer.print("{\"token\":\"" + token + "\"}\n");
            // Flush so the data is sent immediately instead of sitting in a buffer
            writer.flush();
        });
        writer.flush();
    };
    // No Content-Length is set, so an HTTP/1.1 response uses Transfer-Encoding: chunked automatically
    return ResponseEntity.ok()
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(responseBody);
}
[java]
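2.4 Choosing an Approach

| Feature | SSE | WebSocket | HTTP Chunked |
| --- | --- | --- | --- |
| **Browser support** | Native | Native | Native |
| **Implementation complexity** | Low | Medium | Low |
| **Two-way communication** | No (one-way) | Yes | No (one-way) |
| **Server overhead** | Low | High | Low |
| **Reconnection** | Automatic | Manual | Manual |
| **Typical use** | Q&A, news push | Real-time chat, collaboration | File download, log push |
| **Rating** | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |
Conclusion: for most LLM applications SSE is the best choice. It is simple to implement, natively supported by browsers, and comes with built-in automatic reconnection.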
3. Spring Boot Streaming in Practice

3.1 Project Structure
src/main/java/com/example/llmstreaming/
├── config/
│   ├── AsyncConfig.java              # async thread pool configuration
│   ├── WebSocketConfig.java          # WebSocket configuration
│   └── CorsConfig.java               # CORS configuration
├── controller/
│   ├── SSEController.java            # SSE streaming endpoint
│   └── WebSocketController.java      # WebSocket endpoint
├── service/
│   ├── StreamingChatService.java     # core streaming service
│   └── LLMClient.java                # LLM client wrapper
├── model/
│   ├── ChatRequest.java
│   ├── ChatResponse.java
│   └── StreamToken.java
└── LlmStreamingApplication.java
3.2 Async Thread Pool Configuration

Streaming output is processed asynchronously, so a sensibly sized thread pool is essential:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

    @Bean(name = "streamingExecutor")
    public Executor streamingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);      // threads kept alive even when idle
        executor.setMaxPoolSize(50);       // upper bound once the queue is full
        executor.setQueueCapacity(100);    // tasks waiting for a free thread
        executor.setThreadNamePrefix("streaming-");
        // When the pool and queue are both full, the submitting thread runs the task itself
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setKeepAliveSeconds(60);  // idle time before extra threads are released
        executor.initialize();
        return executor;
    }

    // Use the same pool for @Async methods
    @Override
    public Executor getAsyncExecutor() {
        return streamingExecutor();
    }
}
[java]
3.3 LLM Client Wrapper
@Service
public class LLMClient {

    private static final String API_URL = "https://api.openai.com/v1/chat/completions";

    private final WebClient webClient;
    private final ObjectMapper objectMapper;

    public LLMClient() {
        this.webClient = WebClient.create();
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Calls the LLM API in streaming mode. Blocks until the stream ends,
     * so callers can send their completion signal right after this method returns.
     * @param prompt  the prompt
     * @param onToken callback invoked for every generated token
     */
    public void streamChat(String prompt, Consumer<String> onToken) {
        // The request shape depends on the LLM API; OpenAI is used as the example here
        Map<String, Object> requestBody = new HashMap<>();
        requestBody.put("model", "gpt-4");
        requestBody.put("messages", List.of(
                Map.of("role", "user", "content", prompt)
        ));
        requestBody.put("stream", true); // key setting: enable streaming output

        // Use WebClient for the streaming request
        webClient.post()
                .uri(API_URL)
                .headers(h -> h.setBearerAuth(System.getenv("OPENAI_API_KEY")))
                .contentType(MediaType.APPLICATION_JSON)
                .accept(MediaType.TEXT_EVENT_STREAM)
                .bodyValue(requestBody)
                .retrieve()
                .bodyToFlux(String.class)
                .timeout(Duration.ofSeconds(60)) // fail if no chunk arrives for 60 seconds
                .doOnNext(chunk -> {
                    // Parse the SSE data
                    String token = parseToken(chunk);
                    if (token != null) {
                        onToken.accept(token);
                    }
                })
                .doOnError(error -> System.err.println("Stream error: " + error))
                .doOnComplete(() -> System.out.println("Stream completed"))
                .blockLast(); // wait for the whole stream; errors are rethrown to the caller
    }

    /**
     * Extracts the token text from one SSE event.
     */
    @SuppressWarnings("unchecked")
    private String parseToken(String sseData) {
        try {
            // Payload: {"choices":[{"delta":{"content":"xxx"}}]}
            // For text/event-stream responses WebClient usually delivers only the data payload;
            // the "data:" prefix is stripped in case the raw line is delivered instead.
            String json = sseData.startsWith("data:") ? sseData.substring(5).trim() : sseData.trim();
            if (json.isEmpty() || "[DONE]".equals(json)) {
                return null;
            }
            Map<String, Object> data = objectMapper.readValue(json, Map.class);
            List<Map<String, Object>> choices = (List<Map<String, Object>>) data.get("choices");
            if (choices != null && !choices.isEmpty()) {
                Map<String, Object> delta = (Map<String, Object>) choices.get(0).get("delta");
                if (delta != null) {
                    return (String) delta.get("content");
                }
            }
        } catch (Exception e) {
            // Skip malformed chunks, but leave a trace for debugging
            System.err.println("Failed to parse chunk: " + sseData);
        }
        return null;
    }
}
[java]
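One detail matters whenever a client reads a raw SSE byte stream itself instead of relying on a framework decoder: network chunks do not line up with SSE lines, so a `data:` line can arrive split across two reads. The sketch below buffers partial lines until the newline arrives. `SseLineBuffer`, `feed`, and `dataOf` are hypothetical helper names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Reassembles complete lines from arbitrarily split text chunks (illustrative helper). */
final class SseLineBuffer {
    private final StringBuilder pending = new StringBuilder();

    /** Appends a chunk and returns every line it completes; a trailing partial line is kept for later. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> lines = new ArrayList<>();
        int newline;
        while ((newline = pending.indexOf("\n")) >= 0) {
            String line = pending.substring(0, newline);
            pending.delete(0, newline + 1);
            if (line.endsWith("\r")) {
                line = line.substring(0, line.length() - 1); // tolerate CRLF line endings
            }
            lines.add(line);
        }
        return lines;
    }

    /** Returns the payload of a "data:" line, or null for any other line. */
    static String dataOf(String line) {
        if (!line.startsWith("data:")) {
            return null;
        }
        String value = line.substring(5);
        return value.startsWith(" ") ? value.substring(1) : value;
    }
}
```

Feeding the chunks `data: {"a"` and `:1}` followed by a newline returns nothing for the first call and the complete line for the second, which is the behavior a token parser needs before it attempts to parse JSON.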
3.4 Streaming Chat Service
@Service
@Slf4j
public class StreamingChatService {

    private final LLMClient llmClient;

    public StreamingChatService(LLMClient llmClient) {
        this.llmClient = llmClient;
    }

    /**
     * Main streaming chat method.
     */
    public void streamChat(String userMessage, Consumer<String> onToken) {
        log.info("Start streaming for user message: {}", userMessage);
        // Build the prompt
        String prompt = buildPrompt(userMessage);
        // Call the LLM streaming API; every token is passed straight to the callback
        llmClient.streamChat(prompt, onToken);
    }

    /**
     * Streaming chat with conversation context.
     */
    public void streamChatWithContext(
            String userMessage,
            List<ChatMessage> history,
            Consumer<String> onToken) {
        String prompt = buildContextPrompt(userMessage, history);
        llmClient.streamChat(prompt, onToken);
    }

    private String buildPrompt(String userMessage) {
        return String.format("""
                You are a professional technical assistant. Please answer the user's question.
                User: %s
                """, userMessage);
    }

    private String buildContextPrompt(String userMessage, List<ChatMessage> history) {
        StringBuilder sb = new StringBuilder();
        sb.append("You are a professional technical assistant. Here is the earlier conversation:\n\n");
        for (ChatMessage msg : history) {
            sb.append(msg.getRole()).append(": ").append(msg.getContent()).append("\n");
        }
        sb.append("\nNew question from the user: ").append(userMessage);
        return sb.toString();
    }
}
[java]
3.5 SSE Controller

@RestController
@RequestMapping("/api/chat")
@Slf4j
public class SSEController {

    private final StreamingChatService chatService;
    private final Executor streamingExecutor;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public SSEController(StreamingChatService chatService,
                         @Qualifier("streamingExecutor") Executor streamingExecutor) {
        this.chatService = chatService;
        this.streamingExecutor = streamingExecutor;
    }

    /**
     * SSE streaming chat endpoint.
     * The front end calls it with EventSource.
     */
    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChat(
            @RequestParam String message,
            @RequestParam(required = false) String sessionId) {
        log.info("Received streaming chat request: {}, sessionId: {}", message, sessionId);
        // Create the SseEmitter with a timeout of -1 (never time out)
        SseEmitter emitter = new SseEmitter(-1L);
        // Register the callbacks before the asynchronous work starts
        emitter.onTimeout(() -> log.warn("SSE connection timed out"));
        emitter.onCompletion(() -> log.info("SSE transfer completed"));
        emitter.onError(e -> log.error("SSE error", e));
        // Run on the streamingExecutor pool defined in AsyncConfig
        CompletableFuture.runAsync(() -> {
            try {
                StringBuilder fullResponse = new StringBuilder();
                chatService.streamChat(message, token -> {
                    fullResponse.append(token);
                    // Pass only the JSON payload: SseEmitter adds the "data:" prefix and the blank line.
                    // "content" repeats the whole text so far, which is convenient for the UI but
                    // grows quadratically; drop it if the client accumulates tokens itself.
                    sendData(emitter, String.format(
                            "{\"token\":\"%s\",\"content\":\"%s\"}",
                            escapeJson(token),
                            escapeJson(fullResponse.toString())));
                });
                // Send the completion signal
                sendData(emitter, "{\"done\":true,\"fullContent\":\"" +
                        escapeJson(fullResponse.toString()) + "\"}");
                emitter.complete();
                log.info("Streaming chat finished, {} characters sent", fullResponse.length());
            } catch (Exception e) {
                log.error("Streaming chat failed", e);
                emitter.completeWithError(e);
            }
        }, streamingExecutor);
        return emitter;
    }

    /**
     * Streaming chat with conversation context.
     * Note: Spring splits List request parameters on commas, so JSON-encoded history
     * is safer in a POST body; the GET form is kept here for simplicity.
     */
    @GetMapping(value = "/stream/context", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChatWithContext(
            @RequestParam String message,
            @RequestParam List<String> historyJson) {
        List<ChatMessage> history = parseHistory(historyJson);
        SseEmitter emitter = new SseEmitter(-1L);
        CompletableFuture.runAsync(() -> {
            try {
                chatService.streamChatWithContext(message, history, token ->
                        sendData(emitter, "{\"token\":\"" + escapeJson(token) + "\"}"));
                sendData(emitter, "{\"done\":true}");
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        }, streamingExecutor);
        return emitter;
    }

    // Sends one SSE event; a failed write (client gone) becomes an unchecked exception that aborts the stream
    private void sendData(SseEmitter emitter, String json) {
        try {
            emitter.send(SseEmitter.event().data(json));
        } catch (IOException e) {
            log.error("Failed to send SSE data", e);
            throw new UncheckedIOException(e);
        }
    }

    // Minimal escaping for the common cases; other control characters are not handled here
    private String escapeJson(String text) {
        return text.replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("\n", "\\n")
                .replace("\r", "\\r")
                .replace("\t", "\\t");
    }

    // Parses each history entry, skipping entries that are not valid JSON
    private List<ChatMessage> parseHistory(List<String> historyJson) {
        return historyJson.stream()
                .map(json -> {
                    try {
                        return objectMapper.readValue(json, ChatMessage.class);
                    } catch (Exception e) {
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }
}
[java]
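Hand-built JSON payloads such as the ones in this controller are only valid if every special character in the token is escaped, including control characters that a model can emit. The following is a minimal sketch of a fuller escaper based on the JSON string rules in RFC 8259. `JsonEscaper` is a hypothetical helper; in a real Spring project, serializing with Jackson's `ObjectMapper` is usually the simpler option.

```java
/** Escapes text for embedding inside a JSON string literal (sketch based on RFC 8259). */
final class JsonEscaper {

    static String escape(String text) {
        StringBuilder sb = new StringBuilder(text.length() + 16);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                case '\b': sb.append("\\b");  break;
                case '\f': sb.append("\\f");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters must use the \\uXXXX form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c); // everything else, including CJK text, passes through
                    }
            }
        }
        return sb.toString();
    }

    /** Builds the {"token":"..."} payload used for one streamed token. */
    static String tokenEvent(String token) {
        return "{\"token\":\"" + escape(token) + "\"}";
    }
}
```

Because the escaped output never contains a raw newline, the resulting payload always fits on a single SSE `data:` line.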
3.6 Front-End Examples
Using EventSource (SSE)
class SSEClient {
    constructor(baseUrl) {
        this.baseUrl = baseUrl;
        this.eventSource = null;
        this.onMessage = null;
        this.onDone = null;
        this.onError = null;
    }

    /**
     * Starts a streaming chat request.
     */
    connect(message, sessionId = null) {
        // Note: EventSource only supports GET requests and cannot send a JSON body
        let url = `${this.baseUrl}/api/chat/stream?message=${encodeURIComponent(message)}`;
        if (sessionId) {
            url += `&sessionId=${encodeURIComponent(sessionId)}`;
        }
        this.eventSource = new EventSource(url);
        this.eventSource.onmessage = (event) => {
            try {
                const data = JSON.parse(event.data);
                if (data.done) {
                    this.onDone && this.onDone(data.fullContent);
                    // Close explicitly; otherwise EventSource reconnects when the server ends the response
                    this.close();
                } else if (this.onMessage) {
                    this.onMessage(data.token, data.content);
                }
            } catch (e) {
                console.error('Failed to parse SSE data:', e);
            }
        };
        this.eventSource.onerror = (error) => {
            console.error('SSE connection error:', error);
            this.onError && this.onError(error);
            this.close();
        };
    }

    close() {
        if (this.eventSource) {
            this.eventSource.close();
            this.eventSource = null;
        }
    }
}

// Usage example
const client = new SSEClient('http://localhost:8080');
const messages = [];
let fullResponse = '';
client.onMessage = (token, content) => {
    fullResponse = content;
    // Update the UI to produce the typewriter effect
    document.getElementById('response').innerText = content;
};
client.onDone = (fullContent) => {
    console.log('Response finished:', fullContent);
    messages.push({ role: 'assistant', content: fullContent });
};
client.onError = (error) => {
    console.error('Request failed:', error);
};
client.connect('Please introduce the Java Stream API');
[javascript]
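Using the Fetch API + ReadableStream
class FetchStreamClient {
    async streamChat(message) {
        // fetch has no "params" option: the query string must be part of the URL
        const url = `/api/chat/stream?message=${encodeURIComponent(message)}`;
        const response = await fetch(url, {
            method: 'GET',
            headers: {
                'Accept': 'text/event-stream',
            },
        });
        if (!response.ok) {
            throw new Error(`Request failed with status ${response.status}`);
        }
        const reader = response.body.getReader();
        const decoder = new TextDecoder();
        let buffer = '';
        let fullContent = '';
        while (true) {
            const { done, value } = await reader.read();
            if (done) break;
            // stream: true keeps multi-byte characters that span two chunks intact
            buffer += decoder.decode(value, { stream: true });
            // Parse complete lines only; the trailing partial line stays in the buffer
            const lines = buffer.split('\n');
            buffer = lines.pop();
            for (const line of lines) {
                if (!line.startsWith('data:')) continue;
                const data = line.substring(5).trim();
                try {
                    const parsed = JSON.parse(data);
                    if (parsed.done) {
                        await reader.cancel();
                        return fullContent;
                    }
                    if (parsed.token) {
                        fullContent += parsed.token;
                        // Update the UI
                        this.updateDisplay(fullContent);
                    }
                } catch (e) {
                    // Ignore lines that are not valid JSON
                }
            }
        }
        return fullContent;
    }

    updateDisplay(content) {
        document.getElementById('response').innerText = content;
    }
}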
[javascript]
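4. Key Production Settings
4.1 Nginx Configuration
For streaming to work behind Nginx, proxy buffering must be disabled:
server {
    listen 80;
    server_name your-domain.com;
    # Disable proxy buffering so tokens are forwarded as soon as they arrive
    location /api/chat/stream {
        proxy_pass http://127.0.0.1:8080;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        # Key settings: disable buffering and caching
        proxy_buffering off;
        proxy_cache off;
        chunked_transfer_encoding on;
        # Raise the timeouts for long-lived streams
        proxy_read_timeout 86400s;
        proxy_send_timeout 86400s;
    }
}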
[nginx]
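4.2 Spring Boot Configuration
# application.yml
server:
  tomcat:
    threads:
      max: 200
      min-spare: 10
    connection-timeout: 20000
spring:
  mvc:
    async:
      request-timeout: -1   # no timeout for asynchronous (streaming) requests
# Application-specific settings (not built-in Spring Boot properties)
chat:
  stream:
    buffer-size: 8192
    timeout-seconds: 3600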
[yaml]
4.3 Error Handling and Retries
@Component
public class StreamErrorHandler {

    /**
     * Decides whether an error is worth retrying.
     */
    public boolean isRetryableError(Throwable error) {
        if (error instanceof TimeoutException) {
            return true;
        }
        if (error instanceof IOException) {
            String message = error.getMessage();
            return message != null && (
                    message.contains("Connection reset") ||
                    message.contains("Broken pipe")
            );
        }
        return false;
    }

    /**
     * Returns the retry delay (exponential backoff): 1 s, 2 s, 4 s, ... capped at 30 s.
     */
    public long getRetryDelayMs(int attempt) {
        // Clamp the exponent so large attempt numbers cannot overflow
        int exponent = Math.max(0, Math.min(attempt, 5));
        return Math.min(1000L << exponent, 30_000L);
    }
}
[java]
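How the delay calculation might be used in a retry loop is sketched below, under stated assumptions: `RetryWithBackoff` and its `call` method are hypothetical, every exception is treated as retryable for brevity, and the sleep is injected as a `LongConsumer` so the loop can be exercised without real waiting. For a streaming request, retrying is only safe before the first token has been sent to the client; after that a retry would duplicate output.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

/** Illustrative retry loop with capped exponential backoff. */
final class RetryWithBackoff {

    /** 1 s, 2 s, 4 s, ... capped at 30 s. */
    static long delayMs(int attempt) {
        int exponent = Math.max(0, Math.min(attempt, 5));
        return Math.min(1000L << exponent, 30_000L);
    }

    /** Runs the action up to maxAttempts times, pausing via the sleeper between failed attempts. */
    static <T> T call(Callable<T> action, int maxAttempts, LongConsumer sleeper) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts - 1) {
                    sleeper.accept(delayMs(attempt)); // wait before the next attempt
                }
            }
        }
        throw last; // every attempt failed
    }
}
```

In production the sleeper could be a wrapper around `Thread.sleep`, and a check like `isRetryableError` would decide whether to continue or rethrow immediately.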
4.4 Rate Limiting and Circuit Breaking
@Component
public class StreamRateLimiter {

    // Guava RateLimiter: a global limit of 100 requests per second
    private final RateLimiter rateLimiter = RateLimiter.create(100.0);

    /**
     * Tries to acquire a permit without waiting.
     */
    public boolean tryAcquire() {
        return rateLimiter.tryAcquire();
    }

    /**
     * Tries to acquire a permit, waiting at most the given time.
     */
    public boolean tryAcquire(long timeout, TimeUnit unit) {
        return rateLimiter.tryAcquire(timeout, unit);
    }
}

@RestController
public class StreamController {

    private final StreamRateLimiter rateLimiter;

    public StreamController(StreamRateLimiter rateLimiter) {
        this.rateLimiter = rateLimiter;
    }

    @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> streamChat(@RequestParam String message) {
        // Rate limit check
        if (!rateLimiter.tryAcquire()) {
            return ResponseEntity.status(429).build();
        }
        // ... further processing
    }
}
[java]
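A single global limiter treats all callers alike. If limits should apply per user, one possible approach is a token bucket per user ID, sketched below with only the JDK. `PerUserRateLimiter` is a hypothetical class, the rate and burst values are assumptions, and the clock is injected as a `LongSupplier` (for example `System::nanoTime`) so the behavior can be checked deterministically. A production version would also evict idle buckets.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

/** Illustrative per-user token bucket: each user may burst up to `burst` requests, refilled at a fixed rate. */
final class PerUserRateLimiter {

    private static final class Bucket {
        double tokens;
        long lastRefillNanos;

        Bucket(double tokens, long now) {
            this.tokens = tokens;
            this.lastRefillNanos = now;
        }
    }

    private final double permitsPerSecond;
    private final double capacity;
    private final LongSupplier nanoClock;
    private final Map<String, Bucket> buckets = new ConcurrentHashMap<>();

    PerUserRateLimiter(double permitsPerSecond, int burst, LongSupplier nanoClock) {
        if (permitsPerSecond <= 0 || burst < 1) {
            throw new IllegalArgumentException("rate and burst must be positive");
        }
        this.permitsPerSecond = permitsPerSecond;
        this.capacity = burst;
        this.nanoClock = nanoClock;
    }

    /** Returns true and consumes one permit if the user still has budget, false otherwise. */
    boolean tryAcquire(String userId) {
        long now = nanoClock.getAsLong();
        Bucket bucket = buckets.computeIfAbsent(userId, id -> new Bucket(capacity, now));
        synchronized (bucket) {
            // Refill in proportion to the time elapsed since the last call, up to the capacity.
            double elapsedSeconds = Math.max(0, now - bucket.lastRefillNanos) / 1_000_000_000.0;
            bucket.tokens = Math.min(capacity, bucket.tokens + elapsedSeconds * permitsPerSecond);
            bucket.lastRefillNanos = Math.max(now, bucket.lastRefillNanos);
            if (bucket.tokens >= 1.0) {
                bucket.tokens -= 1.0;
                return true;
            }
            return false;
        }
    }
}
```

A controller could call `tryAcquire(userId)` before creating the emitter and return HTTP 429 when it yields false, exactly as with the global check.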
5. Performance Optimization and Monitoring
5.1 Connection Pool Tuning
@Configuration
public class WebClientConfig {

    @Bean
    public WebClient webClient() {
        // Reactor Netty connection pool; "llm-pool" is just a label for this pool
        ConnectionProvider connectionProvider = ConnectionProvider.builder("llm-pool")
                .maxConnections(500)
                .pendingAcquireTimeout(Duration.ofSeconds(60))
                .evictInBackground(Duration.ofSeconds(120))
                .build();
        return WebClient.builder()
                .clientConnector(new ReactorClientHttpConnector(
                        HttpClient.create(connectionProvider)
                ))
                .build();
    }
}
[java]
5.2 Monitoring Metrics
@Configuration
public class StreamMetricsConfig {

    // With spring-boot-starter-actuator and the Prometheus registry on the classpath,
    // Spring Boot already auto-configures a MeterRegistry; declare one only if you need a custom setup.
    @Bean
    public MeterRegistry meterRegistry() {
        return new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
    }
}

// Registered once through @Component; an additional @Bean method would create a duplicate
@Component
public class StreamMetrics {

    private final Counter requestCounter;
    private final Counter tokenCounter;
    private final Timer latencyTimer;

    public StreamMetrics(MeterRegistry registry) {
        this.requestCounter = Counter.builder("llm.stream.requests")
                .description("Total number of streaming requests")
                .register(registry);
        this.tokenCounter = Counter.builder("llm.stream.tokens")
                .description("Total number of tokens sent")
                .register(registry);
        this.latencyTimer = Timer.builder("llm.stream.latency")
                .description("Streaming response latency")
                .register(registry);
    }

    public void recordRequest() {
        requestCounter.increment();
    }

    public void recordToken() {
        tokenCounter.increment();
    }

    public Timer.Sample startTimer() {
        return Timer.start();
    }

    public void recordLatency(Timer.Sample sample) {
        sample.stop(latencyTimer);
    }
}
[java]
5.3 Memory Management
@Service
public class StreamSessionManager {

    private final ConcurrentHashMap<String, StreamSession> sessions = new ConcurrentHashMap<>();
    private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();

    @PostConstruct
    public void init() {
        // Every 5 minutes, remove sessions that have been idle for more than 30 minutes
        cleaner.scheduleAtFixedRate(() -> {
            long now = System.currentTimeMillis();
            sessions.entrySet().removeIf(entry -> {
                boolean expired = now - entry.getValue().getLastActiveTime() > 30 * 60 * 1000;
                if (expired) {
                    entry.getValue().getEmitter().complete(); // release the connection as well
                }
                return expired;
            });
        }, 5, 5, TimeUnit.MINUTES);
    }

    @PreDestroy
    public void shutdown() {
        // Stop the cleanup thread when the application shuts down
        cleaner.shutdownNow();
    }

    public void addSession(String sessionId, SseEmitter emitter) {
        sessions.put(sessionId, new StreamSession(emitter));
    }

    public void removeSession(String sessionId) {
        sessions.remove(sessionId);
    }
}

@Data
@AllArgsConstructor
class StreamSession {
    private SseEmitter emitter;
    private long lastActiveTime; // update whenever a token is sent, so active sessions are not evicted
    private long tokenCount;

    public StreamSession(SseEmitter emitter) {
        this.emitter = emitter;
        this.lastActiveTime = System.currentTimeMillis();
        this.tokenCount = 0;
    }
}
[java]
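6. Common Problems and Solutions
6.1 High Time to First Token
Problem: after the connection is established, the first token takes too long to arrive
Troubleshooting steps:
- Check the latency of the LLM provider's API
- Check the network path (DNS, proxies)
- Check the server-side thread pool configuration
Optimization:
// Warm up the connection at startup
@Component
public class ConnectionWarmup {
    private final WebClient webClient;

    public ConnectionWarmup(WebClient webClient) {
        this.webClient = webClient;
    }

    @PostConstruct
    public void warmup() {
        try {
            // Pre-establish the connection to the LLM API (DNS lookup, TCP and TLS handshake)
            webClient.get().uri("https://api.openai.com").retrieve()
                    .toBodilessEntity().block(Duration.ofSeconds(5));
        } catch (Exception e) {
            // An error status or a timeout is harmless here; warm-up must never fail startup
        }
    }
}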
[java]
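6.2 SSE Connections Dropping in the Browser
Problem: the connection is dropped after it has been idle for roughly 50 to 60 seconds. This has been observed with Chrome, and the cut-off often comes from an intermediary such as a proxy or load balancer with an idle timeout (Nginx's proxy_read_timeout, for example, defaults to 60 seconds).
Solution: send a periodic heartbeat so the connection never sits idle.
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat() {
    // 60-second timeout; this caps the total lifetime of the response, so raise it for long answers
    SseEmitter emitter = new SseEmitter(60 * 1000L);
    // Send a heartbeat periodically to keep the connection alive
    // (in production, share one scheduler instead of creating one per request)
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    scheduler.scheduleAtFixedRate(() -> {
        try {
            // An SSE comment line (": ping") is ignored by EventSource but keeps traffic flowing
            emitter.send(SseEmitter.event().comment("ping"));
        } catch (Exception e) {
            scheduler.shutdown();
        }
    }, 30, 30, TimeUnit.SECONDS);
    emitter.onCompletion(scheduler::shutdown);
    emitter.onTimeout(scheduler::shutdown);
    emitter.onError(e -> scheduler.shutdown());
    return emitter;
}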
[java]
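6.3 Tokens Arriving Out of Order
Problem: when several threads send tokens at the same time, the client can receive them out of order. A single SSE connection delivers bytes in order, so this only happens when sends are issued concurrently on the server.
Solution:
// Give every token a sequence number
private int tokenIndex = 0;
private final Object lock = new Object();
chatService.streamChat(message, token -> {
    // The lock makes "assign index + send" atomic, so the indexes match the send order
    synchronized (lock) {
        int currentIndex = tokenIndex++;
        String json = String.format(
                "{\"token\":\"%s\",\"index\":%d}",
                escapeJson(token),
                currentIndex
        );
        try {
            // SseEmitter adds the "data:" prefix and the terminating blank line
            emitter.send(SseEmitter.event().data(json));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
});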
[java]
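Once every token carries an index, the receiving side can restore the order even if tokens arrive shuffled. The sketch below shows one way to do this: a small buffer holds back tokens that arrive early and releases them as soon as the gap before them is filled. `TokenReorderBuffer` is a hypothetical class, and it assumes indexes start at 0 and increase by 1.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative reorder buffer: releases tokens strictly in index order. */
final class TokenReorderBuffer {
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextIndex = 0;

    /** Accepts one token with its index and returns the tokens that can now be rendered, in order. */
    List<String> accept(int index, String token) {
        List<String> ready = new ArrayList<>();
        if (index < nextIndex) {
            return ready; // duplicate of a token that was already released
        }
        pending.put(index, token);
        // Release the longest run of consecutive indexes starting at nextIndex.
        while (pending.containsKey(nextIndex)) {
            ready.add(pending.remove(nextIndex));
            nextIndex++;
        }
        return ready;
    }
}
```

If the token with index 1 arrives before index 0, the first call returns an empty list and the second call returns both tokens in the correct order.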
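7. Summary
This article walked through a complete technical approach to LLM streaming output:
Key Points
- Streaming principle: tokens are generated one by one, transmitted immediately, and rendered in real time
- Choosing a technique: SSE fits most scenarios (simple to implement, natively supported by browsers)
- Spring Boot implementation: SseEmitter + async thread pool + callbacks
- Production configuration: disable Nginx buffering, configure timeouts, apply rate limiting and circuit breaking
- Performance optimization: connection pooling and warm-up, monitoring metrics, memory management
Recommendations by Scenario
| Scenario | Recommended approach |
| --- | --- |
| Simple Q&A | SSE |
| Real-time chat | WebSocket |
| Content generation | SSE |
| Multi-user collaboration | WebSocket |
Possible Extensions
- Add conversation memory to support multi-turn dialogue
- Integrate multiple LLM providers (OpenAI, Azure, Claude, local models)
- Implement token counting, usage statistics, and cost control
Mastering streaming output can noticeably improve the experience of your AI application.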
---
Author: 付雷刚 (洛水石)
Original content; please credit the source when reposting