汇报人:者亚杰

日期:2026-04-19

本周重点

SSE流式输出

为了提升用户体验,需要引入SSE(Server-SentEvents)流式输出。

方案选择

目前流式输出不支持结构化输出,但我们可以在流式返回的过程中拼接AI的返回结果(可以实时返回给前端),等全部输出完成后,再对拼接结果进行解析和保存。这样既保证了实时性,又不影响最终的处理流程。

在实现SSE的技术方案上,LangChain4j提供了两种方式:

1.LangChain4j+Reactor(本项目使用)

Reactor是指响应式编程,LangChain4j提供了响应式编程依赖包,可以直接把AI返回的内容封装为更通用的Flux响应式对象。可以把Flux想象成一个数据流,有了这个对象后,上游发来一块数据,下游就能处理一块数据。

LangChain4j 原生不自带 Reactor,通过独立模块 langchain4j-reactor 把大模型流式 Token 流,包装成 Reactor Flux<String>

  • 模型每返回一个文本分片 / Token → Flux 发出一次 onNext
  • 模型结束 → Flux onComplete
  • 模型报错 / 超时 → Flux onError

常用操作:

Flux 操作符

LangChain4j 流式用途

适用场景

备注

peek()

Token 日志、监控、打印

必用

只偷看不修改,流式调试首选

map()

Token 统一格式化、脱敏、加前缀

高频

同步一对一加工

filter()

过滤空字符、敏感词、无效分片

高频

流清洗

flatMapSequential()

异步二次处理、RAG 检索、工具调用

核心

有序异步,AI 链式处理必用

concatMap()

严格串行多轮对话拼接

对话上下文

串行无并发

onErrorReturn()

模型异常兜底文本

必配

生产兜底

onErrorResume()

异常降级、备用模型切换

高可用

timeout()

模型调用超时熔断

必配

防止挂起

onBackpressureBuffer()

下发过快缓存

长文本必备

背压防护

limitRate()

请求限流

高并发接口

delayElements()

Token 发送减速、前端流畅展示

前端友好

reduce()

所有 Token 拼接完整结果

汇总全文

Flux→Mono<String>

log()

全生命周期日志

开发调试

merge/concat

多模型流合并

多模型聚合

这种方案的优点是与前端集成更方便,通过 Flux 对象可以很容易地将流 式内容返回给前端。缺点是需要引入额外的依赖:

<!-- langchain4j-reactor -->
<dependency>
  <groupId>dev.langchain4j</groupId>
  <artifactId>langchain4j-reactor</artifactId>
  <version>1.1.0-beta7</version>
</dependency>

2.TokenStream(LangChain4j 内置流)

这是 LangChain4j 的原生实现方式,好处是提供了更多高级回调,比如工具调用完成回(onToolExecuted)、工具调用内容实时响应。 但缺点是使用起来相对复杂,而且要返回前端时还需要用 Flux 包装一层。

3.StreamingChatResponseHandler(原生回调)

也是原生实现,缺点较多,使用难度大

对比:

以下表格由ai生成

对比维度

方案 1:StreamingChatResponseHandler(原生回调)

方案 2:TokenStream(LangChain4j 内置流)

方案 3:Reactor Flux(langchain4j-reactor)

底层本质

回调接口、异步回调通知

Java 迭代器 Iterable 拉式流

Reactor 响应式发布者 Publisher

返回值类型

void

(无返回,内部回调)

TokenStream

Flux<String>

依赖要求

langchain4j 原生核心包,无额外依赖

langchain4j 原生核心包,无额外依赖

必须引入 langchain4j-reactor 桥接依赖

编程模型

面向回调、匿名内部类写法繁琐

增强 for 循环遍历、拉模式

链式编程、声明式、函数式编程

Reactor 操作符支持

❌ 完全不支持 map/filter/peek 等所有操作符

❌ 不支持 Reactor 整套操作符

完整兼容所有 Flux 操作符

WebFlux / Spring 原生适配

❌ 极差,无法直接返回接口,需手动封装 Sink

❌ 一般,不能直接返回 Controller,需转 Flux

完美原生适配,Controller 直接返回

SSE 前端流式推送

极麻烦,手动封装响应、背压自己处理

麻烦,需适配 Spring 响应式

极简,自动 SSE 流式,Spring 原生支持

背压 Backpressure

❌ 无背压,下发过快直接 OOM

❌ 无背压,无流量管控

原生背压全支持,限流、缓存、丢弃策略齐全

异常处理

回调内硬编码捕获,分散杂乱

遍历捕获异常,写法笨重

链式统一异常:onErrorReturn

/onErrorResume

/retry

流生命周期管控

难管控,取消、中断、超时麻烦

弱管控,无法优雅中断

极强:timeout

cancel

、超时熔断全支持

Token 顺序保证

原生有序

原生有序

原生有序,搭配flatMapSequential

依然保序

链式二次加工

困难,代码嵌套深

仅简单遍历处理

极强,可无限链式加工、聚合、窗口、合并流

