Spring Boot 集成 Open WebUI 实现 AI 流式对话

本文介绍如何在 Spring Boot 项目中集成 Open WebUI,通过自动管理用户 Token、封装 OpenAI Java SDK 客户端,设计类似 Spring AI 的 Advisor 拦截链机制,实现 SSE 流式输出,并展示一个将业务数据(营销日志)注入对话上下文的完整 Advisor 实战案例。


目录

  1. 背景与架构概览
  2. 依赖与配置
  3. Token 生命周期管理
  4. 凭证提供与客户端管理
  5. 系统提示词管理
  6. Advisor 拦截链设计
  7. Advisor 实战:营销日志注入
  8. 请求与响应模型
  9. LLMService 流式对话核心实现
  10. LLMController 接口层
  11. 异步线程池配置
  12. 前端对接思路
  13. 整体流程图
  14. 设计总结与经验

背景与架构概览

在企业 CRM 系统中,我们希望为业务人员提供一个内嵌的 AI 助手。随着业务需求增加,单纯的"问答"已不能满足场景需求——我们需要在对话前后植入业务逻辑,例如将用户的营销工作日志自动注入到对话上下文,让 AI 能结合真实工作数据给出建议。

为此,在第一版直接调用 OpenAI SDK 的基础上,我们设计了一套 Advisor 拦截链机制,类似 Spring AI 的 Advisor 模式,实现请求的前置增强、后置处理与链式传递。

整体方案选型:

组件 说明
Open WebUI 开源 LLM 前端平台,提供 OpenAI 兼容接口,支持多模型管理
openai-java SDK 官方 Java 客户端,直接对接 OpenAI 兼容 API
Spring WebFlux 响应式编程,支持 SSE(Server-Sent Events)流式推送
Redis 缓存 Token,避免每次请求都重新登录 Open WebUI
Advisor Chain 自研拦截链,用于对话前后植入业务增强逻辑

整体请求链路:

前端输入框 → POST /v1/chat/completions
    → LLMController(SSE 接口)
    → LLMService(构造 ChatClient + ChatCompletionCreateParams)
    → ChatClient.stream()
        → Advisor 链(按 order 顺序依次执行)
            → EnrichMarketingLogAdvisor(业务数据注入)
            → ... 更多 Advisor
        → OpenAI Java SDK(流式调用 Open WebUI)
    → SSE 逐块推送回前端

依赖与配置

Maven 依赖

<!-- OpenAI Java 官方 SDK -->
<dependency>
    <groupId>com.openai</groupId>
    <artifactId>openai-java</artifactId>
    <version>2.5.0</version>
</dependency>

<!-- Spring WebFlux(SSE 支持) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

<!-- Hutool(HTTP 工具 + JSON 解析) -->
<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.x</version>
</dependency>

<!-- Spring Data Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

application.yml 配置

open-web-ui:
  base-url: http://192.168.4.45:2500/api   # Open WebUI 的 OpenAI 兼容接口地址

常量统一管理,避免 hardcode 分散在各处:

public class OpenWebUIConstant {

    private OpenWebUIConstant() {}

    /** Open WebUI 登录接口地址 */
    public static final String LOGIN_URL = "http://192.168.4.45:2500/api/v1/auths/signin";

    /** 默认模型名称 */
    public static final String MODEL = "Qwen3-VL-8B-Instruct";
}

Token 生命周期管理

设计思路

Open WebUI 使用 JWT Token 进行鉴权,每次请求若都重新登录,不仅效率低,还会给服务造成压力。我们设计了**“先查缓存 → 有效直接用 → 过期再刷新”**的 Token 自动管理机制,并通过 Redis 实现跨实例共享。

TokenRepository 接口

public interface TokenRepository {
    OpenWebUIToken get(String key);
    void save(String key, OpenWebUIToken value);
    void delete(String key);
}

Redis 存储实现

public class RedisTokenRepository implements TokenRepository {

    private String prefix = "openwebui:";
    private final RedisTemplate<Object, Object> redisTemplate;

