Spring AI 多模型切换与异步并发:一套代码支撑多厂商 + CompletableFuture 并行调用

本文覆盖 Spring AI 中的多模型配置与动态切换、基于场景的模型路由、故障降级、CompletableFuture 并行调用、JDK 21 虚拟线程赛马模式、多模型投票决策,以及信号量控制的批量情感分析。面向有 Spring Boot 基础、正在用 Spring AI 做大模型应用开发的后端工程师。


一、为什么需要多模型切换

实际项目中,单一模型几乎无法满足所有场景。常见的多模型需求包括:

需求 说明
成本控制 高频简单任务用便宜模型(DeepSeek-Chat),复杂任务才上高端模型(qwen-max)
故障转移 主模型出故障时自动切换到备用模型,保证业务不中断
场景路由 客服场景用温度低的千问,代码审查场景用推理强的 DeepSeek
质量对冲 多个模型同时回答同一问题,投票表决,提升结果可信度

Spring AI 的抽象层把这件事做得非常优雅——ChatClientChatModel 是统一接口,底层换厂商只需要换 Starter 依赖和配置,业务代码一行不改。


二、多模型共存:配置与 Bean 注册

2.1 引入多厂商 Starter

以 DeepSeek(兼容 OpenAI 协议)+ 通义千问(DashScope)为例,pom.xml 需要同时引入两个 Starter:

<!-- DeepSeek(走 OpenAI 兼容协议) -->
<dependency>
    <groupId>org.springframework.ai</groupId>
    <artifactId>spring-ai-starter-model-openai</artifactId>
</dependency>

<!-- 通义千问(阿里 DashScope) -->
<dependency>
    <groupId>com.alibaba.cloud.ai</groupId>
    <artifactId>spring-ai-alibaba-starter-dashscope</artifactId>
    <version>1.1.2.0</version>
</dependency>

2.2 排除自动装配冲突

两个 Starter 都会自动注册 ChatModel Bean,直接启动会报 Bean 冲突。解决方式是在启动类上用 exclude 排除两家的自动配置,然后自己手动注册:

@SpringBootApplication(exclude = {
    // 排除 OpenAI 自动配置
    org.springframework.ai.autoconfigure.openai.OpenAiAutoConfiguration.class,
    // 排除 DashScope 自动配置
    com.alibaba.cloud.ai.autoconfigure.dashscope.DashScopeAutoConfiguration.class
})
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

2.3 配置文件:双模型并存

spring:
  ai:
    openai:
      base-url: https://api.deepseek.com
      api-key: ${DEEPSEEK_API_KEY}
      chat:
        options:
          model: deepseek-chat
          temperature: 0.7
          max-tokens: 2048
    dashscope:
      api-key: ${DASHSCOPE_API_KEY}
      chat:
        options:
          model: qwen-max
          temperature: 0.7
          max-tokens: 2048

2.4 手动注册多个 ChatClient

排除自动配置后,通过 @Configuration 手动注册不同厂商的 ChatClient,用 @Bean 名称区分:

@Configuration
public class MultiModelConfig {

    /** 主力模型:DeepSeek,便宜,适合高频调用 */
    @Bean("primaryChatClient")
    public ChatClient primaryChatClient(OpenAiChatModel openAiChatModel) {
        return ChatClient.builder(openAiChatModel)
                .defaultSystem("你是一个专业的技术助手")
                .build();
    }

    /** 备用模型:通义千问,质量高,用于降级或高质量任务 */
    @Bean("backupChatClient")
    public ChatClient backupChatClient(DashScopeChatModel dashScopeChatModel) {
        return ChatClient.builder(dashScopeChatModel)
                .defaultSystem("你是一个专业的技术助手")
                .build();
    }
}

在 Controller 中通过 @Qualifier 注入指定的 Client:

@RestController
@RequestMapping("/multi-chat")
public class MultiChatController {

    private final ChatClient customerServiceChatClient;
    private final ChatClient codingChatClient;

    public MultiChatController(
            @Qualifier("customerServiceChatClient") ChatClient customerServiceChatClient,
            @Qualifier("codingChatClient") ChatClient codingChatClient) {
        this.customerServiceChatClient = customerServiceChatClient;
        this.codingChatClient = codingChatClient;
    }

