Spring AI 多模型切换与异步并发:一套代码支撑多厂商 + CompletableFuture 并行调用
Spring AI 多模型切换与异步并发:一套代码支撑多厂商 + CompletableFuture 并行调用
本文覆盖 Spring AI 中的多模型配置与动态切换、基于场景的模型路由、故障降级、CompletableFuture 并行调用、JDK 21 虚拟线程赛马模式、多模型投票决策,以及信号量控制的批量情感分析。面向有 Spring Boot 基础、正在用 Spring AI 做大模型应用开发的后端工程师。
一、为什么需要多模型切换
实际项目中,单一模型几乎无法满足所有场景。常见的多模型需求包括:
| 需求 | 说明 |
|---|---|
| 成本控制 | 高频简单任务用便宜模型(DeepSeek-Chat),复杂任务才上高端模型(qwen-max) |
| 故障转移 | 主模型出故障时自动切换到备用模型,保证业务不中断 |
| 场景路由 | 客服场景用温度低的千问,代码审查场景用推理强的 DeepSeek |
| 质量对冲 | 多个模型同时回答同一问题,投票表决,提升结果可信度 |
Spring AI 的抽象层把这件事做得非常优雅——ChatClient 和 ChatModel 是统一接口,底层换厂商只需要换 Starter 依赖和配置,业务代码一行不改。
二、多模型共存:配置与 Bean 注册
2.1 引入多厂商 Starter
以 DeepSeek(兼容 OpenAI 协议)+ 通义千问(DashScope)为例,pom.xml 需要同时引入两个 Starter:
<!-- DeepSeek(走 OpenAI 兼容协议) -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-openai</artifactId>
</dependency>
<!-- 通义千问(阿里 DashScope) -->
<dependency>
<groupId>com.alibaba.cloud.ai</groupId>
<artifactId>spring-ai-alibaba-starter-dashscope</artifactId>
<version>1.1.2.0</version>
</dependency>
2.2 排除自动装配冲突
两个 Starter 都会自动注册 ChatModel Bean,直接启动会报 Bean 冲突。解决方式是在启动类上用 exclude 排除两家的自动配置,然后自己手动注册:
@SpringBootApplication(exclude = {
// 排除 OpenAI 自动配置
org.springframework.ai.autoconfigure.openai.OpenAiAutoConfiguration.class,
// 排除 DashScope 自动配置
com.alibaba.cloud.ai.autoconfigure.dashscope.DashScopeAutoConfiguration.class
})
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
2.3 配置文件:双模型并存
spring:
ai:
openai:
base-url: https://api.deepseek.com
api-key: ${DEEPSEEK_API_KEY}
chat:
options:
model: deepseek-chat
temperature: 0.7
max-tokens: 2048
dashscope:
api-key: ${DASHSCOPE_API_KEY}
chat:
options:
model: qwen-max
temperature: 0.7
max-tokens: 2048
2.4 手动注册多个 ChatClient
排除自动配置后,通过 @Configuration 手动注册不同厂商的 ChatClient,用 @Bean 名称区分:
@Configuration
public class MultiModelConfig {
/** 主力模型:DeepSeek,便宜,适合高频调用 */
@Bean("primaryChatClient")
public ChatClient primaryChatClient(OpenAiChatModel openAiChatModel) {
return ChatClient.builder(openAiChatModel)
.defaultSystem("你是一个专业的技术助手")
.build();
}
/** 备用模型:通义千问,质量高,用于降级或高质量任务 */
@Bean("backupChatClient")
public ChatClient backupChatClient(DashScopeChatModel dashScopeChatModel) {
return ChatClient.builder(dashScopeChatModel)
.defaultSystem("你是一个专业的技术助手")
.build();
}
}
在 Controller 中通过 @Qualifier 注入指定的 Client:
@RestController
@RequestMapping("/multi-chat")
public class MultiChatController {
private final ChatClient customerServiceChatClient;
private final ChatClient codingChatClient;
public MultiChatController(
@Qualifier("customerServiceChatClient") ChatClient customerServiceChatClient,
@Qualifier("codingChatClient") ChatClient codingChatClient) {
this.customerServiceChatClient = customerServiceChatClient;
this.codingChatClient = codingChatClient;
}
@GetMapping("/service")
public String service(@RequestParam String message) {
return customerServiceChatClient.prompt()
.user(message)
.call()
.content();
}
@GetMapping("/code")
public String code(@RequestParam String message) {
return codingChatClient.prompt()
.user(message)
.call()
.content();
}
}
三、动态模型路由:三种实战方案
3.1 方案一:参数驱动——Map 存储 + 运行时选择
最直接的方式:把所有 ChatModel 放进一个 Map,请求时通过参数选择。
@RestController
@RequestMapping("/api/dynamic")
public class DynamicModelController {
private final Map<String, ChatModel> modelMap;
private final ChatModel defaultModel;
public DynamicModelController(
OpenAiChatModel openAiChatModel,
DashScopeChatModel dashScopeChatModel) {
this.modelMap = Map.of(
"deepseek", openAiChatModel,
"qianwen", dashScopeChatModel
);
this.defaultModel = openAiChatModel; // 默认 DeepSeek
}
@GetMapping("/chat")
public String chat(
@RequestParam String question,
@RequestParam(defaultValue = "deepseek") String provider) {
ChatModel model = modelMap.getOrDefault(provider, defaultModel);
return ChatClient.create(model)
.prompt()
.user(question)
.call()
.content();
}
}
调用示例:
- /api/dynamic/chat?question=你好&provider=deepseek —— 走 DeepSeek
- /api/dynamic/chat?question=你好&provider=qianwen —— 走通义千问
3.2 方案二:场景路由——按业务场景映射模型
不把模型名暴露给调用方,而是暴露业务场景,内部做映射:
@Service
public class ModelRouterService {
private final Map<String, ChatClient> clientMap;
public ModelRouterService(
@Qualifier("primaryChatClient") ChatClient primary,
@Qualifier("backupChatClient") ChatClient backup) {
this.clientMap = Map.of(
"primary", primary,
"backup", backup
);
}
/** 按业务场景选择模型 */
public ChatClient routeByScene(String scene) {
return switch (scene) {
case "code_review" -> clientMap.get("primary"); // 代码审查 → DeepSeek
case "customer_service" -> clientMap.get("backup"); // 客服 → 千问
case "translation" -> clientMap.get("backup"); // 翻译 → 千问
default -> clientMap.get("primary");
};
}
}
调用方只需传 scene=customer_service,不需要关心背后是哪个模型。模型切换时,只改这一处映射表即可。
3.3 方案三:配置驱动——自定义配置类 + 全局默认模型
把模型映射关系完全放到配置文件里,改配置不改代码:
app:
ai:
default-provider: deepseek
providers:
deepseek:
model: deepseek-chat
temperature: 0.7
qianwen:
model: qwen-max
temperature: 0.5
对应的配置类:
@Component
@ConfigurationProperties(prefix = "app.ai")
public class AiProperties {
private String defaultProvider;
private Map<String, ProviderConfig> providers;
// getter / setter 省略
public static class ProviderConfig {
private String model;
private Double temperature;
// getter / setter 省略
}
}
路由服务读取配置,动态构建 Client 参数:
@Service
public class ConfigDrivenRouterService {
private final Map<String, ChatClient> clientMap;
private final ChatClient defaultClient;
private final AiProperties properties;
public ConfigDrivenRouterService(
@Qualifier("primaryChatClient") ChatClient primary,
@Qualifier("backupChatClient") ChatClient backup,
AiProperties properties) {
this.clientMap = Map.of("deepseek", primary, "qianwen", backup);
this.properties = properties;
this.defaultClient = clientMap.get(properties.getDefaultProvider());
}
public String chat(String provider, String question) {
ChatClient client = clientMap.getOrDefault(provider, defaultClient);
// 读取该 provider 的自定义配置
AiProperties.ProviderConfig config = properties.getProviders().get(provider);
ChatClient.CallPromptResponseSpec spec = client.prompt().user(question);
// 如果配置了自定义参数,覆盖默认 options
if (config != null && config.getTemperature() != null) {
spec = (ChatClient.CallPromptResponseSpec) client.prompt()
.user(question)
.options(ChatOptionsBuilder.builder()
.temperature(config.getTemperature())
.build());
}
return client.prompt().user(question).call().content();
}
}
想全局切换默认模型?改一行配置重启即可:default-provider: qianwen。
四、故障降级:主模型挂了自动切备用
多模型最核心的生产价值之一就是故障转移。思路很简单:主模型调用失败,从剩余模型中选一个继续。
@Service
public class FallbackService {
private final Map<String, ChatClient> clientMap;
private final String defaultProvider;
public FallbackService(
@Qualifier("primaryChatClient") ChatClient primary,
@Qualifier("backupChatClient") ChatClient backup,
AiProperties properties) {
this.clientMap = new LinkedHashMap<>();
this.clientMap.put("deepseek", primary);
this.clientMap.put("qianwen", backup);
this.defaultProvider = properties.getDefaultProvider();
}
public String chatWithFallback(String provider, String question) {
String targetProvider = provider != null ? provider : defaultProvider;
ChatClient primaryClient = clientMap.getOrDefault(targetProvider,
clientMap.get(defaultProvider));
try {
// 尝试主模型
return primaryClient.prompt().user(question).call().content();
} catch (Exception e) {
log.warn("主模型 [{}] 调用失败,开始降级: {}", targetProvider, e.getMessage());
// 从剩余模型中选一个
return clientMap.entrySet().stream()
.filter(entry -> !entry.getKey().equals(targetProvider))
.findFirst()
.map(entry -> {
log.info("降级到模型: {}", entry.getKey());
return entry.getValue().prompt()
.user(question).call().content();
})
.orElseThrow(() -> new RuntimeException("所有模型均不可用"));
}
}
}
关键设计:
- 用 LinkedHashMap 保持模型顺序,方便按优先级遍历
- filter 排除已失败的模型,从剩余模型中选第一个
- 如果所有模型都不可用,抛出明确异常,上层可以做熔断处理
五、异步并发调用:告别串行等待
5.1 串行调用的代价
AI 模型的单次调用通常需要 1~10 秒。如果业务需要同时调两个模型:
串行:DeepSeek(3s) + 千问(2s) = 5s
并行:max(DeepSeek(3s), 千问(2s)) = 3s
省了 40% 的等待时间。模型越多,并行的收益越大。
5.2 CompletableFuture 并行调用
最经典的 JDK 8+ 方案,两个请求同时发出,以最慢的那个为准:
@Service
public class ParallelCallService {
private final ChatClient deepSeekClient;
private final ChatClient qianWenClient;
public ParallelCallService(
@Qualifier("primaryChatClient") ChatClient deepSeekClient,
@Qualifier("backupChatClient") ChatClient qianWenClient) {
this.deepSeekClient = deepSeekClient;
this.qianWenClient = qianWenClient;
}
public Map<String, String> parallelCall(String question) throws Exception {
CompletableFuture<String> deepSeekFuture = CompletableFuture.supplyAsync(() ->
deepSeekClient.prompt().user(question).call().content()
);
CompletableFuture<String> qianWenFuture = CompletableFuture.supplyAsync(() ->
qianWenClient.prompt().user(question).call().content()
);
// 等待所有完成,超时 30 秒
CompletableFuture.allOf(deepSeekFuture, qianWenFuture)
.get(30, TimeUnit.SECONDS);
return Map.of(
"deepseek", deepSeekFuture.get(),
"qianwen", qianWenFuture.get()
);
}
}
CompletableFuture.allOf() 会等所有 Future 都完成,再统一取结果。超时可以自己设定,避免某个模型卡死拖垮整个请求。
5.3 JDK 21 虚拟线程 + 赛马模式
项目用的 JDK 21,可以开启虚拟线程获得更轻量的并发能力。先在配置里开启:
spring:
threads:
virtual:
enabled: true
再利用 StructuredTaskScope 实现赛马模式——谁先返回用谁的结果,其余自动取消:
@Service
public class VirtualThreadService {
private final ChatClient deepSeekClient;
private final ChatClient qianWenClient;
private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();
/** 赛马模式:最快的模型返回后立即响应,其余取消 */
public String raceCall(String question) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
scope.fork(() -> deepSeekClient.prompt()
.user(question).call().content());
scope.fork(() -> qianWenClient.prompt()
.user(question).call().content());
scope.join(); // 等待第一个成功
return scope.result(); // 返回最快的结果
}
}
/** 全量模式:所有模型都跑完,收集全部结果 */
public Map<String, String> allCall(String question) throws Exception {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(
() -> deepSeekClient.prompt().user(question).call().content(),
executor);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(
() -> qianWenClient.prompt().user(question).call().content(),
executor);
CompletableFuture.allOf(f1, f2).get(30, TimeUnit.SECONDS);
return Map.of("deepseek", f1.get(), "qianwen", f2.get());
}
}
赛马模式 vs 全量模式:
| 模式 | 适用场景 | 耗时 |
|---|---|---|
| 赛马(ShutdownOnSuccess) | 只要一个结果,追求最快响应 | 最快模型的耗时 |
| 全量(allOf) | 需要多个模型的结果做对比或投票 | 最慢模型的耗时 |
六、实战:多模型投票决策
有些判断类任务(合同是否合规、代码是否有安全漏洞、商品是否符合售后条件),单个模型的回答不够可靠。让多个模型各自独立回答,然后统计票数,用多数决的方式提升结果置信度。
6.1 投票服务
@Service
public class VoteService {
private final List<ChatClient> clients;
private final ExecutorService executor =
Executors.newVirtualThreadPerTaskExecutor();
/** 投票结果 */
public record VoteResult(String answer, int votes, double confidence) {}
public VoteService(
@Qualifier("primaryChatClient") ChatClient primary,
@Qualifier("backupChatClient") ChatClient backup) {
this.clients = List.of(primary, backup);
}
public VoteResult vote(String question) {
// 1. 所有模型并行回答,要求只回复 YES 或 NO
String prompt = question + "\n请只回答 YES 或 NO,不要解释。";
List<CompletableFuture<String>> futures = clients.stream()
.map(client -> CompletableFuture.supplyAsync(
() -> client.prompt().user(prompt).call().content(),
executor))
.toList();
// 2. 等待所有完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 3. 收集结果,过滤异常
List<String> answers = futures.stream()
.map(f -> {
try { return f.get().trim().toUpperCase(); }
catch (Exception e) { return null; }
})
.filter(Objects::nonNull)
.toList();
// 4. 统计票数
Map<String, Long> voteCounts = answers.stream()
.collect(Collectors.groupingBy(a -> a, Collectors.counting()));
// 5. 取票数最多的答案
Map.Entry<String, Long> winner = voteCounts.entrySet().stream()
.max(Map.Entry.comparingByValue())
.orElseThrow();
double confidence = (double) winner.getValue() / clients.size();
return new VoteResult(winner.getKey(), winner.getValue().intValue(), confidence);
}
}
6.2 调用示例
@GetMapping("/vote")
public VoteService.VoteResult vote(@RequestParam String question) {
return voteService.vote(question);
}
请求:/api/vote?question=这段代码 SELECT * FROM users WHERE id = ' + input 是否有SQL注入风险?
返回:
{
"answer": "YES",
"votes": 2,
"confidence": 1.0
}
两个模型都认为有注入风险,置信度 100%。如果一个说 YES 一个说 NO,置信度就是 50%——这时应该走人工复核。一般来说,设定 75%~80% 以上的置信度为可信阈值是比较合理的做法。
七、实战:信号量控制的批量情感分析
假设有 100 条用户评论需要做情感分析,全部一起丢给模型会瞬间打爆 API 限流。用 Semaphore 控制并发数,既保证效率又不触发限流。
7.1 数据模型
public enum Sentiment { POSITIVE, NEUTRAL, NEGATIVE }
public record SentimentResult(String content, Sentiment sentiment, int score) {}
7.2 批量分析服务
@Service
public class SentimentAnalysisService {
private final ChatClient chatClient;
public SentimentAnalysisService(
@Qualifier("primaryChatClient") ChatClient chatClient) {
this.chatClient = chatClient;
}
public List<SentimentResult> batchAnalyze(List<String> texts, int concurrency) {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
Semaphore semaphore = new Semaphore(concurrency);
// 1. 为每条文本创建一个异步任务
List<CompletableFuture<SentimentResult>> futures = texts.stream()
.map(text -> CompletableFuture.supplyAsync(() -> {
try {
semaphore.acquire(); // 获取令牌
String response = chatClient.prompt()
.user("""
分析以下文本的情感倾向。
文本:%s
只返回JSON格式:
{"sentiment":"POSITIVE/NEUTRAL/NEGATIVE","score":1-10}
其中 score 10 为最正面,1 为最负面。
""".formatted(text))
.call()
.content();
// 解析返回的 JSON(简化处理)
Sentiment sentiment = parseSentiment(response);
int score = parseScore(response);
return new SentimentResult(text, sentiment, score);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
semaphore.release(); // 成功失败都要释放
}
}, executor))
.toList();
// 2. 等待全部完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 3. 收集结果
return futures.stream()
.map(f -> {
try { return f.get(); }
catch (Exception e) { return null; }
})
.filter(Objects::nonNull)
.toList();
}
}
7.3 Controller
@PostMapping("/sentiment/batch")
public List<SentimentResult> batchSentiment(@RequestBody List<String> texts) {
// 5 并发处理
return sentimentAnalysisService.batchAnalyze(texts, 5);
}
请求体:
["产品非常好用,推荐!", "质量很差,退货了", "还行,一般般"]
返回:
[
{"content": "产品非常好用,推荐!", "sentiment": "POSITIVE", "score": 9},
{"content": "质量很差,退货了", "sentiment": "NEGATIVE", "score": 2},
{"content": "还行,一般般", "sentiment": "NEUTRAL", "score": 5}
]
设计要点:
Semaphore(5)表示同时最多 5 个请求在跑,防止打爆 API 限流finally块中必须release(),否则某个任务异常后信号量不释放,后续任务会永远等待- 虚拟线程 + 信号量的组合非常适合 IO 密集型的大模型调用场景——线程创建成本极低,信号量控制并发上限
八、踩坑提醒
8.1 多 Starter 的依赖传递问题
同时引入 OpenAI Starter 和 DashScope Starter 时,可能遇到 ClassNotFoundException。原因是某些 Starter 把部分依赖标记为 optional=true,传递依赖链断裂。典型报错:
java.lang.NoClassDefFoundError: com/fasterxml/jackson/...
解决方案:在自己的 pom.xml 中显式引入缺失的依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
8.2 自动装配排除要彻底
如果只排除了一个厂商的 AutoConfiguration,另一个厂商的 Bean 会自动注册但和你手动注册的 Bean 冲突。两个都排除,全部手动注册,是最稳妥的做法。
8.3 CompletableFuture 超时必须设
AI 调用的耗时不可预测。如果不设超时,一个模型卡住会拖垮整个请求:
// 一定要设超时
CompletableFuture.allOf(futures).get(30, TimeUnit.SECONDS);
九、总结
| 能力 | 方案 | 关键技术 |
|---|---|---|
| 多模型共存 | 排除自动配置 + 手动注册 Bean | @SpringBootApplication(exclude=...) |
| 动态路由 | Map 存储 + 参数/场景/配置驱动 | Map<String, ChatClient> |
| 故障降级 | try-catch + 遍历备用模型 | Stream filter + findFirst |
| 并行调用 | CompletableFuture.allOf | supplyAsync + get(timeout) |
| 赛马模式 | JDK 21 StructuredTaskScope | ShutdownOnSuccess |
| 投票决策 | 多模型并行 + 统计票数 | groupingBy + counting |
| 批量处理 | 信号量控制并发 | Semaphore + 虚拟线程 |
核心思想就一句话:Spring AI 的 ChatClient 抽象屏蔽了厂商差异,多模型切换就是多 Bean 管理问题,异步并发就是标准 Java 并发编程问题。把这两件事拆开看,复杂度立刻降下来。
本系列后续内容:下一篇将聚焦 Spring AI 生产环境避坑指南与 RAG 向量库实战,包括 Embedding 模型选型、向量数据库集成(Milvus / PGVector)、检索增强生成的完整链路,以及生产环境中的限流、重试、成本监控等工程实践。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)