Spring Boot 集成 Open WebUI 实现 AI 流式对话
Spring Boot 集成 Open WebUI 实现 AI 流式对话
本文介绍如何在 Spring Boot 项目中集成 Open WebUI,通过自动管理用户 Token、封装 OpenAI Java SDK 客户端,设计类似 Spring AI 的 Advisor 拦截链机制,实现 SSE 流式输出,并展示一个将业务数据(营销日志)注入对话上下文的完整 Advisor 实战案例。
目录
- 背景与架构概览
- 依赖与配置
- Token 生命周期管理
- 凭证提供与客户端管理
- 系统提示词管理
- Advisor 拦截链设计
- Advisor 实战:营销日志注入
- 请求与响应模型
- LLMService 流式对话核心实现
- LLMController 接口层
- 异步线程池配置
- 前端对接思路
- 整体流程图
- 设计总结与经验
背景与架构概览
在企业 CRM 系统中,我们希望为业务人员提供一个内嵌的 AI 助手。随着业务需求增加,单纯的"问答"已不能满足场景需求——我们需要在对话前后植入业务逻辑,例如将用户的营销工作日志自动注入到对话上下文,让 AI 能结合真实工作数据给出建议。
为此,在第一版直接调用 OpenAI SDK 的基础上,我们设计了一套 Advisor 拦截链机制,类似 Spring AI 的 Advisor 模式,实现请求的前置增强、后置处理与链式传递。
整体方案选型:
| 组件 | 说明 |
|---|---|
| Open WebUI | 开源 LLM 前端平台,提供 OpenAI 兼容接口,支持多模型管理 |
| openai-java SDK | 官方 Java 客户端,直接对接 OpenAI 兼容 API |
| Spring WebFlux | 响应式编程,支持 SSE(Server-Sent Events)流式推送 |
| Redis | 缓存 Token,避免每次请求都重新登录 Open WebUI |
| Advisor Chain | 自研拦截链,用于对话前后植入业务增强逻辑 |
整体请求链路:
前端输入框 → POST /v1/chat/completions
→ LLMController(SSE 接口)
→ LLMService(构造 ChatClient + ChatCompletionCreateParams)
→ ChatClient.stream()
→ Advisor 链(按 order 顺序依次执行)
→ EnrichMarketingLogAdvisor(业务数据注入)
→ ... 更多 Advisor
→ OpenAI Java SDK(流式调用 Open WebUI)
→ SSE 逐块推送回前端
依赖与配置
Maven 依赖
<!-- OpenAI Java 官方 SDK -->
<dependency>
<groupId>com.openai</groupId>
<artifactId>openai-java</artifactId>
<version>2.5.0</version>
</dependency>
<!-- Spring WebFlux(SSE 支持) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- Hutool(HTTP 工具 + JSON 解析) -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.x</version>
</dependency>
<!-- Spring Data Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
application.yml 配置
open-web-ui:
base-url: http://192.168.4.45:2500/api # Open WebUI 的 OpenAI 兼容接口地址
常量统一管理,避免 hardcode 分散在各处:
public class OpenWebUIConstant {
private OpenWebUIConstant() {}
/** Open WebUI 登录接口地址 */
public static final String LOGIN_URL = "http://192.168.4.45:2500/api/v1/auths/signin";
/** 默认模型名称 */
public static final String MODEL = "Qwen3-VL-8B-Instruct";
}
Token 生命周期管理
设计思路
Open WebUI 使用 JWT Token 进行鉴权,每次请求若都重新登录,不仅效率低,还会给服务造成压力。我们设计了**“先查缓存 → 有效直接用 → 过期再刷新”**的 Token 自动管理机制,并通过 Redis 实现跨实例共享。
TokenRepository 接口
public interface TokenRepository {
OpenWebUIToken get(String key);
void save(String key, OpenWebUIToken value);
void delete(String key);
}
Redis 存储实现
public class RedisTokenRepository implements TokenRepository {
private String prefix = "openwebui:";
private final RedisTemplate<Object, Object> redisTemplate;
public RedisTokenRepository(RedisTemplate<Object, Object> redisTemplate) {
this.redisTemplate = redisTemplate;
}
@Override
public OpenWebUIToken get(String key) {
return BeanUtil.copyProperties(
redisTemplate.opsForValue().get(prefix + key),
OpenWebUIToken.class
);
}
@Override
public void save(String key, OpenWebUIToken value) {
redisTemplate.opsForValue().set(prefix + key, value);
}
@Override
public void delete(String key) {
redisTemplate.delete(prefix + key);
}
}
OpenWebUIToken 数据模型
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OpenWebUIToken implements Serializable {
private String accessToken;
private Long expireAt; // 过期时间(毫秒时间戳)
private String username;
private String password;
/** Token 是否仍然有效 */
public boolean isValid() {
return StringUtils.isNotBlank(accessToken)
&& Objects.nonNull(expireAt)
&& TimeUtil.getLocalDateTime(expireAt).isAfter(LocalDateTime.now());
}
}
OpenWebUITokenManager 核心管理器
新版中 OpenWebUITokenManager 新增了对 CredentialProvider 的依赖,可直接通过 userId 获取 Token,无需在调用方手动查询凭证:
@Slf4j
public class OpenWebUITokenManager {
private static final Long TOKEN_EXPIRE_TIME = 60 * 60L; // 默认1小时
private final String signInUrl;
private final TokenRepository tokenRepository;
private final CredentialProvider credentialProvider; // 新增依赖
// ① 通过 userId 直接获取 Token(新增方法)
public String getValidToken(String userId) {
return getValidToken(credentialProvider.getCredential(userId));
}
// ② 通过 Credential 获取 Token
public String getValidToken(Credential credential) {
if (Objects.isNull(credential)) return null;
return getValidToken(credential.getUsername(), credential.getPassword());
}
// ③ 核心:查缓存 → 有效返回 → 失效刷新
public String getValidToken(String username, String password) {
OpenWebUIToken cachedToken = tokenRepository.get(username);
if (cachedToken != null && cachedToken.isValid()) {
log.debug("使用缓存 Token: {}", username);
return cachedToken.getAccessToken();
}
return refreshToken(username, password);
}
/** 并发安全的 Token 刷新(双检锁) */
@Synchronized
public String refreshToken(String username, String password) {
// 加锁后二次检查,防止重复登录
OpenWebUIToken cachedToken = tokenRepository.get(username);
if (cachedToken != null && cachedToken.isValid()) {
return cachedToken.getAccessToken();
}
try {
HttpResponse response = HttpUtil.createPost(signInUrl)
.header("Content-Type", "application/json")
.body(JSONUtil.toJsonStr(Map.of("email", username, "password", password)))
.execute();
JSONObject responseData = JSONUtil.parseObj(response.body());
String token = responseData.getStr("token");
Long expiresTime = TimeUtil.getEpochMilli(LocalDateTime.now().plusSeconds(TOKEN_EXPIRE_TIME));
String expiresAt = responseData.get("expires_at").toString();
if (StringUtils.hasText(expiresAt)) {
expiresTime = Long.parseLong(expiresAt) * 1000;
}
OpenWebUIToken webUIToken = OpenWebUIToken.builder()
.accessToken(token).username(username).password(password)
.expireAt(expiresTime).build();
tokenRepository.save(username, webUIToken);
return token;
} catch (Exception e) {
log.error("登录获取 Token 失败: {}", username, e);
throw new RuntimeException("Open WebUI 登录失败", e);
}
}
}
凭证提供与客户端管理
CredentialProvider
根据系统用户 ID 查询其对应的 Open WebUI 账号凭证:
@Slf4j
@Component
public class CredentialProvider {
@Resource
private SysUserService sysUserService;
public Credential getCredential(String userId) {
SysUser sysUser = sysUserService.getById(userId);
if (Objects.isNull(sysUser)) {
log.error("用户不存在: {}", userId);
return null;
}
String aiEmail = sysUser.getAiEmail();
// 密码规则:邮箱前缀 + 固定后缀
String password = aiEmail.split("@")[0] + "AI++2025&";
return new Credential(aiEmail, password);
}
}
OpenAiClientManager(新增)
将 OpenAIClient 的构建抽离为独立组件,屏蔽 Token 获取细节,供上层直接使用:
@Slf4j
@Component
public class OpenAiClientManager {
@Value("${open-web-ui.base-url}")
private String baseUrl;
@Resource
private CredentialProvider credentialProvider;
@Resource
private OpenWebUITokenManager openWebUITokenManager;
/**
* 根据用户 ID 构建 OpenAIClient
* 内部自动完成:凭证查询 → Token 获取(含缓存)→ 客户端构建
*/
public OpenAIClient getClient(String userId) {
log.info("获取 OpenAIClient: {}", userId);
Credential credential = credentialProvider.getCredential(userId);
String validToken = openWebUITokenManager.getValidToken(credential);
return OpenAIOkHttpClient.builder()
.baseUrl(baseUrl)
.apiKey(validToken)
.build();
}
}
设计意图:
OpenAiClientManager是 Token 管理与 SDK 客户端之间的桥接层,调用方只需传入userId,不需关心底层鉴权细节。
系统提示词管理
提示词以 .txt 文件存放于 classpath,通过枚举统一管理,扩展新场景无需改代码。
@Getter
@AllArgsConstructor
public enum SystemPromptEnum {
DEFAULT("default", "默认");
private final String code;
private final String message;
public static SystemPromptEnum of(String code) {
for (SystemPromptEnum value : values()) {
if (value.getCode().equals(code)) return value;
}
return DEFAULT;
}
}
public class SystemPromptLoader {
private static final String SYSTEM_PROMPT_PATH = "prompt/";
private static final String SYSTEM_PROMPT_SUFFIX = "-system-prompt.txt";
private SystemPromptLoader() {}
/** 从 classpath/prompt/{name}-system-prompt.txt 加载提示词 */
public static String loadSystemPrompt(String promptName) {
ClassPathResource resource = new ClassPathResource(
SYSTEM_PROMPT_PATH + promptName + SYSTEM_PROMPT_SUFFIX
);
try (InputStream in = resource.getInputStream()) {
return new String(in.readAllBytes(), Charset.defaultCharset());
} catch (Exception e) {
log.error("加载系统提示失败: {}", promptName, e);
return "";
}
}
public static String loadSystemPrompt(SystemPromptEnum promptEnum) {
return loadSystemPrompt(promptEnum.getCode());
}
}
Advisor 拦截链设计
这是本次重构最核心的部分。参考 Spring AI 的 Advisor 设计思路,我们为流式对话实现了一套职责链(Chain of Responsibility)模式的拦截机制,使得业务增强逻辑(注入上下文、日志记录、敏感词过滤等)与 AI 调用逻辑完全解耦。
整体类图
Advisor(基础接口)
└── StreamAdvisor(流式拦截接口)
└── EnrichMarketingLogAdvisor(业务实现)
StreamAdvisorChain(链接口)
└── DefaultStreamAdvisorChain(链实现,包含递归调用逻辑)
ChatClient(门面,装配 Advisor 链并触发调用)
Advisor 接口
所有拦截器的基础接口,定义名称与执行顺序:
public interface Advisor {
/** 拦截器名称(用于日志追踪) */
String getName();
/** 执行顺序,数字越小越先执行 */
int getOrder();
}
StreamAdvisor 接口
流式拦截器接口,核心方法 adviseStream 接收对话参数和链对象,必须调用 chain.nextStream() 将控制权传递给下一个拦截器:
public interface StreamAdvisor extends Advisor {
/**
* 流式拦截方法
* @param chatCompletionCreateParams 当前(可能被修改的)对话参数
* @param chain 链对象,调用 chain.nextStream() 继续传递
*/
Flux<ChatCompletionChunk> adviseStream(
ChatCompletionCreateParams chatCompletionCreateParams,
StreamAdvisorChain chain
);
}
StreamAdvisorChain 接口
链接口,持有 Advisor 参数(用于 Advisor 之间传递上下文):
public interface StreamAdvisorChain {
/** 触发下一个拦截器(或最终的 AI 调用) */
Flux<ChatCompletionChunk> nextStream(ChatCompletionCreateParams chatCompletionCreateParams);
/** 获取 Advisor 上下文参数 */
Map<String, Object> getAdvisorParams();
}
DefaultStreamAdvisorChain 实现
链的核心实现,使用递归创建下一级 Chain 的方式推进拦截链。当所有 Advisor 执行完毕后,触发真正的 OpenAI 流式调用:
@Data
@Slf4j
public class DefaultStreamAdvisorChain implements StreamAdvisorChain {
private final List<StreamAdvisor> advisors;
private final int currentIndex; // 当前执行到第几个 Advisor
private final OpenAIClient openAIClient;
private Map<String, Object> advisorParams;
@Override
public Flux<ChatCompletionChunk> nextStream(ChatCompletionCreateParams params) {
if (currentIndex < advisors.size()) {
// 还有 Advisor 未执行:取出当前 Advisor,创建新链(index+1),调用拦截方法
StreamAdvisor current = advisors.get(currentIndex);
log.info("当前拦截器:{}", current.getName());
DefaultStreamAdvisorChain nextChain =
new DefaultStreamAdvisorChain(advisors, currentIndex + 1, openAIClient);
nextChain.setAdvisorParams(advisorParams);
return current.adviseStream(params, nextChain);
} else {
// 所有 Advisor 已执行完毕:发起真正的流式 AI 调用
log.info("advisors completion, calling OpenAI...");
return Flux.using(
() -> openAIClient.chat().completions().createStreaming(params),
streamResponse -> Flux.fromStream(streamResponse.stream()),
StreamResponse::close
).subscribeOn(Schedulers.boundedElastic());
}
}
}
执行流程示意:
chain.nextStream(params)
→ Advisor[0].adviseStream(params, chain[1])
→ 业务处理(修改 params)
→ chain[1].nextStream(newParams)
→ Advisor[1].adviseStream(newParams, chain[2])
→ chain[2].nextStream(newParams)
→ [无更多 Advisor] → OpenAI 流式调用
ChatClient 封装
ChatClient 是整个 Advisor 机制的门面类,使用 Builder 模式装配 Advisor 和参数,调用 stream() 方法触发整个调用链:
@Data
@Slf4j
public class ChatClient {
private final OpenAIClient openAIClient;
private final List<Advisor> defaultAdvisors; // 全局默认 Advisor
private final List<Advisor> advisors; // 本次调用的 Advisor
private final Map<String, Object> advisorParams; // Advisor 上下文参数
public static Builder builder(@NonNull OpenAIClient openAIClient) {
return new Builder(openAIClient);
}
public static class Builder {
private final OpenAIClient openAIClient;
private List<Advisor> defaultAdvisors = new ArrayList<>();
private List<Advisor> advisors = new ArrayList<>();
private Map<String, Object> advisorParams = new HashMap<>();
/** 注册本次调用的 Advisor */
public Builder advisors(Advisor... advisors) {
this.advisors.addAll(Arrays.asList(advisors));
return this;
}
/** 通过 Consumer 设置 Advisor 参数 */
public Builder advisorParams(Consumer<Map<String, Object>> consumer) {
consumer.accept(this.advisorParams);
return this;
}
/** 批量设置 Advisor 参数 */
public Builder advisorParams(Map<String, Object> params) {
this.advisorParams.putAll(params);
return this;
}
public ChatClient build() {
return new ChatClient(this);
}
}
/**
* 触发流式对话(经过 Advisor 链后调用 OpenAI)
*/
public Flux<ChatCompletionChunk> stream(ChatCompletionCreateParams params) {
log.info("stream call start....");
// 合并 defaultAdvisors 和 advisors,按 order 排序
List<StreamAdvisor> streamAdvisors = getAllAdvisors().stream()
.filter(a -> a instanceof StreamAdvisor)
.map(a -> (StreamAdvisor) a)
.sorted(Comparator.comparingInt(Advisor::getOrder))
.collect(Collectors.toList());
// 构建并启动链
DefaultStreamAdvisorChain chain =
new DefaultStreamAdvisorChain(streamAdvisors, 0, openAIClient);
chain.setAdvisorParams(this.advisorParams);
return chain.nextStream(params);
}
}
Advisor 实战:营销日志注入
EnrichMarketingLogAdvisor 是一个完整的业务 Advisor 实现,展示了如何在对话前将数据库中的营销日志注入到用户 prompt 中,让 AI 能基于真实工作数据给出 KPI 评分建议。
业务场景
在 KPI 考核评分场景中,前端传入被考核人的 kpiParticipantId,Advisor 负责:
- 从
advisorParams中取出kpiParticipantId - 查询该考核周期内被评估人的营销工作日志
- 将日志文本追加到用户 prompt 末尾
- 用修改后的参数继续传递给下一个拦截器(最终到达 AI)
完整实现
@Slf4j
@Component
public class EnrichMarketingLogAdvisor implements StreamAdvisor {
@Resource
private MarketingLogLogService marketingLogLogService;
@Resource
private KpiParticipantService kpiParticipantService;
@Resource
private KpiInfoService kpiInfoService;
private final String DEFAULT_RESPONSE = "暂无可参考的工作日志";
@Override
public Flux<ChatCompletionChunk> adviseStream(
ChatCompletionCreateParams params,
StreamAdvisorChain chain) {
Map<String, Object> advisorParams = chain.getAdvisorParams();
Object kpiParticipantId = advisorParams.get("kpiParticipantId");
if (Objects.nonNull(kpiParticipantId)) {
KpiParticipant kpiParticipant = kpiParticipantService.getById(kpiParticipantId.toString());
if (Objects.nonNull(kpiParticipant)) {
KpiInfo kpiInfo = kpiInfoService.getById(kpiParticipant.getKpiInfoId());
// 以考核周期的「目标设定通知时间」和「考核评级截止时间」为日志查询时间范围
String enrichMarketingLogStr = marketingLogLogService.enrichMarketingLog(
kpiInfo.getTargetSettingRemindDay(),
kpiInfo.getAssessGradeDeadline(),
kpiParticipant.getEvaluatedUserId()
);
// 无日志:直接返回默认提示,不调用 AI
if (StringUtils.isBlank(enrichMarketingLogStr)) {
return defaultChunk();
}
// 取出原始消息列表,对最后一条用户消息进行增强
List<ChatCompletionMessageParam> messages = params.messages();
List<ChatCompletionMessageParam> newMessages =
new ArrayList<>(messages.subList(0, messages.size() - 1));
ChatCompletionMessageParam lastMessage = messages.get(messages.size() - 1);
if (lastMessage.isUser() && messages.size() <= 2) {
ChatCompletionUserMessageParam.Content content = lastMessage.asUser().content();
ChatCompletionMessageParam enrichedMessage;
if (content.isText()) {
// 将日志追加到用户 prompt 末尾
String enrichedText = content.asText() + "\n\n" + enrichMarketingLogStr;
enrichedMessage = ChatCompletionMessageParam.ofUser(
ChatCompletionUserMessageParam.builder()
.content(enrichedText)
.build()
);
} else {
enrichedMessage = ChatCompletionMessageParam.ofUser(
ChatCompletionUserMessageParam.builder().content(content).build()
);
}
newMessages.add(enrichedMessage);
}
// 用修改后的消息列表重建 params,透传其他参数
ChatCompletionCreateParams enrichedParams = ChatCompletionCreateParams.builder()
.messages(newMessages)
.model(params.model())
.streamOptions(params.streamOptions())
.metadata(params.metadata())
.maxCompletionTokens(params.maxCompletionTokens())
.topP(params.topP())
.temperature(params.temperature())
.build();
// 传递增强后的参数给下一个 Advisor
return chain.nextStream(enrichedParams);
}
}
// 无 kpiParticipantId:直接透传,不做任何修改
return chain.nextStream(params);
}
/** 无日志时返回默认 Chunk,模拟 AI 响应格式 */
private Flux<ChatCompletionChunk> defaultChunk() {
return Flux.just(ChatCompletionChunk.builder()
.id("default")
.model("default")
.created(System.currentTimeMillis())
.choices(List.of(ChatCompletionChunk.Choice.builder()
.index(0)
.delta(ChatCompletionChunk.Choice.Delta.builder()
.content(DEFAULT_RESPONSE)
.build())
.finishReason(ChatCompletionChunk.Choice.FinishReason.STOP)
.build()))
.build());
}
@Override
public String getName() { return "marketing-log-enrichment"; }
@Override
public int getOrder() { return 0; }
}
Advisor 执行流程
前端传入:{ "prompt": "请评价该员工表现", "context": { "kpiParticipantId": "xxx" } }
EnrichMarketingLogAdvisor.adviseStream()
├── 从 advisorParams 取出 kpiParticipantId
├── 查询 KpiParticipant → KpiInfo(获取考核周期)
├── 查询营销日志(考核周期内)
│ ├── 无日志 → 返回 defaultChunk(),终止链
│ └── 有日志 → 追加到 prompt
└── chain.nextStream(enrichedParams)
→ [无更多 Advisor] → OpenAI 流式调用
→ AI 基于真实日志给出评分建议
请求与响应模型
ChatRequest(更新版)
相比第一版,新增了 imageUrl(多模态)、options(模型参数)和 context(Advisor 上下文)三个字段:
@Data
public class ChatRequest {
@Schema(description = "场景标识(用于加载特定场景提示词),默认 default")
private String code = SystemPromptEnum.DEFAULT.getCode();
@NotEmpty(message = "请输入文字...")
@Schema(description = "用户输入内容")
private String prompt;
@Schema(description = "图片资源(base64 或 URL),用于多模态场景")
private String imageUrl;
@Schema(description = "模型名称")
private String model = OpenWebUIConstant.MODEL;
@Schema(description = "模型调用参数")
private ChatRequestOptions options = new ChatRequestOptions();
@Schema(description = "Advisor 上下文参数,用于向拦截器传递业务数据")
private Map<String, Object> context = new HashMap<>();
@Data
public static class ChatRequestOptions {
private int maxTokens = 1024;
private double temperature = 0.7;
private double topP = 0.9;
}
}
字段说明:
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
code |
String | 否 | 场景标识,关联提示词文件 |
prompt |
String | 是 | 用户输入的问题或指令 |
imageUrl |
String | 否 | 图片 URL 或 base64,启用多模态能力 |
model |
String | 否 | 模型名称,默认 Qwen3-VL-8B-Instruct |
options |
Object | 否 | maxTokens / temperature / topP |
context |
Map | 否 | 传递给 Advisor 的业务上下文参数 |
ChatStreamingVo
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ChatStreamingVo {
@Schema(description = "本次推送的内容片段")
private String content;
@Schema(description = "使用的模型名称")
private String model;
}
LLMService 流式对话核心实现
新版 LLMService 使用 OpenAiClientManager 和 ChatClient 替换了原来的直接调用,并新增了多模态图片支持:
@Slf4j
@Service
public class LLMService {
@Resource
private OpenAiClientManager openAiClientManager;
@Resource
private EnrichMarketingLogAdvisor enrichMarketingLogAdvisor;
public Flux<ServerSentEvent<ChatStreamingVo>> chatStream(ChatRequest chatRequest) {
// 1. 加载系统提示词
String systemPrompt = SystemPromptLoader.loadSystemPrompt(
SystemPromptEnum.of(chatRequest.getCode())
);
// 2. 构建 ChatClient,注入 Advisor 和上下文参数
ChatClient chatClient = ChatClient.builder(
openAiClientManager.getClient(SecurityUtils.getUser().getId()))
.advisors(enrichMarketingLogAdvisor) // 注册业务 Advisor
.advisorParams(chatRequest.getContext()) // 传递 Advisor 上下文
.build();
// 3. 构建用户消息(纯文本 or 多模态)
ChatCompletionUserMessageParam userMessageParam;
if (StringUtils.hasText(chatRequest.getImageUrl())) {
// 多模态:文本 + 图片
List<ChatCompletionContentPart> parts = new ArrayList<>();
parts.add(ChatCompletionContentPart.ofText(
ChatCompletionContentPartText.builder().text(chatRequest.getPrompt()).build()
));
parts.add(ChatCompletionContentPart.ofImageUrl(
ChatCompletionContentPartImage.builder()
.imageUrl(ChatCompletionContentPartImage.ImageUrl.builder()
.url(chatRequest.getImageUrl()).build())
.build()
));
userMessageParam = ChatCompletionUserMessageParam.builder()
.contentOfArrayOfContentParts(parts).build();
} else {
// 纯文本
userMessageParam = ChatCompletionUserMessageParam.builder()
.content(chatRequest.getPrompt()).build();
}
// 4. 构建完整对话参数
ChatCompletionCreateParams params = ChatCompletionCreateParams.builder()
.model(chatRequest.getModel())
.addSystemMessage(systemPrompt)
.addMessage(userMessageParam)
.build();
// 5. 通过 ChatClient 触发流式调用(经过 Advisor 链)
return chatClient.stream(params)
.map(chunk -> {
String content = chunk.choices().stream()
.findFirst()
.flatMap(choice -> choice.delta().content())
.orElse("");
return ServerSentEvent.<ChatStreamingVo>builder()
.data(new ChatStreamingVo(content, chatRequest.getModel()))
.build();
})
.onErrorResume(e -> {
log.error("LLM 流式对话异常", e);
throw new BusinessException("LLM 流式对话异常");
});
}
}
LLMController 接口层
@Tag(name = "LLM 对话")
@RestController
@RequestMapping("/v1/chat")
public class LLMController {
@Resource
private LLMService llmService;
@Operation(summary = "LLM 流式对话")
@PreAuthorize("@knifeSecurity.authenticated()")
@PostMapping(value = "/completions", produces = "text/event-stream")
public Flux<ServerSentEvent<ChatStreamingVo>> chatStream(
@Valid @RequestBody ChatRequest chatRequest) {
return llmService.chatStream(chatRequest);
}
}
produces = "text/event-stream":声明返回 SSE 格式。- 返回
Flux<ServerSentEvent<...>>:Spring WebFlux 自动将其转换为持续推送的 SSE 响应。
异步线程池配置
Spring MVC 默认线程池不适合长连接的流式响应,需要配置专用的异步执行器:
@Configuration
public class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {
@Bean
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(50); // 核心线程数
executor.setMaxPoolSize(100); // 最大线程数
executor.setQueueCapacity(1000); // 队列容量
executor.setThreadNamePrefix("ai-stream-");
executor.initialize();
return executor;
}
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor((AsyncTaskExecutor) getAsyncExecutor());
}
}
说明:
WebMvcConfigurer.configureAsyncSupport将自定义执行器注册为 Spring MVC 异步支持的线程池,确保 SSE 长连接在专用线程池中处理,不影响主请求线程。
前端对接思路
Fetch API + ReadableStream
async function sendChat(prompt, context = {}) {
const response = await fetch('/v1/chat/completions', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
},
body: JSON.stringify({
prompt,
code: 'default',
context // 传递 Advisor 所需的业务上下文
})
});
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
let answer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
const lines = decoder.decode(value).split('\n')
.filter(line => line.startsWith('data:'));
for (const line of lines) {
const data = line.replace('data:', '').trim();
if (!data || data === '[DONE]') continue;
try {
answer += JSON.parse(data).content;
document.getElementById('answer').innerText = answer;
} catch (e) { /* 忽略非 JSON 行 */ }
}
}
}
// KPI 评分场景:传入 kpiParticipantId
sendChat('请结合工作日志对该员工进行评分', {
kpiParticipantId: '12345'
});
Vue 3 示例
<template>
<div class="ai-chat">
<el-input v-model="prompt" type="textarea" :rows="3"
placeholder="输入你的问题..." @keydown.ctrl.enter="sendChat" />
<el-button type="primary" :loading="loading" @click="sendChat">发送</el-button>
<div class="answer" v-if="answer"><pre>{{ answer }}</pre></div>
</div>
</template>
<script setup>
import { ref } from 'vue'
const prompt = ref('')
const answer = ref('')
const loading = ref(false)
async function sendChat() {
if (!prompt.value.trim()) return
loading.value = true
answer.value = ''
const response = await fetch('/v1/chat/completions', {
method: 'POST',
headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${token}` },
body: JSON.stringify({ prompt: prompt.value })
})
const reader = response.body.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
for (const line of decoder.decode(value).split('\n')) {
if (!line.startsWith('data:')) continue
const json = line.slice(5).trim()
if (!json || json === '[DONE]') continue
answer.value += JSON.parse(json).content
}
}
} finally {
loading.value = false
}
}
</script>
整体流程图
┌─────────────────────────────────────────────────────────────────────┐
│ 前端浏览器 │
│ 输入 prompt + context → POST /v1/chat/completions │
│ ← 逐块接收 SSE 数据 → 实时渲染到文本框 │
└────────────────────────────┬────────────────────────────────────────┘
│ HTTP SSE
┌────────────────────────────▼────────────────────────────────────────┐
│ LLMController │
│ @PostMapping(produces = "text/event-stream") │
└────────────────────────────┬────────────────────────────────────────┘
│
┌────────────────────────────▼────────────────────────────────────────┐
│ LLMService │
│ 1. SystemPromptLoader → 加载提示词 │
│ 2. OpenAiClientManager.getClient(userId) → 获取 OpenAIClient │
│ 3. ChatClient.builder(client) │
│ .advisors(enrichMarketingLogAdvisor) │
│ .advisorParams(context) → 构建 ChatClient │
│ 4. 构建 ChatCompletionCreateParams(支持多模态) │
│ 5. chatClient.stream(params) → 触发 Advisor 链 │
└────┬───────────────────────────────────┬───────────────────────────┘
│ 客户端管理 │ 链式调用
┌────▼─────────────────────┐ ┌──────────▼──────────────────────────┐
│ OpenAiClientManager │ │ DefaultStreamAdvisorChain │
│ credentialProvider │ │ ├── [0] EnrichMarketingLogAdvisor │
│ → tokenManager │ │ │ 查询营销日志,注入 prompt │
│ → 构建 OpenAIClient │ │ └── [末端] 调用 OpenAI 流式接口 │
└──────────────────────────┘ └──────────┬───────────────────────────┘
│
┌─────────────────────────────────────────▼──────────────────────────┐
│ Open WebUI │
│ POST /api/v1/auths/signin → 登录获取 Token(Redis 缓存) │
│ POST /api/v1/chat/completions → 流式返回 ChatCompletionChunk │
└────────────────────────────────────────────────────────────────────┘
设计总结与经验
✅ 核心亮点
| 设计点 | 说明 |
|---|---|
| Advisor 拦截链 | 仿 Spring AI 设计,职责链模式解耦业务增强与 AI 调用 |
| OpenAiClientManager | 统一客户端构建入口,屏蔽 Token 鉴权细节 |
| Token 双检锁 | @Synchronized + 二次校验,防止高并发下重复登录 |
| Token 直接按 userId 获取 | TokenManager 直接依赖 CredentialProvider,简化调用链 |
| 多模态支持 | imageUrl 字段 + 条件组装 ContentPart,文本/图片统一接口 |
| Advisor 上下文参数 | context 字段透传至 Advisor,业务数据注入与 AI 调用完全解耦 |
| 专用异步线程池 | AsyncConfig 配置 ai-stream- 线程池,避免阻塞主请求线程 |
| 提示词文件化 | classpath 枚举管理,扩展场景无需改代码 |
⚠️ 注意事项与改进建议
1. Advisor 参数类型安全
advisorParams 目前是 Map<String, Object>,建议定义强类型的参数封装类,避免 get("kpiParticipantId") 时的类型转换风险。
// 建议封装
@Data
public class KpiAdvisorParams {
private String kpiParticipantId;
}
2. Redis TTL 与 Token 有效期对齐
当前实现未设置 Redis Key 的过期时间,建议在 save 时同步设置 TTL,避免无效 Token 长期占用内存:
long ttlSeconds = (expiresTime - System.currentTimeMillis()) / 1000;
redisTemplate.opsForValue().set(prefix + key, value, ttlSeconds, TimeUnit.SECONDS);
3. Advisor 扩展点
当前链式结构已具备良好的扩展性。新增一个 Advisor 只需:
- 实现
StreamAdvisor接口; - 注册为 Spring Bean;
- 在
LLMService中通过.advisors(newAdvisor)注入。
典型扩展场景:敏感词过滤、对话上下文记忆(History)、Token 用量统计等。
4. Nginx SSE 配置
location /v1/chat/ {
proxy_pass http://backend;
proxy_buffering off;
proxy_read_timeout 120s;
proxy_set_header Cache-Control no-cache;
proxy_set_header Connection '';
proxy_http_version 1.1;
}
5. 多模态模型兼容性
imageUrl 字段仅在使用支持视觉能力的模型(如 Qwen3-VL-8B-Instruct)时有效,使用前需确认 Open WebUI 中部署的模型支持多模态输入。
小结
本文完整介绍了 CRM 系统中 AI 聊天模块的第二版架构演进,相比第一版的核心改进在于:
- Advisor 拦截链:将业务增强逻辑(如日志注入)与 AI 调用解耦,扩展新场景只需新增 Advisor;
- OpenAiClientManager:统一客户端管理,Token 获取细节对上层透明;
- 多模态支持:单一接口同时支持纯文本和图文输入;
- 上下文参数透传:
context字段使 Advisor 与前端业务参数无缝对接。
这套架构已在 KPI 评分、营销分析等业务场景中落地验证,具备良好的可扩展性和可维护性。
作者:北风朝向 · 2026年5月
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)