大家好,我是程序员小策。

首先给大家去一个例子:凌晨两点,P0 告警炸了。

AI 聊天接口全部超时,用户消息发出去转圈转了 120 秒然后报错。你打开监控一看:Tomcat 线程池满了,200 个工作线程全部卡在"等待 AI 回复"这个状态上。新的请求进来直接 reject。

原因很简单——OpenAI 那边的 API 延迟突然飙到了 90 秒。而你的后端用的是同步阻塞调用:httpClient.post(url, body),线程就一直干等着。一个用户等 90 秒,十个用户就是 900 秒·线程,两百个线程全部耗尽只用了不到三分钟。

这就是同步调用大模型在生产上最经典的翻车场景。

今天这篇文章,我就带你看一个生产级的 AI 聊天系统是怎么解决这个问题的——用 Reactor Flux + SSE 把同步阻塞变成异步流式,用独立的线程池把 AI 调用和 Web 请求线程隔离开,用 CountDownLatch 守住最后一道超时防线。


问题定义

AI 聊天的核心矛盾:LLM 响应时间极度不确定(5 秒到 180 秒),而 HTTP 请求线程是稀缺资源(Tomcat 默认 200 个)。

朴素方案是同步阻塞调用:请求进来 → 调 LLM → 等返回 → 返回给前端。在 LLM 响应稳定的情况下(比如 5 秒以内),这个方案没问题。但 LLM 的响应时间取决于模型负载、输入长度、输出长度,波动巨大。一旦延迟飙上去,线程池就被打满。

那必然的改进方向是异步 + 流式

但这不是简单的"把同步改成异步"就完事了。你还要面对:

  • 流式数据如何一帧一帧推给前端?用什么协议?
  • AI 调用在哪个线程里执行?会不会影响主线程?
  • 如果 AI 调用一直不返回,谁来兜底超时?
  • 流式过程中用户刷新页面断开连接,后端怎么感知?

核心概念

SSE(Server-Sent Events):基于 HTTP 协议的单向流式传输,服务端可以持续向客户端推送数据,客户端通过 EventSource API 接收。相比 WebSocket,SSE 更轻量,不需要协议升级,网关/CDN 兼容性更好。相比短轮询,SSE 实时性更高,不需要客户端反复请求。

Flux:Reactor 框架中的响应式流发布者,代表一个包含 0 到 N 个元素的异步序列。Flux<String> 可以逐个推送字符串给订阅者。

FluxSink:Flux 的编程式发射器——你不需要预先准备好所有数据,而是在运行时按需调用 sink.next(data) 把数据推出去。

用交通类比理解这个架构:

想象一条高速公路收费站。同步阻塞就是"一辆车到了收费窗口,等它交完钱、找完零、拿完票据、开走,下一辆车才能上来"。高峰期必堵。

异步流式就是"ETC 不停车收费"——车开过去,交易在后台异步完成。多车道并行处理,一辆车堵了不影响其他车道。

在这个系统里:

  • 收费站 = HTTP 请求线程(稀缺资源,快速释放)
  • ETC 后台 = 独立线程池 ThreadPoolTaskExecutor(专门执行 AI 调用)
  • 车道 = FluxSink(每个请求一条车道,互不干扰)
  • 超时栏杆 = CountDownLatch.await(5, TimeUnit.MINUTES)(5 分钟没通过就拦截)

实现

看代码。第一步,Controller 层——SSE 的入口:

@RestController
@RequestMapping("/api/xunzhi/v1/ai")
@RequiredArgsConstructor
public class AiMessageController {

    private final AiMessageService aiMessageService;

    @PostMapping(value = "/sessions/{sessionId}/chat", 
                 produces = MediaType.TEXT_EVENT_STREAM_VALUE)  // ← 告诉浏览器这是SSE流
    public Flux<String> chat(@PathVariable String sessionId,
                             @RequestBody AiMessageReqDTO requestParam,
                             @CurrentUser String username,
                             HttpServletResponse response) {
        response.setHeader("Cache-Control", "no-cache");      // ← 禁止浏览器缓存SSE数据
        response.setHeader("Connection", "keep-alive");       // ← 保持连接
        response.setHeader("Access-Control-Allow-Origin", "*");
        response.setHeader("Access-Control-Allow-Headers", "Cache-Control");
        requestParam.setSessionId(sessionId);
        return aiMessageService.aiChatFlux(requestParam, username);  // ← 返回Flux,Spring自动转SSE
    }
}