代码优雅度

⭐ 低,嵌套多、冗余量大

⭐⭐ 中等,遍历简单但扩展性差

⭐⭐⭐⭐⭐ 极高,一行链式写完

Spring 事务 / 上下文传递

丢失 ThreadLocal,事务极易失效

上下文容易丢失

完整传递 MDC、事务、线程上下文

多流合并(concat/merge/zip)

❌ 不支持

❌ 不支持

✅ 原生全部支持

结果聚合(拼接完整文本)

手动 String 拼接,线程不安全

手动遍历拼接

一行reduce(String::concat)

自动聚合

内存溢出风险

高,无背压,token 堆积爆内存

较高

低,背压管控 + 限流 + 缓存上限保护

适用场景

底层自定义封装、老旧项目兼容、简单一次性调用

简单本地遍历、非 Web 环境、无需响应式

Spring WebFlux 项目、AI 流式接口、前端 SSE、RAG 链式、生产微服务

生产开发推荐

不推荐业务开发

仅简单本地脚本使用

官方首选、企业项目通用标准方案

开发实现

1.首先配置流式模型
streaming-chat-model:
  base-url: https://ark.cn-beijing.volces.com/api/v3
  api-key: api-key
  model-name: doubao-seed-1-8-251228
  timeout: 120000
  log-requests: true
  log-responses: true
  max-tokens: 8192

我本地使用的是doubao1.8进行测试,非本项目最终大模型选型

2.在创建 AI Service 的工厂类中注入流式模型
@Configuration
public class AiCodeGeneratorServiceFactory {
    @Resource
    private ChatModel chatModel;

    @Resource
    private StreamingChatModel streamingChatModel;

    @Bean
    public AiCodeGeneratorService aiCodeGeneratorService() {
        return AiServices.builder(AiCodeGeneratorService.class)
        .chatModel(chatModel)
        .streamingChatModel(streamingChatModel)
        .build();
    }
}

3.在 AI Service 中新增流式方法,跟之前方法的区别在于返回值改为了 Flux 对象
/**
 * 生成 HTML 代码(流式)
 *
 * @param userMessage 用户消息
 * @return 生成的代码结果
 */
@SystemMessage(fromResource = "prompt/codegen-html-system-prompt.txt")
Flux<String> generateHtmlCodeStream(String userMessage);
/**
 * 生成多文件代码(流式)
 *
 * @param userMessage 用户消息
 * @return 生成的代码结果
 */
@SystemMessage(fromResource = "prompt/codegen-multi-file-system-prompt.txt")
Flux<String> generateMultiFileCodeStream(String userMessage);

4.编写解析逻辑

由于流式输出返回的是字符串片段,需要在 AI 全部返回完成后进行解析

由于代码解析逻辑相对复杂,单独在 core 包下创建代码解析器 CodeParser。核心逻辑是通过正则表达式从完整 字符串中提取到对应的代码块,并返回结构化输出对象,这样可以复用之前的文件保存器。

/**
 * 代码解析器
 * 提供静态方法解析不同类型的代码内容
 */
public class CodeParser {
    private static final Pattern HTML_CODE_PATTERN = Pattern.compile("```html\\s*\\n([\\s\\S]*?)```",
                                                                     Pattern.CASE_INSENSITIVE);
    private static final Pattern CSS_CODE_PATTERN = Pattern.compile("```css\\s*\\n([\\s\\S]*?)```",
                                                                    Pattern.CASE_INSENSITIVE);
    private static final Pattern JS_CODE_PATTERN =
    Pattern.compile("```(?:js|javascript)\\s*\\n([\\s\\S]*?)```", Pattern.CASE_INSENSITIVE);
    /**
     * 解析 HTML 单文件代码
     */
    public static HtmlCodeResult parseHtmlCode(String codeContent) {
        HtmlCodeResult result = new HtmlCodeResult();
        // 提取 HTML 代码
        String htmlCode = extractHtmlCode(codeContent);
        if (htmlCode != null && !htmlCode.trim().isEmpty()) {
            result.setHtmlCode(htmlCode.trim());
        } else {
            // 如果没有找到代码块,将整个内容作为HTML
            result.setHtmlCode(codeContent.trim());
        }
        return result;
    }
    /**
     * 解析多文件代码(HTML + CSS + JS)
     */
    public static MultiFileCodeResult parseMultiFileCode(String codeContent) {
        MultiFileCodeResult result = new MultiFileCodeResult();
        // 提取各类代码
        String htmlCode = extractCodeByPattern(codeContent, HTML_CODE_PATTERN);
        String cssCode = extractCodeByPattern(codeContent, CSS_CODE_PATTERN);
        String jsCode = extractCodeByPattern(codeContent, JS_CODE_PATTERN);
        // 设置HTML代码
        if (htmlCode != null && !htmlCode.trim().isEmpty()) {
            result.setHtmlCode(htmlCode.trim());
        }
        // 设置CSS代码
        if (cssCode != null && !cssCode.trim().isEmpty()) {
            result.setCssCode(cssCode.trim());
        }
        // 设置JS代码
        if (jsCode != null && !jsCode.trim().isEmpty()) {
            result.setJsCode(jsCode.trim());
        }
        return result;
    }
    /**
     * 提取HTML代码内容
     *
     * @param content 原始内容
     * @return HTML代码
     */
    private static String extractHtmlCode(String content) {
        Matcher matcher = HTML_CODE_PATTERN.matcher(content);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }
    /**
     * 根据正则模式提取代码
     *
     * @param content 原始内容
     * @param pattern 正则模式
     * @return 提取的代码
     */
    private static String extractCodeByPattern(String content, Pattern pattern) {
        Matcher matcher = pattern.matcher(content);
        if (matcher.find()) {
            return matcher.group(1);
        }
        return null;
    }
}