    public RedisTokenRepository(RedisTemplate<Object, Object> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    @Override
    public OpenWebUIToken get(String key) {
        return BeanUtil.copyProperties(
            redisTemplate.opsForValue().get(prefix + key),
            OpenWebUIToken.class
        );
    }

    @Override
    public void save(String key, OpenWebUIToken value) {
        redisTemplate.opsForValue().set(prefix + key, value);
    }

    @Override
    public void delete(String key) {
        redisTemplate.delete(prefix + key);
    }
}

OpenWebUIToken 数据模型

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OpenWebUIToken implements Serializable {

    private String accessToken;
    private Long expireAt;      // 过期时间(毫秒时间戳)
    private String username;
    private String password;

    /** Token 是否仍然有效 */
    public boolean isValid() {
        return StringUtils.isNotBlank(accessToken)
            && Objects.nonNull(expireAt)
            && TimeUtil.getLocalDateTime(expireAt).isAfter(LocalDateTime.now());
    }
}

OpenWebUITokenManager 核心管理器

新版中 OpenWebUITokenManager 新增了对 CredentialProvider 的依赖,可直接通过 userId 获取 Token,无需在调用方手动查询凭证:

@Slf4j
public class OpenWebUITokenManager {

    private static final Long TOKEN_EXPIRE_TIME = 60 * 60L; // 默认1小时

    private final String signInUrl;
    private final TokenRepository tokenRepository;
    private final CredentialProvider credentialProvider; // 新增依赖

    // ① 通过 userId 直接获取 Token(新增方法)
    public String getValidToken(String userId) {
        return getValidToken(credentialProvider.getCredential(userId));
    }

    // ② 通过 Credential 获取 Token
    public String getValidToken(Credential credential) {
        if (Objects.isNull(credential)) return null;
        return getValidToken(credential.getUsername(), credential.getPassword());
    }

    // ③ 核心:查缓存 → 有效返回 → 失效刷新
    public String getValidToken(String username, String password) {
        OpenWebUIToken cachedToken = tokenRepository.get(username);
        if (cachedToken != null && cachedToken.isValid()) {
            log.debug("使用缓存 Token: {}", username);
            return cachedToken.getAccessToken();
        }
        return refreshToken(username, password);
    }

    /** 并发安全的 Token 刷新(双检锁) */
    @Synchronized
    public String refreshToken(String username, String password) {
        // 加锁后二次检查,防止重复登录
        OpenWebUIToken cachedToken = tokenRepository.get(username);
        if (cachedToken != null && cachedToken.isValid()) {
            return cachedToken.getAccessToken();
        }
        try {
            HttpResponse response = HttpUtil.createPost(signInUrl)
                    .header("Content-Type", "application/json")
                    .body(JSONUtil.toJsonStr(Map.of("email", username, "password", password)))
                    .execute();

            JSONObject responseData = JSONUtil.parseObj(response.body());
            String token = responseData.getStr("token");

            Long expiresTime = TimeUtil.getEpochMilli(LocalDateTime.now().plusSeconds(TOKEN_EXPIRE_TIME));
            String expiresAt = responseData.get("expires_at").toString();
            if (StringUtils.hasText(expiresAt)) {
                expiresTime = Long.parseLong(expiresAt) * 1000;
            }

            OpenWebUIToken webUIToken = OpenWebUIToken.builder()
                    .accessToken(token).username(username).password(password)
                    .expireAt(expiresTime).build();
            tokenRepository.save(username, webUIToken);
            return token;
        } catch (Exception e) {
            log.error("登录获取 Token 失败: {}", username, e);
            throw new RuntimeException("Open WebUI 登录失败", e);
        }
    }
}

凭证提供与客户端管理

CredentialProvider

根据系统用户 ID 查询其对应的 Open WebUI 账号凭证:

@Slf4j
@Component
public class CredentialProvider {

    @Resource
    private SysUserService sysUserService;

    public Credential getCredential(String userId) {
        SysUser sysUser = sysUserService.getById(userId);
        if (Objects.isNull(sysUser)) {
            log.error("用户不存在: {}", userId);
            return null;
        }
        String aiEmail = sysUser.getAiEmail();
        // 密码规则:邮箱前缀 + 固定后缀
        String password = aiEmail.split("@")[0] + "AI++2025&";
        return new Credential(aiEmail, password);
    }
}

OpenAiClientManager(新增)

OpenAIClient 的构建抽离为独立组件,屏蔽 Token 获取细节,供上层直接使用:

@Slf4j
@Component
public class OpenAiClientManager {

    @Value("${open-web-ui.base-url}")
    private String baseUrl;

    @Resource
    private CredentialProvider credentialProvider;

    @Resource
    private OpenWebUITokenManager openWebUITokenManager;

