2026山东大学软件学院项目实训(三)
汇报人:者亚杰
日期:2026-04-19
本周重点
SSE流式输出
为了提升用户体验,需要引入SSE(Server-SentEvents)流式输出。
方案选择
目前流式输出不支持结构化输出,但我们可以在流式返回的过程中拼接AI的返回结果(可以实时返回给前端),等全部输出完成后,再对拼接结果进行解析和保存。这样既保证了实时性,又不影响最终的处理流程。
在实现SSE的技术方案上,LangChain4j提供了两种方式:
1.LangChain4j+Reactor(本项目使用)
Reactor是指响应式编程,LangChain4j提供了响应式编程依赖包,可以直接把AI返回的内容封装为更通用的Flux响应式对象。可以把Flux想象成一个数据流,有了这个对象后,上游发来一块数据,下游就能处理一块数据。
LangChain4j 原生不自带 Reactor,通过独立模块 langchain4j-reactor 把大模型流式 Token 流,包装成 Reactor Flux<String>。
- 模型每返回一个文本分片 / Token → Flux 发出一次
onNext - 模型结束 → Flux
onComplete - 模型报错 / 超时 → Flux
onError
常用操作:
|
Flux 操作符 |
LangChain4j 流式用途 |
适用场景 |
备注 |
|
|
Token 日志、监控、打印 |
必用 |
只偷看不修改,流式调试首选 |
|
|
Token 统一格式化、脱敏、加前缀 |
高频 |
同步一对一加工 |
|
|
过滤空字符、敏感词、无效分片 |
高频 |
流清洗 |
|
|
异步二次处理、RAG 检索、工具调用 |
核心 |
有序异步,AI 链式处理必用 |
|
|
严格串行多轮对话拼接 |
对话上下文 |
串行无并发 |
|
|
模型异常兜底文本 |
必配 |
生产兜底 |
|
|
异常降级、备用模型切换 |
高可用 |
|
|
|
模型调用超时熔断 |
必配 |
防止挂起 |
|
|
下发过快缓存 |
长文本必备 |
背压防护 |
|
|
请求限流 |
高并发接口 |
|
|
|
Token 发送减速、前端流畅展示 |
前端友好 |
|
|
|
所有 Token 拼接完整结果 |
汇总全文 |
Flux→Mono<String> |
|
|
全生命周期日志 |
开发调试 |
|
|
|
多模型流合并 |
多模型聚合 |
这种方案的优点是与前端集成更方便,通过 Flux 对象可以很容易地将流 式内容返回给前端。缺点是需要引入额外的依赖:
<!-- langchain4j-reactor -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-reactor</artifactId>
<version>1.1.0-beta7</version>
</dependency>
2.TokenStream(LangChain4j 内置流)
这是 LangChain4j 的原生实现方式,好处是提供了更多高级回调,比如工具调用完成回(onToolExecuted)、工具调用内容实时响应。 但缺点是使用起来相对复杂,而且要返回前端时还需要用 Flux 包装一层。
3.StreamingChatResponseHandler(原生回调)
也是原生实现,缺点较多,使用难度大
对比:
以下表格由ai生成
|
对比维度 |
方案 1:StreamingChatResponseHandler(原生回调) |
方案 2:TokenStream(LangChain4j 内置流) |
方案 3:Reactor Flux(langchain4j-reactor) |
|
底层本质 |
回调接口、异步回调通知 |
Java 迭代器 Iterable 拉式流 |
Reactor 响应式发布者 Publisher |
|
返回值类型 |
(无返回,内部回调) |
|
|
|
依赖要求 |
langchain4j 原生核心包,无额外依赖 |
langchain4j 原生核心包,无额外依赖 |
必须引入 langchain4j-reactor 桥接依赖 |
|
编程模型 |
面向回调、匿名内部类写法繁琐 |
增强 for 循环遍历、拉模式 |
链式编程、声明式、函数式编程 |
|
Reactor 操作符支持 |
❌ 完全不支持 map/filter/peek 等所有操作符 |
❌ 不支持 Reactor 整套操作符 |
✅ 完整兼容所有 Flux 操作符 |
|
WebFlux / Spring 原生适配 |
❌ 极差,无法直接返回接口,需手动封装 Sink |
❌ 一般,不能直接返回 Controller,需转 Flux |
✅ 完美原生适配,Controller 直接返回 |
|
SSE 前端流式推送 |
极麻烦,手动封装响应、背压自己处理 |
麻烦,需适配 Spring 响应式 |
极简,自动 SSE 流式,Spring 原生支持 |
|
背压 Backpressure |
❌ 无背压,下发过快直接 OOM |
❌ 无背压,无流量管控 |
✅ 原生背压全支持,限流、缓存、丢弃策略齐全 |
|
异常处理 |
回调内硬编码捕获,分散杂乱 |
遍历捕获异常,写法笨重 |
链式统一异常: / / |
|
流生命周期管控 |
难管控,取消、中断、超时麻烦 |
弱管控,无法优雅中断 |
极强: 、 、超时熔断全支持 |
|
Token 顺序保证 |
原生有序 |
原生有序 |
原生有序,搭配 依然保序 |
|
链式二次加工 |
困难,代码嵌套深 |
仅简单遍历处理 |
极强,可无限链式加工、聚合、窗口、合并流 |
|
代码优雅度 |
⭐ 低,嵌套多、冗余量大 |
⭐⭐ 中等,遍历简单但扩展性差 |
⭐⭐⭐⭐⭐ 极高,一行链式写完 |
|
Spring 事务 / 上下文传递 |
丢失 ThreadLocal,事务极易失效 |
上下文容易丢失 |
完整传递 MDC、事务、线程上下文 |
|
多流合并(concat/merge/zip) |
❌ 不支持 |
❌ 不支持 |
✅ 原生全部支持 |
|
结果聚合(拼接完整文本) |
手动 String 拼接,线程不安全 |
手动遍历拼接 |
一行 自动聚合 |
|
内存溢出风险 |
高,无背压,token 堆积爆内存 |
较高 |
低,背压管控 + 限流 + 缓存上限保护 |
|
适用场景 |
底层自定义封装、老旧项目兼容、简单一次性调用 |
简单本地遍历、非 Web 环境、无需响应式 |
Spring WebFlux 项目、AI 流式接口、前端 SSE、RAG 链式、生产微服务 |
|
生产开发推荐 |
不推荐业务开发 |
仅简单本地脚本使用 |
官方首选、企业项目通用标准方案 |
开发实现
1.首先配置流式模型
streaming-chat-model:
base-url: https://ark.cn-beijing.volces.com/api/v3
api-key: api-key
model-name: doubao-seed-1-8-251228
timeout: 120000
log-requests: true
log-responses: true
max-tokens: 8192
我本地使用的是doubao1.8进行测试,非本项目最终大模型选型
2.在创建 AI Service 的工厂类中注入流式模型
@Configuration
public class AiCodeGeneratorServiceFactory {
@Resource
private ChatModel chatModel;
@Resource
private StreamingChatModel streamingChatModel;
@Bean
public AiCodeGeneratorService aiCodeGeneratorService() {
return AiServices.builder(AiCodeGeneratorService.class)
.chatModel(chatModel)
.streamingChatModel(streamingChatModel)
.build();
}
}
3.在 AI Service 中新增流式方法,跟之前方法的区别在于返回值改为了 Flux 对象
/**
* 生成 HTML 代码(流式)
*
* @param userMessage 用户消息
* @return 生成的代码结果
*/
@SystemMessage(fromResource = "prompt/codegen-html-system-prompt.txt")
Flux<String> generateHtmlCodeStream(String userMessage);
/**
* 生成多文件代码(流式)
*
* @param userMessage 用户消息
* @return 生成的代码结果
*/
@SystemMessage(fromResource = "prompt/codegen-multi-file-system-prompt.txt")
Flux<String> generateMultiFileCodeStream(String userMessage);
4.编写解析逻辑
由于流式输出返回的是字符串片段,需要在 AI 全部返回完成后进行解析
由于代码解析逻辑相对复杂,单独在 core 包下创建代码解析器 CodeParser。核心逻辑是通过正则表达式从完整 字符串中提取到对应的代码块,并返回结构化输出对象,这样可以复用之前的文件保存器。
/**
* 代码解析器
* 提供静态方法解析不同类型的代码内容
*/
public class CodeParser {
private static final Pattern HTML_CODE_PATTERN = Pattern.compile("```html\\s*\\n([\\s\\S]*?)```",
Pattern.CASE_INSENSITIVE);
private static final Pattern CSS_CODE_PATTERN = Pattern.compile("```css\\s*\\n([\\s\\S]*?)```",
Pattern.CASE_INSENSITIVE);
private static final Pattern JS_CODE_PATTERN =
Pattern.compile("```(?:js|javascript)\\s*\\n([\\s\\S]*?)```", Pattern.CASE_INSENSITIVE);
/**
* 解析 HTML 单文件代码
*/
public static HtmlCodeResult parseHtmlCode(String codeContent) {
HtmlCodeResult result = new HtmlCodeResult();
// 提取 HTML 代码
String htmlCode = extractHtmlCode(codeContent);
if (htmlCode != null && !htmlCode.trim().isEmpty()) {
result.setHtmlCode(htmlCode.trim());
} else {
// 如果没有找到代码块,将整个内容作为HTML
result.setHtmlCode(codeContent.trim());
}
return result;
}
/**
* 解析多文件代码(HTML + CSS + JS)
*/
public static MultiFileCodeResult parseMultiFileCode(String codeContent) {
MultiFileCodeResult result = new MultiFileCodeResult();
// 提取各类代码
String htmlCode = extractCodeByPattern(codeContent, HTML_CODE_PATTERN);
String cssCode = extractCodeByPattern(codeContent, CSS_CODE_PATTERN);
String jsCode = extractCodeByPattern(codeContent, JS_CODE_PATTERN);
// 设置HTML代码
if (htmlCode != null && !htmlCode.trim().isEmpty()) {
result.setHtmlCode(htmlCode.trim());
}
// 设置CSS代码
if (cssCode != null && !cssCode.trim().isEmpty()) {
result.setCssCode(cssCode.trim());
}
// 设置JS代码
if (jsCode != null && !jsCode.trim().isEmpty()) {
result.setJsCode(jsCode.trim());
}
return result;
}
/**
* 提取HTML代码内容
*
* @param content 原始内容
* @return HTML代码
*/
private static String extractHtmlCode(String content) {
Matcher matcher = HTML_CODE_PATTERN.matcher(content);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
/**
* 根据正则模式提取代码
*
* @param content 原始内容
* @param pattern 正则模式
* @return 提取的代码
*/
private static String extractCodeByPattern(String content, Pattern pattern) {
Matcher matcher = pattern.matcher(content);
if (matcher.find()) {
return matcher.group(1);
}
return null;
}
}
同时编写单元测试,结果符合预期
5.在 AiCodeGeneratorFacade 中添加流式调用 AI 的方法
针对每种生成模式,分别提供一个“生成代码并保存”的方法,核心逻辑都是:拼接 AI 实时响应的字符串,并在 流式返回完成后解析字符串并保存代码文件
/**
* 生成 HTML 模式的代码并保存(流式)
*
* @param userMessage 用户提示词
* @return 保存的目录
*/
private Flux<String> generateAndSaveHtmlCodeStream(String userMessage) {
Flux<String> result = aiCodeGeneratorService.generateHtmlCodeStream(userMessage);
// 当流式返回生成代码完成后,再保存代码
StringBuilder codeBuilder = new StringBuilder();
return result
.doOnNext(chunk -> {
// 实时收集代码片段
codeBuilder.append(chunk);
})
.doOnComplete(() -> {
// 流式返回完成后保存代码
try {
String completeHtmlCode = codeBuilder.toString();
HtmlCodeResult htmlCodeResult = CodeParser.parseHtmlCode(completeHtmlCode);
// 保存代码到文件
File savedDir = CodeFileSaver.saveHtmlCodeResult(htmlCodeResult);
log.info("保存成功,路径为:" + savedDir.getAbsolutePath());
} catch (Exception e) {
log.error("保存失败: {}", e.getMessage());
}
});
}
/**
* 生成多文件模式的代码并保存(流式)
*
* @param userMessage 用户提示词
* @return 保存的目录
*/
private Flux<String> generateAndSaveMultiFileCodeStream(String userMessage) {
Flux<String> result = aiCodeGeneratorService.generateMultiFileCodeStream(userMessage);
// 当流式返回生成代码完成后,再保存代码
StringBuilder codeBuilder = new StringBuilder();
return result
.doOnNext(chunk -> {
// 实时收集代码片段
codeBuilder.append(chunk);
})
.doOnComplete(() -> {
// 流式返回完成后保存代码
try {
String completeMultiFileCode = codeBuilder.toString();
MultiFileCodeResult multiFileResult =
CodeParser.parseMultiFileCode(completeMultiFileCode);
// 保存代码到文件
File savedDir = CodeFileSaver.saveMultiFileCodeResult(multiFileResult);
log.info("保存成功,路径为:" + savedDir.getAbsolutePath());
} catch (Exception e) {
log.error("保存失败: {}", e.getMessage());
}
});
}
6.在 AiCodeGeneratorFacade 中编写统一入口,根据生成模式枚举选择对应的流式方法
/**
* 统一入口:根据类型生成并保存代码(流式)
*
* @param userMessage 用户提示词
* @param codeGenTypeEnum 生成类型
*/
public Flux<String> generateAndSaveCodeStream(String userMessage, CodeGenTypeEnum codeGenTypeEnum) {
if (codeGenTypeEnum == null) {
throw new BusinessException(ErrorCode.SYSTEM_ERROR, "生成类型为空");
}
return switch (codeGenTypeEnum) {
case HTML -> generateAndSaveHtmlCodeStream(userMessage);
case MULTI_FILE -> generateAndSaveMultiFileCodeStream(userMessage);
default -> {
String errorMessage = "不支持的生成类型:" + codeGenTypeEnum.getValue();
throw new BusinessException(ErrorCode.SYSTEM_ERROR, errorMessage);
}
};
}
经过单元测试,实现流式输出,符合预期
思考
本次SSE流式输出的开发,不仅完成了业务目标,更让我在技术选型、问题解决、工程规范等方面得到了全面提升。我深刻认识到,开发不仅是“实现功能”,更是“做好功能”——既要兼顾业务需求和用户体验,也要注重代码的可维护性、稳定性和扩展性;既要掌握核心技术,也要学会权衡取舍,结合实际场景选择最合适的方案。后续,我将把本次开发积累的经验运用到后续工作中,不断优化开发思路,提升开发能力,为项目的稳定运行贡献力量。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)