大模型流式输出的服务端架构:SSE与增量渲染的工程实践
大模型流式输出的服务端架构:SSE与增量渲染的工程实践

一、等待焦虑:大模型推理延迟的用户体验困境
大模型推理的端到端延迟通常在数秒到数十秒之间,如果采用传统的请求-响应模式,用户需要等待整个生成过程完成后才能看到结果。这种"黑箱等待"体验在对话场景中尤为糟糕——用户不确定系统是否在工作,不确定是否需要重新发送请求,焦虑感随等待时间指数级增长。
流式输出(Streaming)通过将大模型的生成过程拆分为多个Token增量返回,让用户在首个Token生成后即可看到部分结果,极大改善了感知延迟。然而,流式输出的服务端实现远比一次性返回复杂:需要管理长连接的生命周期、处理生成过程中的异常中断、实现增量内容的缓存与断点续传、以及协调多个下游服务的流式响应合并。
本文将深入探讨大模型流式输出的服务端架构设计,覆盖SSE协议实现、流式编排、背压控制和容错机制四个核心维度。
二、流式输出架构设计
2.1 端到端流式架构
sequenceDiagram
participant C as 客户端
participant G as API网关
participant S as 流式服务
participant M as 大模型推理
participant K as 缓存服务
C->>G: SSE连接请求
G->>S: 建立流式上下文
S->>K: 检查缓存
alt 缓存命中
K-->>S: 返回缓存内容
S-->>C: 流式推送缓存内容
else 缓存未命中
S->>M: 流式推理请求
loop 增量Token
M-->>S: Token片段
S->>K: 写入缓存
S-->>C: SSE推送Token
end
M-->>S: 生成完成信号
S-->>C: 流式结束标记
end
2.2 SSE协议实现
Server-Sent Events(SSE)是流式输出的主流协议选择。相比WebSocket,SSE基于HTTP协议,实现简单,天然支持断线重连和事件ID机制,更适合单向流式推送场景。
@Controller
public class StreamingController {
private final StreamingService streamingService;
@GetMapping(value = "/v1/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamChat(@RequestParam String sessionId,
@RequestBody ChatRequest request) {
// 设置超时时间为5分钟,适配长文本生成场景
SseEmitter emitter = new SseEmitter(300_000L);
// 注册生命周期回调
emitter.onCompletion(() -> streamingService.cleanup(sessionId));
emitter.onTimeout(() -> streamingService.handleTimeout(sessionId));
emitter.onError(e -> streamingService.handleError(sessionId, e));
// 异步执行流式推理
streamingService.streamGenerate(request, sessionId)
.subscribe(
chunk -> {
try {
emitter.send(SseEmitter.event()
.id(chunk.getEventId())
.data(chunk.toJson()));
} catch (IOException e) {
emitter.completeWithError(e);
}
},
emitter::completeWithError,
() -> {
try {
emitter.send(SseEmitter.event()
.name("done")
.data("[DONE]"));
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
}
);
return emitter;
}
}
2.3 流式缓存与断点续传
流式输出过程中的网络中断是常见问题。通过流式缓存机制,客户端可以在重连后从断点处继续接收内容,避免重复生成。
@Service
public class StreamingCacheManager {
private final RedisTemplate<String, String> redisTemplate;
private static final Duration CACHE_TTL = Duration.ofMinutes(30);
/**
* 追加写入流式内容到缓存
*/
public void appendChunk(String sessionId, int sequenceId, String content) {
String key = "stream:" + sessionId;
// 使用Redis的Hash结构存储,field为序号
redisTemplate.opsForHash().put(key,
String.valueOf(sequenceId), content);
redisTemplate.expire(key, CACHE_TTL);
}
/**
* 获取已缓存的内容,用于断点续传
*/
public List<CachedChunk> getCachedChunks(String sessionId,
int fromSequence) {
String key = "stream:" + sessionId;
Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
return entries.entrySet().stream()
.filter(e -> Integer.parseInt(e.getKey().toString()) >= fromSequence)
.sorted(Comparator.comparingInt(e ->
Integer.parseInt(e.getKey().toString())))
.map(e -> new CachedChunk(
Integer.parseInt(e.getKey().toString()),
e.getValue().toString()))
.collect(Collectors.toList());
}
}
三、流式编排与背压控制
3.1 多模型流式编排
在RAG等场景中,一次请求可能需要先检索再生成,甚至需要多个模型依次处理。流式编排需要协调多个下游服务的流式响应。
@Service
public class StreamingOrchestrator {
private final RetrievalService retrievalService;
private final GenerationService generationService;
public Flux<StreamChunk> orchestrate(ChatRequest request) {
return Flux.create(sink -> {
// 阶段一:检索(非流式,但需要快速返回)
retrievalService.retrieve(request.getQuery())
.subscribe(context -> {
// 先推送检索状态
sink.next(StreamChunk.status("retrieving"));
// 阶段二:流式生成
generationService.streamGenerate(
request, context)
.subscribe(
chunk -> sink.next(
StreamChunk.content(chunk)),
sink::error,
sink::complete
);
}, sink::error);
});
}
}
3.2 背压控制
大模型的生成速度可能快于客户端的消费速度(如客户端网络带宽受限),此时需要背压机制防止服务端缓冲区溢出。
@Service
public class BackpressureAwareStreamer {
private static final int MAX_INFLIGHT_CHUNKS = 100;
public Flux<StreamChunk> streamWithBackpressure(
Flux<String> modelOutput) {
return modelOutput
// 限制在途chunk数量,超出时暂停从模型读取
.onBackpressureBuffer(MAX_INFLIGHT_CHUNKS,
() -> { /* 缓冲区满时的降级策略 */ })
.map(StreamChunk::content)
// 使用rate limiter控制推送速率
.sample(Duration.ofMillis(50));
}
}
四、架构权衡与边界分析
4.1 SSE与WebSocket的选型
SSE基于HTTP,实现简单,天然支持断线重连,但只能服务端向客户端单向推送;WebSocket支持双向通信,但实现复杂度更高,断线重连需要自行实现。对于纯流式输出场景,SSE是更优选择;如果需要客户端在流式过程中发送控制指令(如取消生成、调整参数),则应选择WebSocket。
4.2 缓存粒度与存储成本
流式缓存的粒度越细(如按Token缓存),断点续传的精度越高,但存储成本也越大;粒度越粗(如按句子缓存),存储成本低,但断点续传可能丢失部分内容。建议按句子粒度缓存,在存储成本和续传精度之间取得平衡。
4.3 长连接的资源消耗
每个SSE连接都会占用一个服务端线程(在使用Servlet时)或文件描述符。当并发连接数达到数万时,线程和文件描述符的资源消耗会成为瓶颈。建议使用WebFlux等异步非阻塞框架,或配置合理的连接超时和最大连接数限制。
五、总结
大模型流式输出的服务端架构需要在协议选型、缓存续传、流式编排和背压控制四个层面进行精细化设计。SSE协议实现简单且天然支持断线重连,流式缓存保障断点续传能力,多模型编排协调复杂的流式流程,背压控制防止缓冲区溢出。
落地建议:首先基于SSE协议实现基本的流式输出能力;其次增加流式缓存和断点续传,提升网络不稳定场景下的用户体验;最后,在并发量增长后引入背压控制和异步非阻塞框架,保障系统在高负载下的稳定性。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)