大模型流式输出的服务端架构:SSE与增量渲染的工程实践

cover

一、等待焦虑:大模型推理延迟的用户体验困境

大模型推理的端到端延迟通常在数秒到数十秒之间,如果采用传统的请求-响应模式,用户需要等待整个生成过程完成后才能看到结果。这种"黑箱等待"体验在对话场景中尤为糟糕——用户不确定系统是否在工作,不确定是否需要重新发送请求,焦虑感随等待时间指数级增长。

流式输出(Streaming)通过将大模型的生成过程拆分为多个Token增量返回,让用户在首个Token生成后即可看到部分结果,极大改善了感知延迟。然而,流式输出的服务端实现远比一次性返回复杂:需要管理长连接的生命周期、处理生成过程中的异常中断、实现增量内容的缓存与断点续传、以及协调多个下游服务的流式响应合并。

本文将深入探讨大模型流式输出的服务端架构设计,覆盖SSE协议实现、流式编排、背压控制和容错机制四个核心维度。

二、流式输出架构设计

2.1 端到端流式架构

sequenceDiagram
    participant C as 客户端
    participant G as API网关
    participant S as 流式服务
    participant M as 大模型推理
    participant K as 缓存服务

    C->>G: SSE连接请求
    G->>S: 建立流式上下文
    S->>K: 检查缓存
    alt 缓存命中
        K-->>S: 返回缓存内容
        S-->>C: 流式推送缓存内容
    else 缓存未命中
        S->>M: 流式推理请求
        loop 增量Token
            M-->>S: Token片段
            S->>K: 写入缓存
            S-->>C: SSE推送Token
        end
        M-->>S: 生成完成信号
        S-->>C: 流式结束标记
    end

2.2 SSE协议实现

Server-Sent Events(SSE)是流式输出的主流协议选择。相比WebSocket,SSE基于HTTP协议,实现简单,天然支持断线重连和事件ID机制,更适合单向流式推送场景。

@Controller
public class StreamingController {

    private final StreamingService streamingService;

    @GetMapping(value = "/v1/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter streamChat(@RequestParam String sessionId,
                                  @RequestBody ChatRequest request) {
        // 设置超时时间为5分钟,适配长文本生成场景
        SseEmitter emitter = new SseEmitter(300_000L);

        // 注册生命周期回调
        emitter.onCompletion(() -> streamingService.cleanup(sessionId));
        emitter.onTimeout(() -> streamingService.handleTimeout(sessionId));
        emitter.onError(e -> streamingService.handleError(sessionId, e));

        // 异步执行流式推理
        streamingService.streamGenerate(request, sessionId)
                .subscribe(
                        chunk -> {
                            try {
                                emitter.send(SseEmitter.event()
                                        .id(chunk.getEventId())
                                        .data(chunk.toJson()));
                            } catch (IOException e) {
                                emitter.completeWithError(e);
                            }
                        },
                        emitter::completeWithError,
                        () -> {
                            try {
                                emitter.send(SseEmitter.event()
                                        .name("done")
                                        .data("[DONE]"));
                                emitter.complete();
                            } catch (IOException e) {
                                emitter.completeWithError(e);
                            }
                        }
                );

        return emitter;
    }
}

2.3 流式缓存与断点续传

流式输出过程中的网络中断是常见问题。通过流式缓存机制,客户端可以在重连后从断点处继续接收内容,避免重复生成。

@Service
public class StreamingCacheManager {

    private final RedisTemplate<String, String> redisTemplate;
    private static final Duration CACHE_TTL = Duration.ofMinutes(30);

    /**
     * 追加写入流式内容到缓存
     */
    public void appendChunk(String sessionId, int sequenceId, String content) {
        String key = "stream:" + sessionId;
        // 使用Redis的Hash结构存储,field为序号
        redisTemplate.opsForHash().put(key,
                String.valueOf(sequenceId), content);
        redisTemplate.expire(key, CACHE_TTL);
    }