    @GetMapping("/service")
    public String service(@RequestParam String message) {
        return customerServiceChatClient.prompt()
                .user(message)
                .call()
                .content();
    }

    @GetMapping("/code")
    public String code(@RequestParam String message) {
        return codingChatClient.prompt()
                .user(message)
                .call()
                .content();
    }
}

三、动态模型路由:三种实战方案

3.1 方案一:参数驱动——Map 存储 + 运行时选择

最直接的方式:把所有 ChatModel 放进一个 Map,请求时通过参数选择。

@RestController
@RequestMapping("/api/dynamic")
public class DynamicModelController {

    private final Map<String, ChatModel> modelMap;
    private final ChatModel defaultModel;

    public DynamicModelController(
            OpenAiChatModel openAiChatModel,
            DashScopeChatModel dashScopeChatModel) {
        this.modelMap = Map.of(
                "deepseek", openAiChatModel,
                "qianwen", dashScopeChatModel
        );
        this.defaultModel = openAiChatModel; // 默认 DeepSeek
    }

    @GetMapping("/chat")
    public String chat(
            @RequestParam String question,
            @RequestParam(defaultValue = "deepseek") String provider) {

        ChatModel model = modelMap.getOrDefault(provider, defaultModel);
        return ChatClient.create(model)
                .prompt()
                .user(question)
                .call()
                .content();
    }
}

调用示例:
- /api/dynamic/chat?question=你好&provider=deepseek —— 走 DeepSeek
- /api/dynamic/chat?question=你好&provider=qianwen —— 走通义千问

3.2 方案二:场景路由——按业务场景映射模型

不把模型名暴露给调用方,而是暴露业务场景,内部做映射:

@Service
public class ModelRouterService {

    private final Map<String, ChatClient> clientMap;

    public ModelRouterService(
            @Qualifier("primaryChatClient") ChatClient primary,
            @Qualifier("backupChatClient") ChatClient backup) {
        this.clientMap = Map.of(
                "primary", primary,
                "backup", backup
        );
    }

    /** 按业务场景选择模型 */
    public ChatClient routeByScene(String scene) {
        return switch (scene) {
            case "code_review" -> clientMap.get("primary");   // 代码审查 → DeepSeek
            case "customer_service" -> clientMap.get("backup"); // 客服 → 千问
            case "translation" -> clientMap.get("backup");      // 翻译 → 千问
            default -> clientMap.get("primary");
        };
    }
}

调用方只需传 scene=customer_service,不需要关心背后是哪个模型。模型切换时,只改这一处映射表即可。

3.3 方案三:配置驱动——自定义配置类 + 全局默认模型

把模型映射关系完全放到配置文件里,改配置不改代码:

app:
  ai:
    default-provider: deepseek
    providers:
      deepseek:
        model: deepseek-chat
        temperature: 0.7
      qianwen:
        model: qwen-max
        temperature: 0.5

对应的配置类:

@Component
@ConfigurationProperties(prefix = "app.ai")
public class AiProperties {

    private String defaultProvider;
    private Map<String, ProviderConfig> providers;

    // getter / setter 省略

    public static class ProviderConfig {
        private String model;
        private Double temperature;
        // getter / setter 省略
    }
}

路由服务读取配置,动态构建 Client 参数:

@Service
public class ConfigDrivenRouterService {

    private final Map<String, ChatClient> clientMap;
    private final ChatClient defaultClient;
    private final AiProperties properties;

    public ConfigDrivenRouterService(
            @Qualifier("primaryChatClient") ChatClient primary,
            @Qualifier("backupChatClient") ChatClient backup,
            AiProperties properties) {
        this.clientMap = Map.of("deepseek", primary, "qianwen", backup);
        this.properties = properties;
        this.defaultClient = clientMap.get(properties.getDefaultProvider());
    }

    public String chat(String provider, String question) {
        ChatClient client = clientMap.getOrDefault(provider, defaultClient);

        // 读取该 provider 的自定义配置
        AiProperties.ProviderConfig config = properties.getProviders().get(provider);
        ChatClient.CallPromptResponseSpec spec = client.prompt().user(question);

        // 如果配置了自定义参数,覆盖默认 options
        if (config != null && config.getTemperature() != null) {
            spec = (ChatClient.CallPromptResponseSpec) client.prompt()
                    .user(question)
                    .options(ChatOptionsBuilder.builder()
                            .temperature(config.getTemperature())
                            .build());
        }

        return client.prompt().user(question).call().content();
    }
}

