模型路由降级机制详解:基于断路器模式,实现高可用AI服务
在AI服务开发中,模型调用的稳定性是核心痛点之一——无论是第三方模型接口超时、限流,还是自建模型服务故障,只要单个模型异常,就可能导致整个AI服务中断,直接影响用户体验和业务连续性。为解决这一问题,我们设计了一套模型路由降级机制,通过多模型候选、断路器保护、首包探测等核心设计,实现“单个模型故障不影响整体服务”的高可用目标。
本文将从“为什么需要降级”“整体架构”“核心组件源码解析”“流程图解”“实际场景”五个维度,全方位拆解这套机制,结合完整源码和通俗讲解,让开发者既能理解原理,也能快速落地应用。
一、痛点直击:为什么必须做模型路由降级?
AI服务中,模型调用的不稳定性主要来自三个方面:第三方模型接口波动、网络延迟或中断、模型自身负载过高/故障。如果只依赖单个模型,一旦出现问题,服务将直接不可用——这在生产环境中是无法接受的。
我们对比两种方案,就能清晰看到降级机制的必要性:
┌─────────────────────────────────────────────────────────────┐
│ 方案1:只用一个模型 │
│ ❌ 模型挂了 = 服务挂了 │
│ 场景:依赖单一第三方模型,接口限流/宕机,整个AI服务瘫痪 │
├─────────────────────────────────────────────────────────────┤
│ 方案2:多模型 + 降级(我们的方案) │
│ 模型A → 模型B → 模型C → 全部失败 → 返回错误 │
│ ✅ 一个挂了,自动切换下一个 │
│ 场景:模型A宕机,自动切换到模型B,服务正常提供,用户无感知 │
└─────────────────────────────────────────────────────────────┘
核心目标:通过多模型冗余和智能路由,实现“故障自动切换、服务持续可用”,同时通过断路器模式保护模型,避免无效调用,提升服务性能。
二、整体架构:四大核心组件,构建高可用路由体系
模型路由降级机制的整体架构围绕「路由调度+健康监控+多模型冗余」展开,核心分为4个组件,各组件协同工作,实现智能降级和故障恢复。架构图如下:
┌─────────────────────────────────────────────────────────────────────┐
│ RoutingLLMService │
│ (路由式 LLM 服务) │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ ┌───────────────────┐ │
│ │ ModelSelector │ ──→ │ ModelRoutingExecutor│ │
│ │ (模型选择器) │ │ (路由执行器) │ │
│ └───────────────┘ └─────────┬─────────┘ │
│ │ │ │
│ │ ↓ │
│ │ ┌───────────────────┐ │
│ │ │ ModelHealthStore │ │
│ │ │ (健康状态存储) │ │
│ │ │ │ │
│ │ │ CLOSED ──────────│────────→ OPEN │
│ │ │ ↑ │ 失败阈值触发 │
│ │ │ │ │ │
│ │ │ └──── HALF_OPEN ←─── 冷却结束 │
│ │ └───────────────────┘ │
│ │ │
│ └───────────────────┬───────────────────────────────────────┘
│ ↓
│ ┌─────────────────────────────────────────────────────────────────┐
│ │ 多个 ChatClient │
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ │ 硅基流动 │ │ 阿里云百炼 │ │ Ollama │ │
│ │ │ SiliconFlow │ │ DashScope │ │ (本地) │ │
│ │ └─────────────┘ └─────────────┘ └─────────────┘ │
│ └─────────────────────────────────────────────────────────────────┘
│ │
└─────────────────────────────────────────────────────────────────────┘
各组件核心职责:
- RoutingLLMService:服务入口,统一接收模型调用请求(同步/流式),协调后续组件执行路由和降级逻辑。
- ModelSelector:模型选择器,根据请求类型(普通/深度思考)和模型优先级、健康状态,筛选出候选模型列表。
- ModelRoutingExecutor:路由执行器,遍历候选模型,执行调用逻辑,处理失败降级,串联整个调用流程。
- ModelHealthStore:健康状态存储,基于断路器模式,维护每个模型的健康状态(正常/熔断/半开),控制模型是否允许被调用。
- ChatClient:多模型客户端,对接不同提供商的模型(硅基流动、阿里云百炼、本地Ollama等),提供统一调用接口。
三、核心配置:一键适配多场景,灵活调整降级规则
机制的灵活性源于可配置化设计,通过YAML配置即可调整模型候选、熔断阈值、冷却时间等核心参数,适配开发、生产等不同场景。完整配置如下,关键参数附带详细解读:
# application.yaml
ai:
# 模型选择策略(断路器核心配置)
selection:
failure-threshold: 2 # 连续失败2次,触发熔断(可根据模型稳定性调整)
open-duration-ms: 30000 # 熔断持续30秒(期间拒绝调用,避免无效重试)
# 聊天模型组(多模型候选配置)
chat:
default-model: qwen-plus # 普通请求默认模型
deep-thinking-model: qwen-max # 深度思考请求(复杂问题)专用模型
# 候选模型列表(按 priority 排序,优先级1最高,依次兜底)
candidates:
- id: siliconflow-qwen
provider: siliconflow
model: Qwen/Qwen2.5-72B-Instruct
url: https://api.siliconflow.cn/v1
priority: 1 # 优先级最高,优先调用
enabled: true # 是否启用该模型
supports-thinking: false # 是否支持深度思考模式(复杂问题)
- id: dashscope-qwen
provider: dashscope
model: qwen-plus
url: https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation
priority: 2 # 备选模型,优先级次之
enabled: true
supports-thinking: true # 支持深度思考,可处理复杂问题
- id: ollama-local
provider: ollama
model: llama3.1
url: http://localhost:11434/api/generate
priority: 3 # 兜底模型(本地部署,无网络依赖)
enabled: true
supports-thinking: false
配置核心要点:
- 优先级配置:
priority数值越小,优先级越高,确保核心模型优先被调用,兜底模型最后生效。 - 熔断配置:
failure-threshold控制触发熔断的失败次数,open-duration-ms控制熔断冷却时间,避免模型故障时反复调用消耗资源。 - 功能适配:
supports-thinking标记模型是否支持深度思考,用于区分普通请求和复杂请求的模型选择。
四、源码解析:逐组件拆解,读懂降级核心逻辑
以下结合完整源码,逐一对核心组件进行解析,重点讲解“模型选择、路由执行、断路器保护、首包探测”四大核心逻辑,让开发者能直接复用代码并理解底层原理。
4.1 服务入口:RoutingLLMService(统一接收请求)
该类是整个路由降级机制的入口,实现了LLMService接口,统一处理同步聊天和流式聊天请求,协调模型选择、路由执行等逻辑。
文件路径:infra-ai/.../chat/RoutingLLMService.java
@Slf4j
@Service
@Primary // 注入优先使用这个实现,覆盖其他LLMService实现
public class RoutingLLMService implements LLMService {
// 注入三个核心组件
private final ModelSelector selector; // 模型选择器
private final ModelHealthStore healthStore; // 健康状态存储(断路器)
private final ModelRoutingExecutor executor; // 路由执行器
private final Map<String, ChatClient> clientsByProvider; // 各提供商客户端(key:provider,value:客户端)
// 构造方法注入依赖
public RoutingLLMService(ModelSelector selector, ModelHealthStore healthStore,
ModelRoutingExecutor executor, Map<String, ChatClient> clientsByProvider) {
this.selector = selector;
this.healthStore = healthStore;
this.executor = executor;
this.clientsByProvider = clientsByProvider;
}
/**
* 同步聊天请求(非流式)
* 核心:调用executor.executeWithFallback,实现失败降级
*/
@Override
@RagTraceNode(name = "llm-chat-routing", type = "LLM_ROUTING")
public String chat(ChatRequest request) {
// 同步调用:使用 executeWithFallback 自动处理降级
return executor.executeWithFallback(
ModelCapability.CHAT, // 能力类型(此处为聊天)
// 选择候选模型:根据请求是否需要深度思考,筛选合适的模型
selector.selectChatCandidates(request.getThinking()),
// 如何获取对应模型的客户端(根据provider匹配)
target -> clientsByProvider.get(target.candidate().getProvider()),
// 如何执行模型调用(调用客户端的chat方法)
(client, target) -> client.chat(request, target)
);
}
/**
* 流式聊天请求(核心:首包探测,避免用户等待过久)
* 核心:遍历候选模型,尝试调用,首包超时/失败则切换下一个
*/
@Override
@RagTraceNode(name = "llm-stream-routing", type = "LLM_ROUTING")
public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback callback) {
// 1. 选择候选模型列表
List<ModelTarget> targets = selector.selectChatCandidates(request.getThinking());
if (CollUtil.isEmpty(targets)) {
throw new RemoteException(STREAM_NO_PROVIDER_MESSAGE);
}
// 2. 遍历候选模型,尝试流式调用
for (ModelTarget target : targets) {
// 获取当前模型的客户端
ChatClient client = resolveClient(target, label);
if (client == null) continue;
// 3. 首包探测:等待首包响应(避免模型卡住,用户长时间等待)
FirstPacketAwaiter awaiter = new FirstPacketAwaiter();
// 缓冲回调:首包成功前,缓冲所有事件,避免失败模型的内容污染输出
ProbeBufferingCallback wrapper = new ProbeBufferingCallback(callback, awaiter);
// 4. 尝试发起流式调用
StreamCancellationHandle handle = client.streamChat(request, wrapper, target);
// 5. 等待首包(60秒超时,可配置)
FirstPacketAwaiter.Result result = awaitFirstPacket(awaiter, handle, callback);
// 6. 判断首包结果
if (result.isSuccess()) {
// 首包成功:标记模型健康,返回调用句柄(后续内容通过回调推送)
healthStore.markSuccess(target.id());
return handle;
}
// 首包失败:标记模型失败,取消当前调用,切换下一个模型
healthStore.markFailure(target.id());
handle.cancel();
}
// 所有模型都失败了,抛出异常
throw notifyAllFailed(callback, lastError);
}
// 辅助方法:解析模型客户端(省略实现)
private ChatClient resolveClient(ModelTarget target, String label) {
// ... 省略:根据target的provider,从clientsByProvider中获取客户端
}
}
核心解读:
- 同步调用:依赖
ModelRoutingExecutor的executeWithFallback方法,自动遍历候选模型,处理失败降级,无需手动干预。 - 流式调用:增加了「首包探测」逻辑——如果60秒内未收到模型的首包响应,就取消当前调用,切换到下一个模型,避免用户长时间等待无响应。
- 缓冲回调:
ProbeBufferingCallback会在首包成功前缓冲所有输出事件,避免失败模型的错误内容推送给用户,保证体验。
4.2 模型选择器:ModelSelector(筛选候选模型)
该类负责根据请求类型(普通/深度思考)和模型配置,筛选出“可用、适配、按优先级排序”的候选模型列表,是路由的核心前提。
文件路径:infra-ai/.../model/ModelSelector.java
@Component
@RequiredArgsConstructor
public class ModelSelector {
private final AIModelProperties properties; // 模型配置(对应application.yaml中的ai配置)
private final ModelHealthStore healthStore; // 模型健康状态,用于过滤不健康模型
/**
* 核心方法:选择聊天模型候选列表
* @param deepThinking 是否需要深度思考(true:复杂问题,false:普通问题)
* @return 按优先级排序的候选模型列表
*/
public List<ModelTarget> selectChatCandidates(Boolean deepThinking) {
AIModelProperties.ModelGroup group = properties.getChat();
if (group == null) {
return List.of();
}
// 1. 解析首选模型(根据是否需要深度思考,选择默认模型或深度思考模型)
String firstChoiceModelId = resolveFirstChoiceModel(group, deepThinking);
// 2. 构建候选列表(过滤、排序、调整首选模型位置)
return selectCandidates(group, firstChoiceModelId, deepThinking);
}
/**
* 辅助方法:根据请求模式,选择首选模型
*/
private String resolveFirstChoiceModel(AIModelProperties.ModelGroup group, Boolean deepThinking) {
// 如果是深度思考模式,且配置了深度思考模型,优先选择该模型
if (Boolean.TRUE.equals(deepThinking)) {
String deepModel = group.getDeepThinkingModel();
if (StrUtil.isNotBlank(deepModel)) {
return deepModel;
}
}
// 否则选择普通默认模型
return group.getDefaultModel();
}
/**
* 核心方法:构建候选列表(过滤、排序、调整首选模型)
*/
private List<ModelTarget> selectCandidates(
AIModelProperties.ModelGroup group,
String firstChoiceModelId,
Boolean deepThinking) {
if (group == null || group.getCandidates() == null) {
return List.of();
}
List<AIModelProperties.ModelCandidate> candidates = group.getCandidates();
// 1. 过滤可用模型(启用状态、支持深度思考模式)
List<AIModelProperties.ModelCandidate> enabledCandidates = candidates.stream()
.filter(c -> c != null && !Boolean.FALSE.equals(c.getEnabled())) // 只保留启用的模型
.filter(c -> !Boolean.TRUE.equals(deepThinking) ||
Boolean.TRUE.equals(c.getSupportsThinking())) // 深度思考模式需筛选支持的模型
.sorted(Comparator
.comparing(AIModelProperties.ModelCandidate::getPriority,
Comparator.nullsLast(Integer::compareTo))) // 按优先级升序排序(1最高)
.collect(Collectors.toCollection(ArrayList::new));
// 2. 把首选模型放到最前面(确保首选模型优先被调用)
if (firstChoiceModelId != null) {
reorderFirstChoice(enabledCandidates, firstChoiceModelId);
}
// 3. 构建最终的ModelTarget列表(封装模型信息,供后续调用)
return buildAvailableTargets(enabledCandidates);
}
/**
* 辅助方法:将首选模型调整到候选列表第一位
*/
private void reorderFirstChoice(List<AIModelProperties.ModelCandidate> candidates, String firstChoiceModelId) {
for (int i = 0; i < candidates.size(); i++) {
AIModelProperties.ModelCandidate candidate = candidates.get(i);
if (firstChoiceModelId.equals(candidate.getId())) {
// 移除当前位置,插入到第一位
candidates.remove(i);
candidates.add(0, candidate);
break;
}
}
}
/**
* 辅助方法:将ModelCandidate转换为ModelTarget(封装模型ID、候选信息等)
*/
private List<ModelTarget> buildAvailableTargets(List<AIModelProperties.ModelCandidate> candidates) {
return candidates.stream()
.map(candidate -> ModelTarget.of(candidate.getId(), candidate))
.collect(Collectors.toList());
}
}
核心解读:
- 筛选逻辑:先过滤“未启用”和“不支持深度思考”的模型,确保候选模型都是适配当前请求的。
- 排序逻辑:按
priority升序排序,优先级1的模型排在最前面,同时将“首选模型”(默认/深度思考模型)调整到第一位,确保核心模型优先被调用。 - 灵活性:通过配置区分普通请求和深度思考请求,适配不同复杂度的业务场景。
4.3 路由执行器:ModelRoutingExecutor(执行降级逻辑)
该类是降级机制的核心执行组件,负责遍历候选模型,检查模型健康状态,执行调用逻辑,处理失败降级,确保“一个模型失败,自动切换下一个”。
文件路径:infra-ai/.../model/ModelRoutingExecutor.java
@Component
@RequiredArgsConstructor
public class ModelRoutingExecutor {
private final ModelHealthStore healthStore; // 模型健康状态存储(断路器)
/**
* 核心方法:执行带降级的模型调用
* @param capability 能力类型(CHAT/EMBEDDING/RERANK等)
* @param targets 候选模型列表(按优先级排序)
* @param clientResolver 如何根据ModelTarget获取对应的ChatClient
* @param caller 如何执行模型调用(封装调用逻辑)
* @return 模型调用结果
*/
public <C, T> T executeWithFallback(
ModelCapability capability,
List<ModelTarget> targets,
Function<ModelTarget, C> clientResolver,
ModelCaller<C, T> caller) {
String label = capability.getDisplayName(); // 能力名称(如"CHAT")
Exception lastException = null; // 记录最后一次失败异常
// 遍历候选模型,依次尝试调用
for (ModelTarget target : targets) {
// 1. 获取当前模型的客户端(如siliconflow的客户端)
C client = clientResolver.apply(target);
if (client == null) {
log.warn("Provider client missing: {}", target.candidate().getProvider());
continue; // 客户端不存在,跳过当前模型,切换下一个
}
// 2. 检查模型健康状态(断路器逻辑:是否允许调用)
if (!healthStore.allowCall(target.id())) {
log.info("Model {} is circuit-broken, skip", target.id());
continue; // 模型处于熔断状态,拒绝调用,切换下一个
}
try {
// 3. 执行模型调用(调用caller的call方法,即客户端的chat方法)
T response = caller.call(client, target);
// 4. 调用成功:标记模型健康,返回结果(结束遍历)
healthStore.markSuccess(target.id());
return response;
} catch (Exception e) {
// 5. 调用失败:标记模型失败,记录异常,继续下一个模型
healthStore.markFailure(target.id());
lastException = e;
log.warn("Model {} failed, fallback to next. Error: {}",
target.id(), e.getMessage());
}
}
// 所有模型都失败了,抛出异常(携带最后一次失败信息)
throw new RemoteException("All " + label + " model candidates failed", lastException);
}
// 函数式接口:定义模型调用逻辑(客户端+目标模型 → 调用结果)
@FunctionalInterface
public interface ModelCaller<C, T> {
T call(C client, ModelTarget target) throws Exception;
}
}
核心解读:
- 遍历逻辑:按候选模型列表的优先级,依次尝试调用,只要有一个模型调用成功,就返回结果,结束遍历。
- 健康检查:调用前通过
healthStore.allowCall检查模型状态,熔断中的模型直接跳过,避免无效调用。 - 失败处理:调用失败后,通过
healthStore.markFailure标记模型失败,累计失败次数,触发熔断逻辑,同时切换到下一个模型。 - 通用性:通过函数式接口
ModelCaller封装调用逻辑,适配不同类型的模型调用(CHAT/EMBEDDING等),通用性强。
4.4 健康状态存储:ModelHealthStore(断路器核心)
该类实现了「断路器模式」,维护每个模型的健康状态(CLOSED/OPEN/HALF_OPEN),控制模型是否允许被调用,避免模型故障时反复调用,保护服务资源。
文件路径:infra-ai/.../model/ModelHealthStore.java
@Component
@RequiredArgsConstructor
public class ModelHealthStore {
private final AIModelProperties properties; // 配置参数(熔断阈值、冷却时间)
// 存储每个模型的健康状态(key:模型id,value:ModelHealth),线程安全
private final Map<String, ModelHealth> healthById = new ConcurrentHashMap<>();
/**
* 核心方法:检查模型是否允许被调用(断路器核心逻辑)
* 三种状态:CLOSED(正常)、OPEN(熔断)、HALF_OPEN(半开,尝试恢复)
*/
public boolean allowCall(String id) {
if (id == null) return false;
long now = System.currentTimeMillis();
final boolean[] allowed = {false}; // 用于存储是否允许调用的结果
// 原子操作:确保多线程环境下状态一致(ConcurrentHashMap的compute方法线程安全)
healthById.compute(id, (k, v) -> {
if (v == null) {
v = new ModelHealth(); // 模型首次调用,初始化健康状态
}
// 状态1:OPEN(熔断中)
if (v.state == State.OPEN) {
if (v.openUntil > now) {
// 还在熔断冷却期,拒绝调用
return v;
}
// 熔断冷却期结束,切换到HALF_OPEN状态(尝试恢复)
v.state = State.HALF_OPEN;
v.halfOpenInFlight = true; // HALF_OPEN状态下,只允许一个请求尝试
allowed[0] = true;
return v;
}
// 状态2:HALF_OPEN(半开,尝试恢复)
if (v.state == State.HALF_OPEN) {
if (v.halfOpenInFlight) {
// 已有一个请求在尝试恢复,拒绝其他请求
return v;
}
v.halfOpenInFlight = true;
allowed[0] = true;
return v;
}
// 状态3:CLOSED(正常),允许调用
allowed[0] = true;
return v;
});
return allowed[0];
}
/**
* 标记模型调用成功:重置健康状态,切换到CLOSED
*/
public void markSuccess(String id) {
healthById.compute(id, (k, v) -> {
if (v == null) {
return new ModelHealth();
}
v.state = State.CLOSED; // 重置为正常状态
v.consecutiveFailures = 0; // 重置连续失败计数
v.halfOpenInFlight = false; // 重置HALF_OPEN状态的请求标记
return v;
});
}
/**
* 标记模型调用失败:累计失败次数,触发熔断(超过阈值则切换到OPEN)
*/
public void markFailure(String id) {
long now = System.currentTimeMillis();
healthById.compute(id, (k, v) -> {
if (v == null) {
v = new ModelHealth();
}
// 如果是HALF_OPEN状态失败,立即重新熔断(恢复失败)
if (v.state == State.HALF_OPEN) {
v.state = State.OPEN;
// 设置熔断结束时间(当前时间 + 冷却时间)
v.openUntil = now + properties.getSelection().getOpenDurationMs();
return v;
}
// 否则,累计连续失败次数
v.consecutiveFailures++;
// 连续失败次数超过阈值,触发熔断(切换到OPEN状态)
if (v.consecutiveFailures >= properties.getSelection().getFailureThreshold()) {
v.state = State.OPEN;
v.openUntil = now + properties.getSelection().getOpenDurationMs();
v.consecutiveFailures = 0; // 重置失败计数
}
return v;
});
}
// 内部类:封装模型的健康状态信息
private static class ModelHealth {
private int consecutiveFailures; // 连续失败次数
private long openUntil; // 熔断结束时间(毫秒)
private boolean halfOpenInFlight; // HALF_OPEN状态下,是否有请求正在尝试恢复
private State state = State.CLOSED; // 初始状态:正常(CLOSED)
}
// 枚举:断路器三种状态
private enum State {
CLOSED, // 正常:允许调用,累计失败次数
OPEN, // 熔断:拒绝调用,冷却一段时间
HALF_OPEN // 半开:冷却结束后,允许一个请求尝试恢复
}
}
核心解读(断路器逻辑):
- CLOSED(正常状态):允许模型调用,每次失败累计失败次数,成功则重置计数;连续失败次数达到
failure-threshold(默认2次),切换到OPEN状态。 - OPEN(熔断状态):拒绝所有调用,持续
open-duration-ms(默认30秒);冷却时间结束后,切换到HALF_OPEN状态,尝试恢复。 - HALF_OPEN(半开状态):只允许一个请求尝试调用模型;调用成功则切换到CLOSED状态(恢复正常),失败则重新切换到OPEN状态(继续熔断)。
- 线程安全:使用
ConcurrentHashMap存储健康状态,compute方法保证原子操作,避免多线程环境下的状态不一致。
4.5 首包探测:FirstPacketAwaiter + ProbeBufferingCallback
针对流式调用场景,为避免用户长时间等待无响应,设计了首包探测机制——等待模型60秒内返回首包,超时则切换模型;同时通过缓冲回调,避免失败模型的内容污染输出。
4.5.1 首包等待器:FirstPacketAwaiter
文件路径:infra-ai/.../chat/FirstPacketAwaiter.java
public class FirstPacketAwaiter {
// 倒计时锁存器:用于等待首包响应(1表示需要等待1个事件)
private final CountDownLatch latch = new CountDownLatch(1);
// 标记是否收到首包内容
private final AtomicBoolean hasContent = new AtomicBoolean(false);
// 标记首包是否出现错误
private final AtomicReference<Throwable> error = new AtomicReference<>();
/**
* 等待首包响应(60秒超时,可配置)
* @return 首包探测结果(成功/超时/错误/无内容)
*/
public Result await(long timeout, TimeUnit unit) throws InterruptedException {
// 等待首包,超时返回false
boolean completed = latch.await(timeout, unit);
// 有错误:返回错误结果
if (error.get() != null) {
return Result.error(error.get());
}
// 超时:返回超时结果
if (!completed) {
return Result.timeout();
}
// 无内容:返回无内容结果
if (!hasContent.get()) {
return Result.noContent();
}
// 成功:返回成功结果
return Result.success();
}
// 标记收到首包内容
public void markContent() {
hasContent.set(true);
latch.countDown(); // 触发锁存器,结束等待
}
// 标记首包出现错误
public void markError(Throwable t) {
error.set(t);
latch.countDown();
}
// 标记首包无内容
public void markNoContent() {
hasContent.set(false);
latch.countDown();
}
// 首包探测结果封装
@Getter
public static class Result {
public enum Type { SUCCESS, ERROR, TIMEOUT, NO_CONTENT }
private final Type type;
private final Throwable error;
// 私有构造,通过静态方法创建实例
private Result(Type type, Throwable error) {
this.type = type;
this.error = error;
}
// 静态工厂方法:成功
public static Result success() {
return new Result(Type.SUCCESS, null);
}
// 静态工厂方法:错误
public static Result error(Throwable error) {
return new Result(Type.ERROR, error);
}
// 静态工厂方法:超时
public static Result timeout() {
return new Result(Type.TIMEOUT, null);
}
// 静态工厂方法:无内容
public static Result noContent() {
return new Result(Type.NO_CONTENT, null);
}
// 判断是否成功
public boolean isSuccess() {
return Type.SUCCESS.equals(this.type);
}
}
}
4.5.2 探测缓冲回调:ProbeBufferingCallback
该类是RoutingLLMService的内部类,用于缓冲首包探测期间的所有输出事件,避免失败模型的内容推送给用户。
// RoutingLLMService 内部类
private static final class ProbeBufferingCallback implements StreamCallback {
private final StreamCallback downstream; // 下游回调(最终推送给用户)
private final FirstPacketAwaiter awaiter; // 首包等待器
private final List<BufferedEvent> bufferedEvents = new ArrayList<>(); // 事件缓冲列表
private volatile boolean committed; // 是否提交缓冲(首包成功后提交)
private final Object lock = new Object(); // 锁,保证线程安全
// 构造方法:传入下游回调和首包等待器
public ProbeBufferingCallback(StreamCallback downstream, FirstPacketAwaiter awaiter) {
this.downstream = downstream;
this.awaiter = awaiter;
}
/**
* 收到模型内容回调
*/
@Override
public void onContent(String content) {
awaiter.markContent(); // 标记收到首包内容
bufferOrDispatch(BufferedEvent.content(content)); // 缓冲或转发事件
}
/**
* 收到模型思考过程回调(如函数调用前的思考)
*/
@Override
public void onThinking(String content) {
bufferOrDispatch(BufferedEvent.thinking(content));
}
/**
* 调用完成回调
*/
@Override
public void onComplete() {
bufferOrDispatch(BufferedEvent.complete());
}
/**
* 调用失败回调
*/
@Override
public void onError(Throwable throwable) {
awaiter.markError(throwable); // 标记首包错误
downstream.onError(throwable); // 直接转发错误(无需缓冲)
}
/**
* 核心方法:缓冲事件或直接转发
* 首包成功前:缓冲;首包成功后:直接转发
*/
private void bufferOrDispatch(BufferedEvent event) {
synchronized (lock) {
if (committed) {
// 首包成功,已提交,直接转发事件给下游
dispatch(event);
} else {
// 首包未成功,缓冲事件
bufferedEvents.add(event);
}
}
}
/**
* 提交缓冲:首包成功后,回放所有缓冲事件
*/
void commit() {
synchronized (lock) {
committed = true;
// 按顺序回放缓冲的事件
for (BufferedEvent event : bufferedEvents) {
dispatch(event);
}
bufferedEvents.clear(); // 清空缓冲
}
}
/**
* 转发事件到下游回调
*/
private void dispatch(BufferedEvent event) {
switch (event.type) {
case CONTENT -> downstream.onContent(event.content);
case THINKING -> downstream.onThinking(event.content);
case COMPLETE -> downstream.onComplete();
}
}
// 内部类:缓冲事件封装
private static class BufferedEvent {
private final Type type;
private final String content;
// 私有构造
private BufferedEvent(Type type, String content) {
this.type = type;
this.content = content;
}
// 静态工厂方法:内容事件
public static BufferedEvent content(String content) {
return new BufferedEvent(Type.CONTENT, content);
}
// 静态工厂方法:思考事件
public static BufferedEvent thinking(String content) {
return new BufferedEvent(Type.THINKING, content);
}
// 静态工厂方法:完成事件
public static BufferedEvent complete() {
return new BufferedEvent(Type.COMPLETE, null);
}
// 事件类型枚举
private enum Type { CONTENT, THINKING, COMPLETE }
}
}
核心解读:
- 首包探测:
FirstPacketAwaiter通过CountDownLatch等待首包事件,60秒超时则返回超时结果,触发模型切换。 - 事件缓冲:
ProbeBufferingCallback在首包成功前,将所有输出事件缓冲起来;首包成功后,再一次性回放所有事件,避免失败模型的内容推送给用户。 - 失败处理:如果首包出现错误,直接转发错误给下游,同时标记模型失败,切换到下一个模型。
五、流程图解:直观理解降级和断路器工作流程
结合以下三张流程图,能更直观地理解整个模型路由降级机制的工作流程,包括同步调用、流式调用和断路器状态切换。
5.1 同步调用降级流程图
用户请求(同步聊天)
↓
RoutingLLMService.chat()
↓
┌─────────────────────────────────────────────────────────────────┐
│ ModelSelector.selectChatCandidates() │
│ → 按 priority 排序:[模型A, 模型B, 模型C] │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ ModelRoutingExecutor.executeWithFallback() │
│ → 遍历候选模型,执行调用+降级逻辑 │
│ │
│ ① 处理模型A: │
│ - healthStore.allowCall(A) → CLOSED → 允许调用 │
│ - 调用clientA.chat(request) │
│ ├─ 成功 → healthStore.markSuccess(A) → 返回结果 ✅ │
│ └─ 失败 → healthStore.markFailure(A) → consecutiveFailures=1 │
│ └─ 继续下一个模型 │
│ │
│ ② 处理模型B: │
│ - healthStore.allowCall(B) → CLOSED → 允许调用 │
│ - 调用clientB.chat(request) │
│ ├─ 成功 → healthStore.markSuccess(B) → 返回结果 ✅ │
│ └─ 失败 → healthStore.markFailure(B) → consecutiveFailures=1 │
│ └─ 继续下一个模型 │
│ │
│ ③ 处理模型C: │
│ - healthStore.allowCall(C) → CLOSED → 允许调用 │
│ - 调用clientC.chat(request) │
│ ├─ 成功 → healthStore.markSuccess(C) → 返回结果 ✅ │
│ └─ 失败 → healthStore.markFailure(C) → consecutiveFailures=1 │
│ └─ 所有模型都失败 → 抛异常 ❌ │
└─────────────────────────────────────────────────────────────────┘
5.2 流式调用+首包探测流程图
用户请求(流式聊天)
↓
RoutingLLMService.streamChat()
↓
ModelSelector.selectChatCandidates() → [模型A, 模型B, 模型C]
↓
for 模型A in [A, B, C]:
│
├─→ ① 调用clientA.streamChat(request) → 发起流式请求
│ │
│ ├─→ ② 创建FirstPacketAwaiter,等待首包(60秒超时)
│ │ │
│ │ ├─→ 收到首包 → ProbeBufferingCallback.commit() → 回放缓冲事件
│ │ │ └─→ healthStore.markSuccess(A) → 返回StreamCancellationHandle ✅
│ │ ├─→ 60秒超时 → 调用handle.cancel() → healthStore.markFailure(A) → 切换模型B
│ │ └─→ 出现错误 → 调用handle.cancel() → healthStore.markFailure(A) → 切换模型B
│ │
│ └─→ 首包成功后,后续内容通过回调推送给用户...
│
├─→ 模型B 执行上述相同逻辑,失败则切换到模型C
└─→ 模型C 失败 → 所有模型都失败 → 抛异常 ❌
5.3 断路器状态机流程图
连续失败 >= 2次(failure-threshold=2)
─────────────────
│ │
↓ │
┌─────────────────────────────────────────┐ │
│ │ │
│ CLOSED │ │
│ (正常状态) │ │
│ │ │
│ ┌───────────────────────────────┐ │ │
│ │ consecutiveFailures: 0 │ │ │
│ │ allowCall(): true │ │ │
│ │ 收到成功 → 重置计数器 │ │ │
│ │ 收到失败 → 计数器+1 │─────┼── 超过阈值,触发熔断
│ └───────────────────────────────┘ │ │
│ │ │
└─────────────────────────────────────────┘ │
↑ │
│ │
│ │
┌─────────────────────────────────────────┐ │
│ │ │
│ OPEN │ │
│ (熔断状态) │ │
│ │ │
│ ┌───────────────────────────────┐ │ │
│ │ openUntil: 30秒后(open-duration-ms=30000)│ │ │
│ │ allowCall(): false (仍在熔断)│ │ │
│ │ │ │ │
│ │ ┌─────────────┐ │ │ │
│ │ │ 30秒后过期 │ │ │ │
│ │ └──────┬──────┘ │ │ │
│ │ ↓ │ │ │
│ └───────────────────────────────┘ │ │
│ │ │ │
└────────────────────┼───────────────────┘ │
│ │
│ openUntil 过期 │
│ │
↓ │
┌─────────────────────────────────────────┐ │
│ │ │
│ HALF_OPEN │ │
│ (半开尝试恢复) │ │
│ │ │
│ ┌───────────────────────────────┐ │ │
│ │ halfOpenInFlight: 1 │ │ │
│ │ allowCall(): true (限1个请求)│ │ │
│ │ │ │ │
│ │ 成功 → CLOSED ✅ │ │ │
│ │ 失败 → OPEN (重新熔断30秒) ❌ │─────┘ │
│ └───────────────────────────────┘ │ │
│ │ │
└─────────────────────────────────────────┘ │
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)