    /**
     * 根据用户 ID 构建 OpenAIClient
     * 内部自动完成:凭证查询 → Token 获取(含缓存)→ 客户端构建
     */
    public OpenAIClient getClient(String userId) {
        log.info("获取 OpenAIClient: {}", userId);
        Credential credential = credentialProvider.getCredential(userId);
        String validToken = openWebUITokenManager.getValidToken(credential);
        return OpenAIOkHttpClient.builder()
                .baseUrl(baseUrl)
                .apiKey(validToken)
                .build();
    }
}

设计意图OpenAiClientManager 是 Token 管理与 SDK 客户端之间的桥接层,调用方只需传入 userId,不需关心底层鉴权细节。


系统提示词管理

提示词以 .txt 文件存放于 classpath,通过枚举统一管理,扩展新场景无需改代码。

@Getter
@AllArgsConstructor
public enum SystemPromptEnum {

    DEFAULT("default", "默认");

    private final String code;
    private final String message;

    public static SystemPromptEnum of(String code) {
        for (SystemPromptEnum value : values()) {
            if (value.getCode().equals(code)) return value;
        }
        return DEFAULT;
    }
}
public class SystemPromptLoader {

    private static final String SYSTEM_PROMPT_PATH = "prompt/";
    private static final String SYSTEM_PROMPT_SUFFIX = "-system-prompt.txt";

    private SystemPromptLoader() {}

    /** 从 classpath/prompt/{name}-system-prompt.txt 加载提示词 */
    public static String loadSystemPrompt(String promptName) {
        ClassPathResource resource = new ClassPathResource(
            SYSTEM_PROMPT_PATH + promptName + SYSTEM_PROMPT_SUFFIX
        );
        try (InputStream in = resource.getInputStream()) {
            return new String(in.readAllBytes(), Charset.defaultCharset());
        } catch (Exception e) {
            log.error("加载系统提示失败: {}", promptName, e);
            return "";
        }
    }

    public static String loadSystemPrompt(SystemPromptEnum promptEnum) {
        return loadSystemPrompt(promptEnum.getCode());
    }
}

Advisor 拦截链设计

这是本次重构最核心的部分。参考 Spring AI 的 Advisor 设计思路,我们为流式对话实现了一套职责链(Chain of Responsibility)模式的拦截机制,使得业务增强逻辑(注入上下文、日志记录、敏感词过滤等)与 AI 调用逻辑完全解耦。

整体类图

Advisor(基础接口)
└── StreamAdvisor(流式拦截接口)
        └── EnrichMarketingLogAdvisor(业务实现)

StreamAdvisorChain(链接口)
└── DefaultStreamAdvisorChain(链实现,包含递归调用逻辑)

ChatClient(门面,装配 Advisor 链并触发调用)

Advisor 接口

所有拦截器的基础接口,定义名称与执行顺序:

public interface Advisor {
    /** 拦截器名称(用于日志追踪) */
    String getName();
    /** 执行顺序,数字越小越先执行 */
    int getOrder();
}

StreamAdvisor 接口

流式拦截器接口,核心方法 adviseStream 接收对话参数和链对象,必须调用 chain.nextStream() 将控制权传递给下一个拦截器

public interface StreamAdvisor extends Advisor {

    /**
     * 流式拦截方法
     * @param chatCompletionCreateParams 当前(可能被修改的)对话参数
     * @param chain 链对象,调用 chain.nextStream() 继续传递
     */
    Flux<ChatCompletionChunk> adviseStream(
        ChatCompletionCreateParams chatCompletionCreateParams,
        StreamAdvisorChain chain
    );
}

StreamAdvisorChain 接口

链接口,持有 Advisor 参数(用于 Advisor 之间传递上下文):

public interface StreamAdvisorChain {

    /** 触发下一个拦截器(或最终的 AI 调用) */
    Flux<ChatCompletionChunk> nextStream(ChatCompletionCreateParams chatCompletionCreateParams);

    /** 获取 Advisor 上下文参数 */
    Map<String, Object> getAdvisorParams();
}

DefaultStreamAdvisorChain 实现

链的核心实现,使用递归创建下一级 Chain 的方式推进拦截链。当所有 Advisor 执行完毕后,触发真正的 OpenAI 流式调用:

@Data
@Slf4j
public class DefaultStreamAdvisorChain implements StreamAdvisorChain {

    private final List<StreamAdvisor> advisors;
    private final int currentIndex;           // 当前执行到第几个 Advisor
    private final OpenAIClient openAIClient;
    private Map<String, Object> advisorParams;