    /**
     * 获取已缓存的内容,用于断点续传
     */
    public List<CachedChunk> getCachedChunks(String sessionId,
                                              int fromSequence) {
        String key = "stream:" + sessionId;
        Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);

        return entries.entrySet().stream()
                .filter(e -> Integer.parseInt(e.getKey().toString()) >= fromSequence)
                .sorted(Comparator.comparingInt(e ->
                        Integer.parseInt(e.getKey().toString())))
                .map(e -> new CachedChunk(
                        Integer.parseInt(e.getKey().toString()),
                        e.getValue().toString()))
                .collect(Collectors.toList());
    }
}

三、流式编排与背压控制

3.1 多模型流式编排

在RAG等场景中,一次请求可能需要先检索再生成,甚至需要多个模型依次处理。流式编排需要协调多个下游服务的流式响应。

@Service
public class StreamingOrchestrator {

    private final RetrievalService retrievalService;
    private final GenerationService generationService;

    public Flux<StreamChunk> orchestrate(ChatRequest request) {
        return Flux.create(sink -> {
            // 阶段一:检索(非流式,但需要快速返回)
            retrievalService.retrieve(request.getQuery())
                    .subscribe(context -> {
                        // 先推送检索状态
                        sink.next(StreamChunk.status("retrieving"));

                        // 阶段二:流式生成
                        generationService.streamGenerate(
                                request, context)
                                .subscribe(
                                        chunk -> sink.next(
                                                StreamChunk.content(chunk)),
                                        sink::error,
                                        sink::complete
                                );
                    }, sink::error);
        });
    }
}

3.2 背压控制

大模型的生成速度可能快于客户端的消费速度(如客户端网络带宽受限),此时需要背压机制防止服务端缓冲区溢出。

@Service
public class BackpressureAwareStreamer {

    private static final int MAX_INFLIGHT_CHUNKS = 100;

    public Flux<StreamChunk> streamWithBackpressure(
            Flux<String> modelOutput) {
        return modelOutput
                // 限制在途chunk数量,超出时暂停从模型读取
                .onBackpressureBuffer(MAX_INFLIGHT_CHUNKS,
                        () -> { /* 缓冲区满时的降级策略 */ })
                .map(StreamChunk::content)
                // 使用rate limiter控制推送速率
                .sample(Duration.ofMillis(50));
    }
}

四、架构权衡与边界分析

4.1 SSE与WebSocket的选型

SSE基于HTTP,实现简单,天然支持断线重连,但只能服务端向客户端单向推送;WebSocket支持双向通信,但实现复杂度更高,断线重连需要自行实现。对于纯流式输出场景,SSE是更优选择;如果需要客户端在流式过程中发送控制指令(如取消生成、调整参数),则应选择WebSocket。

4.2 缓存粒度与存储成本

流式缓存的粒度越细(如按Token缓存),断点续传的精度越高,但存储成本也越大;粒度越粗(如按句子缓存),存储成本低,但断点续传可能丢失部分内容。建议按句子粒度缓存,在存储成本和续传精度之间取得平衡。

4.3 长连接的资源消耗

每个SSE连接都会占用一个服务端线程(在使用Servlet时)或文件描述符。当并发连接数达到数万时,线程和文件描述符的资源消耗会成为瓶颈。建议使用WebFlux等异步非阻塞框架,或配置合理的连接超时和最大连接数限制。

五、总结

大模型流式输出的服务端架构需要在协议选型、缓存续传、流式编排和背压控制四个层面进行精细化设计。SSE协议实现简单且天然支持断线重连,流式缓存保障断点续传能力,多模型编排协调复杂的流式流程,背压控制防止缓冲区溢出。

落地建议:首先基于SSE协议实现基本的流式输出能力;其次增加流式缓存和断点续传,提升网络不稳定场景下的用户体验;最后,在并发量增长后引入背压控制和异步非阻塞框架,保障系统在高负载下的稳定性。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