想全局切换默认模型?改一行配置重启即可:default-provider: qianwen


四、故障降级:主模型挂了自动切备用

多模型最核心的生产价值之一就是故障转移。思路很简单:主模型调用失败,从剩余模型中选一个继续。

@Service
public class FallbackService {

    private final Map<String, ChatClient> clientMap;
    private final String defaultProvider;

    public FallbackService(
            @Qualifier("primaryChatClient") ChatClient primary,
            @Qualifier("backupChatClient") ChatClient backup,
            AiProperties properties) {
        this.clientMap = new LinkedHashMap<>();
        this.clientMap.put("deepseek", primary);
        this.clientMap.put("qianwen", backup);
        this.defaultProvider = properties.getDefaultProvider();
    }

    public String chatWithFallback(String provider, String question) {
        String targetProvider = provider != null ? provider : defaultProvider;
        ChatClient primaryClient = clientMap.getOrDefault(targetProvider,
                clientMap.get(defaultProvider));

        try {
            // 尝试主模型
            return primaryClient.prompt().user(question).call().content();
        } catch (Exception e) {
            log.warn("主模型 [{}] 调用失败,开始降级: {}", targetProvider, e.getMessage());

            // 从剩余模型中选一个
            return clientMap.entrySet().stream()
                    .filter(entry -> !entry.getKey().equals(targetProvider))
                    .findFirst()
                    .map(entry -> {
                        log.info("降级到模型: {}", entry.getKey());
                        return entry.getValue().prompt()
                                .user(question).call().content();
                    })
                    .orElseThrow(() -> new RuntimeException("所有模型均不可用"));
        }
    }
}

关键设计
- 用 LinkedHashMap 保持模型顺序,方便按优先级遍历
- filter 排除已失败的模型,从剩余模型中选第一个
- 如果所有模型都不可用,抛出明确异常,上层可以做熔断处理


五、异步并发调用:告别串行等待

5.1 串行调用的代价

AI 模型的单次调用通常需要 1~10 秒。如果业务需要同时调两个模型:

串行:DeepSeek(3s) + 千问(2s) = 5s
并行:max(DeepSeek(3s), 千问(2s)) = 3s

省了 40% 的等待时间。模型越多,并行的收益越大。

5.2 CompletableFuture 并行调用

最经典的 JDK 8+ 方案,两个请求同时发出,以最慢的那个为准:

@Service
public class ParallelCallService {

    private final ChatClient deepSeekClient;
    private final ChatClient qianWenClient;

    public ParallelCallService(
            @Qualifier("primaryChatClient") ChatClient deepSeekClient,
            @Qualifier("backupChatClient") ChatClient qianWenClient) {
        this.deepSeekClient = deepSeekClient;
        this.qianWenClient = qianWenClient;
    }

    public Map<String, String> parallelCall(String question) throws Exception {
        CompletableFuture<String> deepSeekFuture = CompletableFuture.supplyAsync(() ->
                deepSeekClient.prompt().user(question).call().content()
        );

        CompletableFuture<String> qianWenFuture = CompletableFuture.supplyAsync(() ->
                qianWenClient.prompt().user(question).call().content()
        );

        // 等待所有完成,超时 30 秒
        CompletableFuture.allOf(deepSeekFuture, qianWenFuture)
                .get(30, TimeUnit.SECONDS);

        return Map.of(
                "deepseek", deepSeekFuture.get(),
                "qianwen", qianWenFuture.get()
        );
    }
}

CompletableFuture.allOf() 会等所有 Future 都完成,再统一取结果。超时可以自己设定,避免某个模型卡死拖垮整个请求。

5.3 JDK 21 虚拟线程 + 赛马模式

项目用的 JDK 21,可以开启虚拟线程获得更轻量的并发能力。先在配置里开启:

spring:
  threads:
    virtual:
      enabled: true

再利用 StructuredTaskScope 实现赛马模式——谁先返回用谁的结果,其余自动取消:

@Service
public class VirtualThreadService {

    private final ChatClient deepSeekClient;
    private final ChatClient qianWenClient;
    private final ExecutorService executor =
            Executors.newVirtualThreadPerTaskExecutor();