    @Override
    public Flux<ChatCompletionChunk> nextStream(ChatCompletionCreateParams params) {
        if (currentIndex < advisors.size()) {
            // 还有 Advisor 未执行:取出当前 Advisor,创建新链(index+1),调用拦截方法
            StreamAdvisor current = advisors.get(currentIndex);
            log.info("当前拦截器:{}", current.getName());

            DefaultStreamAdvisorChain nextChain =
                new DefaultStreamAdvisorChain(advisors, currentIndex + 1, openAIClient);
            nextChain.setAdvisorParams(advisorParams);

            return current.adviseStream(params, nextChain);

        } else {
            // 所有 Advisor 已执行完毕:发起真正的流式 AI 调用
            log.info("advisors completion, calling OpenAI...");
            return Flux.using(
                () -> openAIClient.chat().completions().createStreaming(params),
                streamResponse -> Flux.fromStream(streamResponse.stream()),
                StreamResponse::close
            ).subscribeOn(Schedulers.boundedElastic());
        }
    }
}

执行流程示意:

chain.nextStream(params)
  → Advisor[0].adviseStream(params, chain[1])
      → 业务处理(修改 params)
      → chain[1].nextStream(newParams)
          → Advisor[1].adviseStream(newParams, chain[2])
              → chain[2].nextStream(newParams)
                  → [无更多 Advisor] → OpenAI 流式调用

ChatClient 封装

ChatClient 是整个 Advisor 机制的门面类,使用 Builder 模式装配 Advisor 和参数,调用 stream() 方法触发整个调用链:

@Data
@Slf4j
public class ChatClient {

    private final OpenAIClient openAIClient;
    private final List<Advisor> defaultAdvisors;  // 全局默认 Advisor
    private final List<Advisor> advisors;          // 本次调用的 Advisor
    private final Map<String, Object> advisorParams; // Advisor 上下文参数

    public static Builder builder(@NonNull OpenAIClient openAIClient) {
        return new Builder(openAIClient);
    }

    public static class Builder {
        private final OpenAIClient openAIClient;
        private List<Advisor> defaultAdvisors = new ArrayList<>();
        private List<Advisor> advisors = new ArrayList<>();
        private Map<String, Object> advisorParams = new HashMap<>();

        /** 注册本次调用的 Advisor */
        public Builder advisors(Advisor... advisors) {
            this.advisors.addAll(Arrays.asList(advisors));
            return this;
        }

        /** 通过 Consumer 设置 Advisor 参数 */
        public Builder advisorParams(Consumer<Map<String, Object>> consumer) {
            consumer.accept(this.advisorParams);
            return this;
        }

        /** 批量设置 Advisor 参数 */
        public Builder advisorParams(Map<String, Object> params) {
            this.advisorParams.putAll(params);
            return this;
        }

        public ChatClient build() {
            return new ChatClient(this);
        }
    }

    /**
     * 触发流式对话(经过 Advisor 链后调用 OpenAI)
     */
    public Flux<ChatCompletionChunk> stream(ChatCompletionCreateParams params) {
        log.info("stream call start....");

        // 合并 defaultAdvisors 和 advisors,按 order 排序
        List<StreamAdvisor> streamAdvisors = getAllAdvisors().stream()
                .filter(a -> a instanceof StreamAdvisor)
                .map(a -> (StreamAdvisor) a)
                .sorted(Comparator.comparingInt(Advisor::getOrder))
                .collect(Collectors.toList());

        // 构建并启动链
        DefaultStreamAdvisorChain chain =
            new DefaultStreamAdvisorChain(streamAdvisors, 0, openAIClient);
        chain.setAdvisorParams(this.advisorParams);

        return chain.nextStream(params);
    }
}

Advisor 实战:营销日志注入

EnrichMarketingLogAdvisor 是一个完整的业务 Advisor 实现,展示了如何在对话前将数据库中的营销日志注入到用户 prompt 中,让 AI 能基于真实工作数据给出 KPI 评分建议。

业务场景

在 KPI 考核评分场景中,前端传入被考核人的 kpiParticipantId,Advisor 负责:

  1. advisorParams 中取出 kpiParticipantId
  2. 查询该考核周期内被评估人的营销工作日志
  3. 将日志文本追加到用户 prompt 末尾
  4. 用修改后的参数继续传递给下一个拦截器(最终到达 AI)

完整实现

@Slf4j
@Component
public class EnrichMarketingLogAdvisor implements StreamAdvisor {