注意 produces = MediaType.TEXT_EVENT_STREAM_VALUE——这一句告诉 Spring:“这个接口的返回值不要一次性序列化成 JSON 返回,而是以 text/event-stream 格式逐条推送。”

第二步,核心——Service 层的流式编排。这是整个系统最重要的一段代码:

@Service
@RequiredArgsConstructor
public class AiMessageServiceImpl implements AiMessageService {

    private static final String DEFAULT_ERROR_CONTENT = "Sorry, an error occurred while processing your request.";
    private static final String UNSUPPORTED_AI_TYPE = "Current AI type is not supported";

    private final AiPropertiesService aiPropertiesService;
    private final AiConversationService aiConversationService;
    private final AiChatHandlerFactory aiChatHandlerFactory;
    private final ConversationMessageHistoryService conversationMessageHistoryService;
    private final ConversationMessagePersistenceService conversationMessagePersistenceService;
    private final ConversationStreamingSupport conversationStreamingSupport;
    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;  // ← 独立线程池!

    @Override
    public Flux<String> aiChatFlux(AiMessageReqDTO requestParam, String username) {
        // 参数校验 + 权限校验
        if (requestParam == null) {
            return Flux.error(new ClientException("request body cannot be empty"));
        }
        if (StrUtil.isBlank(requestParam.getSessionId())) {
            return Flux.error(new ClientException("sessionId cannot be empty"));
        }
        aiConversationService.requireOwnedConversation(requestParam.getSessionId(), username);
        requestParam.setUserName(username);
        return aiChatFluxInternal(requestParam);
    }

    private Flux<String> aiChatFluxInternal(AiMessageReqDTO requestParam) {
        return Flux.create(sink -> {  // ← Flux.create():编程式创建Flux
            String userMessage = StrUtil.blankToDefault(requestParam.getInputMessage(), "No input");
            Long aiId = requestParam.getAiId();
            AIContentAccumulator accumulator = new AIContentAccumulator();  // ← 内容累积器

            // 核心!AI调用提交到独立线程池,不占用Tomcat线程
            threadPoolTaskExecutor.submit(() -> processChat(
                    sessionId, aiId, userMessage, sink, accumulator));
            
            // 监听前端断开连接事件
            sink.onCancel(() -> log.warn("AI chat flux cancelled, sessionId={}", sessionId));
            sink.onDispose(() -> log.info("AI chat flux disposed, sessionId={}", sessionId));
        });
    }
}

这段代码的设计精髓在三个点:

第一个:Flux.create() 而不是 Flux.just() Flux.just("a", "b", "c") 是你事先知道所有数据,一次性列出来。但 AI 聊天是流式的——你不可能事先知道 AI 会说什么。Flux.create() 给你一个 FluxSink,你可以在任何时候调用 sink.next(data) 推数据,非常灵活。

第二个:threadPoolTaskExecutor.submit() 把 AI 调用隔离到独立线程池。 这一步极其关键。Tomcat 的工作线程拿到请求后,校验参数、校验权限,然后把 AI 调用丢给独立线程池,自己立刻返回去处理下一个请求。Tomcat 线程不会被阻塞 90 秒。

第三个:sink.onCancel()sink.onDispose() 监听的是前端断开连接。 用户刷新页面或关闭标签页时,SSE 连接会断开,后端需要知道并及时释放资源。

第三步,Handler 层的流式发射和超时兜底。这是 UniversalAiChatHandler.streamToSink() 的核心:

@Override
public void streamToSink(AiPropertiesDO aiProperties, String userMessage, 
                         List<AiMessageHistoryRespDTO> historyMessages,
                         FluxSink<String> sink, AIContentAccumulator accumulator) throws Exception {
    ChatClient chatClient = createChatClient(aiProperties);
    List<Message> messages = buildMessages(aiProperties, userMessage, historyMessages);

    CountDownLatch latch = new CountDownLatch(1);         // ← 超时防线!
    final Throwable[] streamError = new Throwable[1];     // ← 异常收集器

    chatClient.prompt()
            .messages(messages)
            .stream()
            .chatResponse()
            .subscribe(
                    chatResponse -> {
                        // 正常收到一个token → 推送给前端
                        Generation generation = chatResponse.getResult();
                        if (generation != null) {
                            String content = generation.getOutput().getText();
                            if (StrUtil.isNotEmpty(content)) {
                                AiChatStreamRespDTO resp = AiChatStreamRespDTO.builder()
                                        .type("content")
                                        .content(content)
                                        .build();
                                sink.next(JSON.toJSONString(resp));     // ← 推给SSE
                                accumulator.appendSimpleContent(content); // ← 累积完整回复
                            }
                            
                            // DeepSeek R1 的 reasoning_content 处理 ← 关键!
                            String reasoning = null;
                            try {
                                Method getReasoningContent = generation.getOutput()
                                        .getClass().getMethod("getReasoningContent");
                                Object reasoningVal = getReasoningContent.invoke(generation.getOutput());
                                if (reasoningVal != null) reasoning = reasoningVal.toString();
                            } catch (Exception ignore) {}
                            
                            if (reasoning == null) {
                                Object reasoningObj = generation.getOutput()
                                        .getMetadata().get("reasoningContent");
                                if (reasoningObj != null) reasoning = reasoningObj.toString();
                            }
                            
                            if (StrUtil.isNotEmpty(reasoning)) {
                                AiChatStreamRespDTO resp = AiChatStreamRespDTO.builder()
                                        .type("reasoning_content")
                                        .content(reasoning)
                                        .build();
                                sink.next(JSON.toJSONString(resp));  // ← 深度思考推给前端
                                accumulator.appendReasoningChunk(reasoning.getBytes());
                            }
                        }
                    },
                    error -> {
                        log.error("流式响应发生错误", error);
                        streamError[0] = error;
                        sink.error(error);
                        latch.countDown();
                    },
                    latch::countDown  // ← 正常完成时countDown
            );

    // 核心!最多等5分钟 ← 超时兜底
    if (!latch.await(5, TimeUnit.MINUTES)) {
        throw new RuntimeException("AI 响应超时");
    }

    if (streamError[0] != null) {
        throw new RuntimeException(streamError[0]);
    }
}

这段代码里有三个生产级的实践细节:

第一:CountDownLatch 作为同步屏障。 Spring AI 的 stream().subscribe() 是异步的——你发起流式调用后,代码会立刻继续往下走。如果不加等待,方法直接返回了,FluxSink 还没收到任何数据。latch.await(5, MINUTES) 的意思是:“我等你最多 5 分钟,5 分钟后还没完成就抛超时异常”。

第二:streamError[] 数组收集异常。 为什么不是直接 throw?因为 subscribe() 的 error 回调是在另一个线程里执行的,直接 throw 不会传播到当前线程。用数组做中转,当前线程在 latch.await() 返回后检查。

第三:DeepSeek R1 的 reasoningContent 处理用了反射。 因为 Spring AI 的通用 AssistantMessage 类没有直接暴露 reasoningContent 字段,但 DeepSeek 的返回里有。代码先用反射去取 getReasoningContent() 方法,取不到再 fallback 到 metadata 字段——兼容的优雅写法


边界情况与陷阱

陷阱一:用户刷新页面前端断开连接,FluxSink 继续写数据会怎样?

会抛异常。但如果你的代码在收到 onCancel 信号后还在向 AI 发请求,这个请求就成了"孤儿请求"——白白消耗 Token,结果没人接收。这个系统的防御是:在 sink.onCancel() 里打日志,并在上层 processChat 里的 sink.isCancelled() 检查中跳过推送。

陷阱二:CountDownLatch 设 5 分钟够不够?

这取决于你的业务场景。对于 GPT-4 生成一篇长文章,5 分钟可能不够。但注意——CountDownLatch 的超时是整个流式过程的总时长,不是单个 token 的等待时间。只要 AI 一直在输出 token(即使很慢),latch 就不会超时,因为 latch.countDown() 只有在流式完成后才会调用。所以 5 分钟超时真正罩住的是"流式调用发起了但一直没有返回任何数据"或者"中间网络断了导致无限等待"的场景。