    /** 赛马模式:最快的模型返回后立即响应,其余取消 */
    public String raceCall(String question) throws Exception {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
            scope.fork(() -> deepSeekClient.prompt()
                    .user(question).call().content());
            scope.fork(() -> qianWenClient.prompt()
                    .user(question).call().content());

            scope.join(); // 等待第一个成功
            return scope.result(); // 返回最快的结果
        }
    }

    /** 全量模式:所有模型都跑完,收集全部结果 */
    public Map<String, String> allCall(String question) throws Exception {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(
                () -> deepSeekClient.prompt().user(question).call().content(),
                executor);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(
                () -> qianWenClient.prompt().user(question).call().content(),
                executor);

        CompletableFuture.allOf(f1, f2).get(30, TimeUnit.SECONDS);
        return Map.of("deepseek", f1.get(), "qianwen", f2.get());
    }
}

赛马模式 vs 全量模式

模式 适用场景 耗时
赛马(ShutdownOnSuccess) 只要一个结果,追求最快响应 最快模型的耗时
全量(allOf) 需要多个模型的结果做对比或投票 最慢模型的耗时

六、实战:多模型投票决策

有些判断类任务(合同是否合规、代码是否有安全漏洞、商品是否符合售后条件),单个模型的回答不够可靠。让多个模型各自独立回答,然后统计票数,用多数决的方式提升结果置信度

6.1 投票服务

@Service
public class VoteService {

    private final List<ChatClient> clients;
    private final ExecutorService executor =
            Executors.newVirtualThreadPerTaskExecutor();

    /** 投票结果 */
    public record VoteResult(String answer, int votes, double confidence) {}

    public VoteService(
            @Qualifier("primaryChatClient") ChatClient primary,
            @Qualifier("backupChatClient") ChatClient backup) {
        this.clients = List.of(primary, backup);
    }

    public VoteResult vote(String question) {
        // 1. 所有模型并行回答,要求只回复 YES 或 NO
        String prompt = question + "\n请只回答 YES 或 NO,不要解释。";

        List<CompletableFuture<String>> futures = clients.stream()
                .map(client -> CompletableFuture.supplyAsync(
                        () -> client.prompt().user(prompt).call().content(),
                        executor))
                .toList();

        // 2. 等待所有完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // 3. 收集结果,过滤异常
        List<String> answers = futures.stream()
                .map(f -> {
                    try { return f.get().trim().toUpperCase(); }
                    catch (Exception e) { return null; }
                })
                .filter(Objects::nonNull)
                .toList();

        // 4. 统计票数
        Map<String, Long> voteCounts = answers.stream()
                .collect(Collectors.groupingBy(a -> a, Collectors.counting()));

        // 5. 取票数最多的答案
        Map.Entry<String, Long> winner = voteCounts.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .orElseThrow();

        double confidence = (double) winner.getValue() / clients.size();
        return new VoteResult(winner.getKey(), winner.getValue().intValue(), confidence);
    }
}

6.2 调用示例

@GetMapping("/vote")
public VoteService.VoteResult vote(@RequestParam String question) {
    return voteService.vote(question);
}

请求:/api/vote?question=这段代码 SELECT * FROM users WHERE id = ' + input 是否有SQL注入风险?

返回:

{
  "answer": "YES",
  "votes": 2,
  "confidence": 1.0
}

两个模型都认为有注入风险,置信度 100%。如果一个说 YES 一个说 NO,置信度就是 50%——这时应该走人工复核。一般来说,设定 75%~80% 以上的置信度为可信阈值是比较合理的做法。


七、实战:信号量控制的批量情感分析

假设有 100 条用户评论需要做情感分析,全部一起丢给模型会瞬间打爆 API 限流。用 Semaphore 控制并发数,既保证效率又不触发限流。

7.1 数据模型

public enum Sentiment { POSITIVE, NEUTRAL, NEGATIVE }

public record SentimentResult(String content, Sentiment sentiment, int score) {}

7.2 批量分析服务

@Service
public class SentimentAnalysisService {

    private final ChatClient chatClient;

    public SentimentAnalysisService(
            @Qualifier("primaryChatClient") ChatClient chatClient) {
        this.chatClient = chatClient;
    }