    @Resource
    private MarketingLogLogService marketingLogLogService;
    @Resource
    private KpiParticipantService kpiParticipantService;
    @Resource
    private KpiInfoService kpiInfoService;

    private final String DEFAULT_RESPONSE = "暂无可参考的工作日志";

    @Override
    public Flux<ChatCompletionChunk> adviseStream(
            ChatCompletionCreateParams params,
            StreamAdvisorChain chain) {

        Map<String, Object> advisorParams = chain.getAdvisorParams();
        Object kpiParticipantId = advisorParams.get("kpiParticipantId");

        if (Objects.nonNull(kpiParticipantId)) {
            KpiParticipant kpiParticipant = kpiParticipantService.getById(kpiParticipantId.toString());

            if (Objects.nonNull(kpiParticipant)) {
                KpiInfo kpiInfo = kpiInfoService.getById(kpiParticipant.getKpiInfoId());

                // 以考核周期的「目标设定通知时间」和「考核评级截止时间」为日志查询时间范围
                String enrichMarketingLogStr = marketingLogLogService.enrichMarketingLog(
                    kpiInfo.getTargetSettingRemindDay(),
                    kpiInfo.getAssessGradeDeadline(),
                    kpiParticipant.getEvaluatedUserId()
                );

                // 无日志:直接返回默认提示,不调用 AI
                if (StringUtils.isBlank(enrichMarketingLogStr)) {
                    return defaultChunk();
                }

                // 取出原始消息列表,对最后一条用户消息进行增强
                List<ChatCompletionMessageParam> messages = params.messages();
                List<ChatCompletionMessageParam> newMessages =
                    new ArrayList<>(messages.subList(0, messages.size() - 1));

                ChatCompletionMessageParam lastMessage = messages.get(messages.size() - 1);

                if (lastMessage.isUser() && messages.size() <= 2) {
                    ChatCompletionUserMessageParam.Content content = lastMessage.asUser().content();
                    ChatCompletionMessageParam enrichedMessage;

                    if (content.isText()) {
                        // 将日志追加到用户 prompt 末尾
                        String enrichedText = content.asText() + "\n\n" + enrichMarketingLogStr;
                        enrichedMessage = ChatCompletionMessageParam.ofUser(
                            ChatCompletionUserMessageParam.builder()
                                .content(enrichedText)
                                .build()
                        );
                    } else {
                        enrichedMessage = ChatCompletionMessageParam.ofUser(
                            ChatCompletionUserMessageParam.builder().content(content).build()
                        );
                    }
                    newMessages.add(enrichedMessage);
                }

                // 用修改后的消息列表重建 params,透传其他参数
                ChatCompletionCreateParams enrichedParams = ChatCompletionCreateParams.builder()
                        .messages(newMessages)
                        .model(params.model())
                        .streamOptions(params.streamOptions())
                        .metadata(params.metadata())
                        .maxCompletionTokens(params.maxCompletionTokens())
                        .topP(params.topP())
                        .temperature(params.temperature())
                        .build();

                // 传递增强后的参数给下一个 Advisor
                return chain.nextStream(enrichedParams);
            }
        }

        // 无 kpiParticipantId:直接透传,不做任何修改
        return chain.nextStream(params);
    }

    /** 无日志时返回默认 Chunk,模拟 AI 响应格式 */
    private Flux<ChatCompletionChunk> defaultChunk() {
        return Flux.just(ChatCompletionChunk.builder()
                .id("default")
                .model("default")
                .created(System.currentTimeMillis())
                .choices(List.of(ChatCompletionChunk.Choice.builder()
                        .index(0)
                        .delta(ChatCompletionChunk.Choice.Delta.builder()
                                .content(DEFAULT_RESPONSE)
                                .build())
                        .finishReason(ChatCompletionChunk.Choice.FinishReason.STOP)
                        .build()))
                .build());
    }

    @Override
    public String getName() { return "marketing-log-enrichment"; }

    @Override
    public int getOrder() { return 0; }
}

Advisor 执行流程

前端传入:{ "prompt": "请评价该员工表现", "context": { "kpiParticipantId": "xxx" } }

EnrichMarketingLogAdvisor.adviseStream()
  ├── 从 advisorParams 取出 kpiParticipantId
  ├── 查询 KpiParticipant → KpiInfo(获取考核周期)
  ├── 查询营销日志(考核周期内)
  │   ├── 无日志 → 返回 defaultChunk(),终止链
  │   └── 有日志 → 追加到 prompt
  └── chain.nextStream(enrichedParams)
        → [无更多 Advisor] → OpenAI 流式调用
              → AI 基于真实日志给出评分建议

请求与响应模型

ChatRequest(更新版)

相比第一版,新增了 imageUrl(多模态)、options(模型参数)和 context(Advisor 上下文)三个字段:

@Data
public class ChatRequest {