同时编写单元测试,结果符合预期

5.在 AiCodeGeneratorFacade 中添加流式调用 AI 的方法

针对每种生成模式,分别提供一个“生成代码并保存”的方法,核心逻辑都是:拼接 AI 实时响应的字符串,并在 流式返回完成后解析字符串并保存代码文件

/**
 * 生成 HTML 模式的代码并保存(流式)
 *
 * @param userMessage 用户提示词
 * @return 保存的目录
 */
private Flux<String> generateAndSaveHtmlCodeStream(String userMessage) {
	Flux<String> result = aiCodeGeneratorService.generateHtmlCodeStream(userMessage);
	// 当流式返回生成代码完成后,再保存代码
	StringBuilder codeBuilder = new StringBuilder();
	return result
			.doOnNext(chunk -> {
				// 实时收集代码片段
				codeBuilder.append(chunk);
			})
			.doOnComplete(() -> {
				// 流式返回完成后保存代码
				try {
					String completeHtmlCode = codeBuilder.toString();
					HtmlCodeResult htmlCodeResult = CodeParser.parseHtmlCode(completeHtmlCode);
					// 保存代码到文件
					File savedDir = CodeFileSaver.saveHtmlCodeResult(htmlCodeResult);
					log.info("保存成功,路径为:" + savedDir.getAbsolutePath());
				} catch (Exception e) {
					log.error("保存失败: {}", e.getMessage());
				}
			});
}

/**
 * 生成多文件模式的代码并保存(流式)
 *
 * @param userMessage 用户提示词
 * @return 保存的目录
 */
private Flux<String> generateAndSaveMultiFileCodeStream(String userMessage) {
	Flux<String> result = aiCodeGeneratorService.generateMultiFileCodeStream(userMessage);
	// 当流式返回生成代码完成后,再保存代码
	StringBuilder codeBuilder = new StringBuilder();
	return result
			.doOnNext(chunk -> {
				// 实时收集代码片段
				codeBuilder.append(chunk);
			})
			.doOnComplete(() -> {
				// 流式返回完成后保存代码
				try {
					String completeMultiFileCode = codeBuilder.toString();
					MultiFileCodeResult multiFileResult =
					CodeParser.parseMultiFileCode(completeMultiFileCode);
					// 保存代码到文件
					File savedDir = CodeFileSaver.saveMultiFileCodeResult(multiFileResult);
					log.info("保存成功,路径为:" + savedDir.getAbsolutePath());
				} catch (Exception e) {
					log.error("保存失败: {}", e.getMessage());
				}
			});
}

6.在 AiCodeGeneratorFacade 中编写统一入口,根据生成模式枚举选择对应的流式方法
/**
 * 统一入口:根据类型生成并保存代码(流式)
 *
 * @param userMessage     用户提示词
 * @param codeGenTypeEnum 生成类型
 */
public Flux<String> generateAndSaveCodeStream(String userMessage, CodeGenTypeEnum codeGenTypeEnum) {
    if (codeGenTypeEnum == null) {
        throw new BusinessException(ErrorCode.SYSTEM_ERROR, "生成类型为空");
    }
    return switch (codeGenTypeEnum) {
        case HTML -> generateAndSaveHtmlCodeStream(userMessage);
        case MULTI_FILE -> generateAndSaveMultiFileCodeStream(userMessage);
        default -> {
            String errorMessage = "不支持的生成类型:" + codeGenTypeEnum.getValue();
            throw new BusinessException(ErrorCode.SYSTEM_ERROR, errorMessage);
    }
};
}

经过单元测试,实现流式输出,符合预期

思考

本次SSE流式输出的开发,不仅完成了业务目标,更让我在技术选型、问题解决、工程规范等方面得到了全面提升。我深刻认识到,开发不仅是“实现功能”,更是“做好功能”——既要兼顾业务需求和用户体验,也要注重代码的可维护性、稳定性和扩展性;既要掌握核心技术,也要学会权衡取舍,结合实际场景选择最合适的方案。后续,我将把本次开发积累的经验运用到后续工作中,不断优化开发思路,提升开发能力,为项目的稳定运行贡献力量。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