陷阱三:线程池的拒绝策略。

ThreadPoolTaskExecutor 如果配置不当(核心线程数太小、队列太大),高峰期任务会被堆积。默认的 AbortPolicy 会直接抛异常。这个系统用的是 ThreadPoolTaskExecutor,需要去看实际配置有没有设置合理的拒绝策略——比如 CallerRunsPolicy(让调用线程自己执行,虽然慢但不会丢任务)。


高级考量:SSE vs WebSocket vs gRPC Stream

当你的 AI 聊天不只是文本,还要支持图片、语音、文件传输时,SSE 还够不够用?

SSE 只支持服务端到客户端的单向推送。如果未来你需要客户端上传语音流、服务端实时转文字并流式返回,那 WebSocket 的双向通信就更合适。但 WebSocket 有它的代价——需要协议升级(HTTP → WS),某些网关/CDN 不兼容,调试也比 SSE 复杂。

gRPC Stream 是另一个选择——双向流,性能比 WebSocket 好,但需要客户端支持 gRPC(通常是后端服务之间通信用,浏览器端不太方便)。

本项目选 SSE 的原因:AI 聊天就是单向的(用户发一条消息 → AI 流式回复),SSE 正好够用,不引入额外复杂度。


对比表格

方案 实现方式 线程模型 超时控制 前端断开感知 适用场景
同步阻塞 httpClient.post() 等返回 Tomcat线程直接等待 HTTP超时(粗糙) HTTP断开抛异常 响应<3秒的低延迟API
DeferredResult Spring MVC 异步 业务线程池 + Servlet3.0异步 Future.get(timeout) onTimeout回调 传统Spring MVC项目改异步
WebFlux SSE(本项目) Flux.create() + FluxSink 独立线程池,Tomcat线程立刻释放 CountDownLatch.await(5min) sink.onCancel() AI流式聊天、实时推送
WebSocket WebSocketHandler 长连接线程池 心跳检测 onClose事件 双向实时通信
gRPC Stream StreamObserver gRPC线程池 deadline onCompleted/onError 微服务间流式通信

面试追问

面试追问 1:为什么用 Flux.create() 而不是 Flux.push()Flux.generate()

→ 回答方向:Flux.create() 最适合"外部异步源推数据"的场景——你拿到一个 FluxSink,可以在任何地方(包括其他线程)调用 sink.next()Flux.generate() 是同步的、一次一个地生成数据,不适合这里。Flux.push()create() 的单线程版本,但 AI 回调可能在多个线程里触发。所以 Flux.create() 是正确选择。

面试追问 2:CountDownLatch 等待 5 分钟,如果主线程是 Tomcat 线程,不还是阻塞了吗?

→ 回答方向:问得好,这就是 threadPoolTaskExecutor.submit() 的关键作用。streamToSink() 不是在 Tomcat 线程里执行的——它被提交到了独立线程池。Tomcat 线程在 Flux.create() 返回后就已经释放了。CountDownLatch.await() 阻塞的是独立线程池里的工作线程,不影响 Tomcat 接收新请求。

面试追问 3:AiChatStreamRespDTO 有两种 type——contentreasoning_content,前端怎么区分和渲染?

→ 回答方向:前端收到 SSE 事件后,按 type 字段分流。content 类型的渲染到聊天气泡的主区域,reasoning_content 类型的渲染到一个可折叠的"思考过程"区域(类似 DeepSeek 官网那种灰色小字)。两种类型交替推送——可能在思考过程中间夹杂正式回复,前端要做好顺序拼接。


总结

流式输出的关键是两个隔离:线程隔离(AI 调用不占用 Web 线程)和时间隔离(超时不依赖 HTTP 超时)。

读完这篇你应该能:

  • Flux.create() + FluxSink 实现一个 SSE 流式推送接口
  • CountDownLatch 给异步流式调用加超时兜底
  • 理解为什么需要独立线程池来执行 AI 调用
  • 在面试时说出"FluxSink 编程式发射 + 线程池隔离 + CountDownLatch 超时"而不只是"用了 WebFlux"
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