    @Schema(description = "场景标识(用于加载特定场景提示词),默认 default")
    private String code = SystemPromptEnum.DEFAULT.getCode();

    @NotEmpty(message = "请输入文字...")
    @Schema(description = "用户输入内容")
    private String prompt;

    @Schema(description = "图片资源(base64 或 URL),用于多模态场景")
    private String imageUrl;

    @Schema(description = "模型名称")
    private String model = OpenWebUIConstant.MODEL;

    @Schema(description = "模型调用参数")
    private ChatRequestOptions options = new ChatRequestOptions();

    @Schema(description = "Advisor 上下文参数,用于向拦截器传递业务数据")
    private Map<String, Object> context = new HashMap<>();

    @Data
    public static class ChatRequestOptions {
        private int maxTokens = 1024;
        private double temperature = 0.7;
        private double topP = 0.9;
    }
}

字段说明:

字段 类型 必填 说明
code String 场景标识,关联提示词文件
prompt String 用户输入的问题或指令
imageUrl String 图片 URL 或 base64,启用多模态能力
model String 模型名称,默认 Qwen3-VL-8B-Instruct
options Object maxTokens / temperature / topP
context Map 传递给 Advisor 的业务上下文参数

ChatStreamingVo

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ChatStreamingVo {

    @Schema(description = "本次推送的内容片段")
    private String content;

    @Schema(description = "使用的模型名称")
    private String model;
}

LLMService 流式对话核心实现

新版 LLMService 使用 OpenAiClientManagerChatClient 替换了原来的直接调用,并新增了多模态图片支持:

@Slf4j
@Service
public class LLMService {

    @Resource
    private OpenAiClientManager openAiClientManager;

    @Resource
    private EnrichMarketingLogAdvisor enrichMarketingLogAdvisor;

    public Flux<ServerSentEvent<ChatStreamingVo>> chatStream(ChatRequest chatRequest) {

        // 1. 加载系统提示词
        String systemPrompt = SystemPromptLoader.loadSystemPrompt(
            SystemPromptEnum.of(chatRequest.getCode())
        );

        // 2. 构建 ChatClient,注入 Advisor 和上下文参数
        ChatClient chatClient = ChatClient.builder(
                    openAiClientManager.getClient(SecurityUtils.getUser().getId()))
                .advisors(enrichMarketingLogAdvisor)        // 注册业务 Advisor
                .advisorParams(chatRequest.getContext())    // 传递 Advisor 上下文
                .build();

        // 3. 构建用户消息(纯文本 or 多模态)
        ChatCompletionUserMessageParam userMessageParam;

        if (StringUtils.hasText(chatRequest.getImageUrl())) {
            // 多模态:文本 + 图片
            List<ChatCompletionContentPart> parts = new ArrayList<>();
            parts.add(ChatCompletionContentPart.ofText(
                ChatCompletionContentPartText.builder().text(chatRequest.getPrompt()).build()
            ));
            parts.add(ChatCompletionContentPart.ofImageUrl(
                ChatCompletionContentPartImage.builder()
                    .imageUrl(ChatCompletionContentPartImage.ImageUrl.builder()
                        .url(chatRequest.getImageUrl()).build())
                    .build()
            ));
            userMessageParam = ChatCompletionUserMessageParam.builder()
                .contentOfArrayOfContentParts(parts).build();
        } else {
            // 纯文本
            userMessageParam = ChatCompletionUserMessageParam.builder()
                .content(chatRequest.getPrompt()).build();
        }

        // 4. 构建完整对话参数
        ChatCompletionCreateParams params = ChatCompletionCreateParams.builder()
                .model(chatRequest.getModel())
                .addSystemMessage(systemPrompt)
                .addMessage(userMessageParam)
                .build();

        // 5. 通过 ChatClient 触发流式调用(经过 Advisor 链)
        return chatClient.stream(params)
                .map(chunk -> {
                    String content = chunk.choices().stream()
                            .findFirst()
                            .flatMap(choice -> choice.delta().content())
                            .orElse("");
                    return ServerSentEvent.<ChatStreamingVo>builder()
                            .data(new ChatStreamingVo(content, chatRequest.getModel()))
                            .build();
                })
                .onErrorResume(e -> {
                    log.error("LLM 流式对话异常", e);
                    throw new BusinessException("LLM 流式对话异常");
                });
    }
}

LLMController 接口层

@Tag(name = "LLM 对话")
@RestController
@RequestMapping("/v1/chat")
public class LLMController {