    public List<SentimentResult> batchAnalyze(List<String> texts, int concurrency) {
        ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
        Semaphore semaphore = new Semaphore(concurrency);

        // 1. 为每条文本创建一个异步任务
        List<CompletableFuture<SentimentResult>> futures = texts.stream()
                .map(text -> CompletableFuture.supplyAsync(() -> {
                    try {
                        semaphore.acquire(); // 获取令牌
                        String response = chatClient.prompt()
                                .user("""
                                    分析以下文本的情感倾向。
                                    文本:%s

                                    只返回JSON格式:
                                    {"sentiment":"POSITIVE/NEUTRAL/NEGATIVE","score":1-10}
                                    其中 score 10 为最正面,1 为最负面。
                                    """.formatted(text))
                                .call()
                                .content();
                        // 解析返回的 JSON(简化处理)
                        Sentiment sentiment = parseSentiment(response);
                        int score = parseScore(response);
                        return new SentimentResult(text, sentiment, score);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } finally {
                        semaphore.release(); // 成功失败都要释放
                    }
                }, executor))
                .toList();

        // 2. 等待全部完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        // 3. 收集结果
        return futures.stream()
                .map(f -> {
                    try { return f.get(); }
                    catch (Exception e) { return null; }
                })
                .filter(Objects::nonNull)
                .toList();
    }
}

7.3 Controller

@PostMapping("/sentiment/batch")
public List<SentimentResult> batchSentiment(@RequestBody List<String> texts) {
    // 5 并发处理
    return sentimentAnalysisService.batchAnalyze(texts, 5);
}

请求体:

["产品非常好用,推荐!", "质量很差,退货了", "还行,一般般"]

返回:

[
  {"content": "产品非常好用,推荐!", "sentiment": "POSITIVE", "score": 9},
  {"content": "质量很差,退货了", "sentiment": "NEGATIVE", "score": 2},
  {"content": "还行,一般般", "sentiment": "NEUTRAL", "score": 5}
]

设计要点

  • Semaphore(5) 表示同时最多 5 个请求在跑,防止打爆 API 限流
  • finally 块中必须 release(),否则某个任务异常后信号量不释放,后续任务会永远等待
  • 虚拟线程 + 信号量的组合非常适合 IO 密集型的大模型调用场景——线程创建成本极低,信号量控制并发上限

八、踩坑提醒

8.1 多 Starter 的依赖传递问题

同时引入 OpenAI Starter 和 DashScope Starter 时,可能遇到 ClassNotFoundException。原因是某些 Starter 把部分依赖标记为 optional=true,传递依赖链断裂。典型报错:

java.lang.NoClassDefFoundError: com/fasterxml/jackson/...

解决方案:在自己的 pom.xml 中显式引入缺失的依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

8.2 自动装配排除要彻底

如果只排除了一个厂商的 AutoConfiguration,另一个厂商的 Bean 会自动注册但和你手动注册的 Bean 冲突。两个都排除,全部手动注册,是最稳妥的做法。

8.3 CompletableFuture 超时必须设

AI 调用的耗时不可预测。如果不设超时,一个模型卡住会拖垮整个请求:

// 一定要设超时
CompletableFuture.allOf(futures).get(30, TimeUnit.SECONDS);

九、总结

能力 方案 关键技术
多模型共存 排除自动配置 + 手动注册 Bean @SpringBootApplication(exclude=...)
动态路由 Map 存储 + 参数/场景/配置驱动 Map<String, ChatClient>
故障降级 try-catch + 遍历备用模型 Stream filter + findFirst
并行调用 CompletableFuture.allOf supplyAsync + get(timeout)
赛马模式 JDK 21 StructuredTaskScope ShutdownOnSuccess
投票决策 多模型并行 + 统计票数 groupingBy + counting
批量处理 信号量控制并发 Semaphore + 虚拟线程

核心思想就一句话:Spring AI 的 ChatClient 抽象屏蔽了厂商差异,多模型切换就是多 Bean 管理问题,异步并发就是标准 Java 并发编程问题。把这两件事拆开看,复杂度立刻降下来。


本系列后续内容:下一篇将聚焦 Spring AI 生产环境避坑指南与 RAG 向量库实战,包括 Embedding 模型选型、向量数据库集成(Milvus / PGVector)、检索增强生成的完整链路,以及生产环境中的限流、重试、成本监控等工程实践。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