    @Resource
    private LLMService llmService;

    @Operation(summary = "LLM 流式对话")
    @PreAuthorize("@knifeSecurity.authenticated()")
    @PostMapping(value = "/completions", produces = "text/event-stream")
    public Flux<ServerSentEvent<ChatStreamingVo>> chatStream(
            @Valid @RequestBody ChatRequest chatRequest) {
        return llmService.chatStream(chatRequest);
    }
}
  • produces = "text/event-stream":声明返回 SSE 格式。
  • 返回 Flux<ServerSentEvent<...>>:Spring WebFlux 自动将其转换为持续推送的 SSE 响应。

异步线程池配置

Spring MVC 默认线程池不适合长连接的流式响应,需要配置专用的异步执行器:

@Configuration
public class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {

    @Bean
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);     // 核心线程数
        executor.setMaxPoolSize(100);     // 最大线程数
        executor.setQueueCapacity(1000);  // 队列容量
        executor.setThreadNamePrefix("ai-stream-");
        executor.initialize();
        return executor;
    }

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setTaskExecutor((AsyncTaskExecutor) getAsyncExecutor());
    }
}

说明WebMvcConfigurer.configureAsyncSupport 将自定义执行器注册为 Spring MVC 异步支持的线程池,确保 SSE 长连接在专用线程池中处理,不影响主请求线程。


前端对接思路

Fetch API + ReadableStream

async function sendChat(prompt, context = {}) {
  const response = await fetch('/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${token}`
    },
    body: JSON.stringify({
      prompt,
      code: 'default',
      context          // 传递 Advisor 所需的业务上下文
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder('utf-8');
  let answer = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const lines = decoder.decode(value).split('\n')
      .filter(line => line.startsWith('data:'));

    for (const line of lines) {
      const data = line.replace('data:', '').trim();
      if (!data || data === '[DONE]') continue;
      try {
        answer += JSON.parse(data).content;
        document.getElementById('answer').innerText = answer;
      } catch (e) { /* 忽略非 JSON 行 */ }
    }
  }
}

// KPI 评分场景:传入 kpiParticipantId
sendChat('请结合工作日志对该员工进行评分', {
  kpiParticipantId: '12345'
});

Vue 3 示例

<template>
  <div class="ai-chat">
    <el-input v-model="prompt" type="textarea" :rows="3"
      placeholder="输入你的问题..." @keydown.ctrl.enter="sendChat" />
    <el-button type="primary" :loading="loading" @click="sendChat">发送</el-button>
    <div class="answer" v-if="answer"><pre>{{ answer }}</pre></div>
  </div>
</template>

<script setup>
import { ref } from 'vue'
const prompt = ref('')
const answer = ref('')
const loading = ref(false)

async function sendChat() {
  if (!prompt.value.trim()) return
  loading.value = true
  answer.value = ''

  const response = await fetch('/v1/chat/completions', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${token}` },
    body: JSON.stringify({ prompt: prompt.value })
  })

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      for (const line of decoder.decode(value).split('\n')) {
        if (!line.startsWith('data:')) continue
        const json = line.slice(5).trim()
        if (!json || json === '[DONE]') continue
        answer.value += JSON.parse(json).content
      }
    }
  } finally {
    loading.value = false
  }
}
</script>

整体流程图

┌─────────────────────────────────────────────────────────────────────┐
│                          前端浏览器                                    │
│  输入 prompt + context → POST /v1/chat/completions                    │
│  ← 逐块接收 SSE 数据 → 实时渲染到文本框                                │
└────────────────────────────┬────────────────────────────────────────┘
                             │ HTTP SSE
┌────────────────────────────▼────────────────────────────────────────┐
│                       LLMController                                  │
│  @PostMapping(produces = "text/event-stream")                        │
└────────────────────────────┬────────────────────────────────────────┘
                             │
┌────────────────────────────▼────────────────────────────────────────┐
│                        LLMService                                    │
│  1. SystemPromptLoader → 加载提示词                                   │
│  2. OpenAiClientManager.getClient(userId) → 获取 OpenAIClient        │
│  3. ChatClient.builder(client)                                       │
│       .advisors(enrichMarketingLogAdvisor)                           │
│       .advisorParams(context) → 构建 ChatClient                      │
│  4. 构建 ChatCompletionCreateParams(支持多模态)                      │
│  5. chatClient.stream(params) → 触发 Advisor 链                      │
└────┬───────────────────────────────────┬───────────────────────────┘
     │ 客户端管理                          │ 链式调用
┌────▼─────────────────────┐  ┌──────────▼──────────────────────────┐
│  OpenAiClientManager      │  │  DefaultStreamAdvisorChain           │
│  credentialProvider       │  │  ├── [0] EnrichMarketingLogAdvisor   │
│  → tokenManager           │  │  │   查询营销日志,注入 prompt         │
│  → 构建 OpenAIClient      │  │  └── [末端] 调用 OpenAI 流式接口      │
└──────────────────────────┘  └──────────┬───────────────────────────┘
                                          │
┌─────────────────────────────────────────▼──────────────────────────┐
│                        Open WebUI                                    │
│  POST /api/v1/auths/signin  → 登录获取 Token(Redis 缓存)            │
│  POST /api/v1/chat/completions → 流式返回 ChatCompletionChunk         │
└────────────────────────────────────────────────────────────────────┘

设计总结与经验

✅ 核心亮点

设计点 说明
Advisor 拦截链 仿 Spring AI 设计,职责链模式解耦业务增强与 AI 调用
OpenAiClientManager 统一客户端构建入口,屏蔽 Token 鉴权细节
Token 双检锁 @Synchronized + 二次校验,防止高并发下重复登录
Token 直接按 userId 获取 TokenManager 直接依赖 CredentialProvider,简化调用链
多模态支持 imageUrl 字段 + 条件组装 ContentPart,文本/图片统一接口
Advisor 上下文参数 context 字段透传至 Advisor,业务数据注入与 AI 调用完全解耦
专用异步线程池 AsyncConfig 配置 ai-stream- 线程池,避免阻塞主请求线程
提示词文件化 classpath 枚举管理,扩展场景无需改代码

⚠️ 注意事项与改进建议

1. Advisor 参数类型安全

advisorParams 目前是 Map<String, Object>,建议定义强类型的参数封装类,避免 get("kpiParticipantId") 时的类型转换风险。

// 建议封装
@Data
public class KpiAdvisorParams {
    private String kpiParticipantId;
}

2. Redis TTL 与 Token 有效期对齐

当前实现未设置 Redis Key 的过期时间,建议在 save 时同步设置 TTL,避免无效 Token 长期占用内存:

long ttlSeconds = (expiresTime - System.currentTimeMillis()) / 1000;
redisTemplate.opsForValue().set(prefix + key, value, ttlSeconds, TimeUnit.SECONDS);

3. Advisor 扩展点

当前链式结构已具备良好的扩展性。新增一个 Advisor 只需:

  1. 实现 StreamAdvisor 接口;
  2. 注册为 Spring Bean;
  3. LLMService 中通过 .advisors(newAdvisor) 注入。

典型扩展场景:敏感词过滤、对话上下文记忆(History)、Token 用量统计等。

4. Nginx SSE 配置

location /v1/chat/ {
    proxy_pass http://backend;
    proxy_buffering off;
    proxy_read_timeout 120s;
    proxy_set_header Cache-Control no-cache;
    proxy_set_header Connection '';
    proxy_http_version 1.1;
}

5. 多模态模型兼容性

imageUrl 字段仅在使用支持视觉能力的模型(如 Qwen3-VL-8B-Instruct)时有效,使用前需确认 Open WebUI 中部署的模型支持多模态输入。


小结

本文完整介绍了 CRM 系统中 AI 聊天模块的第二版架构演进,相比第一版的核心改进在于:

  1. Advisor 拦截链:将业务增强逻辑(如日志注入)与 AI 调用解耦,扩展新场景只需新增 Advisor;
  2. OpenAiClientManager:统一客户端管理,Token 获取细节对上层透明;
  3. 多模态支持:单一接口同时支持纯文本和图文输入;
  4. 上下文参数透传context 字段使 Advisor 与前端业务参数无缝对接。

这套架构已在 KPI 评分、营销分析等业务场景中落地验证,具备良好的可扩展性和可维护性。


作者:北风朝向 · 2026年5月

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