在AI服务开发中,模型调用的稳定性是核心痛点之一——无论是第三方模型接口超时、限流,还是自建模型服务故障,只要单个模型异常,就可能导致整个AI服务中断,直接影响用户体验和业务连续性。为解决这一问题,我们设计了一套模型路由降级机制,通过多模型候选、断路器保护、首包探测等核心设计,实现“单个模型故障不影响整体服务”的高可用目标。

本文将从“为什么需要降级”“整体架构”“核心组件源码解析”“流程图解”“实际场景”五个维度,全方位拆解这套机制,结合完整源码和通俗讲解,让开发者既能理解原理,也能快速落地应用。

一、痛点直击:为什么必须做模型路由降级?

AI服务中,模型调用的不稳定性主要来自三个方面:第三方模型接口波动、网络延迟或中断、模型自身负载过高/故障。如果只依赖单个模型,一旦出现问题,服务将直接不可用——这在生产环境中是无法接受的。

我们对比两种方案,就能清晰看到降级机制的必要性:

┌─────────────────────────────────────────────────────────────┐
│  方案1:只用一个模型                                       │
│  ❌ 模型挂了 = 服务挂了                                     │
│  场景:依赖单一第三方模型,接口限流/宕机,整个AI服务瘫痪     │
├─────────────────────────────────────────────────────────────┤
│  方案2:多模型 + 降级(我们的方案)                        │
│  模型A → 模型B → 模型C → 全部失败 → 返回错误              │
│  ✅ 一个挂了,自动切换下一个                               │
│  场景:模型A宕机,自动切换到模型B,服务正常提供,用户无感知 │
└─────────────────────────────────────────────────────────────┘

核心目标:通过多模型冗余和智能路由,实现“故障自动切换、服务持续可用”,同时通过断路器模式保护模型,避免无效调用,提升服务性能。

二、整体架构:四大核心组件,构建高可用路由体系

模型路由降级机制的整体架构围绕「路由调度+健康监控+多模型冗余」展开,核心分为4个组件,各组件协同工作,实现智能降级和故障恢复。架构图如下:

┌─────────────────────────────────────────────────────────────────────┐
│                        RoutingLLMService                              │
│                        (路由式 LLM 服务)                               │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│   ┌───────────────┐     ┌───────────────────┐                      │
│   │ ModelSelector  │ ──→ │ ModelRoutingExecutor│                      │
│   │  (模型选择器)  │     │  (路由执行器)      │                      │
│   └───────────────┘     └─────────┬─────────┘                      │
│           │                         │                                │
│           │                         ↓                                │
│           │               ┌───────────────────┐                        │
│           │               │  ModelHealthStore │                        │
│           │               │   (健康状态存储)   │                        │
│           │               │                   │                        │
│           │               │  CLOSED ──────────│────────→ OPEN          │
│           │               │     ↑              │    失败阈值触发        │
│           │               │     │              │                        │
│           │               │     └──── HALF_OPEN ←─── 冷却结束          │
│           │               └───────────────────┘                        │
│           │                                                           │
│           └───────────────────┬───────────────────────────────────────┘
│                               ↓
│   ┌─────────────────────────────────────────────────────────────────┐
│   │                     多个 ChatClient                              │
│   │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐            │
│   │  │ 硅基流动     │  │ 阿里云百炼   │  │  Ollama     │            │
│   │  │ SiliconFlow │  │  DashScope  │  │  (本地)     │            │
│   │  └─────────────┘  └─────────────┘  └─────────────┘            │
│   └─────────────────────────────────────────────────────────────────┘
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

各组件核心职责:

  • RoutingLLMService:服务入口,统一接收模型调用请求(同步/流式),协调后续组件执行路由和降级逻辑。
  • ModelSelector:模型选择器,根据请求类型(普通/深度思考)和模型优先级、健康状态,筛选出候选模型列表。
  • ModelRoutingExecutor:路由执行器,遍历候选模型,执行调用逻辑,处理失败降级,串联整个调用流程。
  • ModelHealthStore:健康状态存储,基于断路器模式,维护每个模型的健康状态(正常/熔断/半开),控制模型是否允许被调用。
  • ChatClient:多模型客户端,对接不同提供商的模型(硅基流动、阿里云百炼、本地Ollama等),提供统一调用接口。

三、核心配置:一键适配多场景,灵活调整降级规则

机制的灵活性源于可配置化设计,通过YAML配置即可调整模型候选、熔断阈值、冷却时间等核心参数,适配开发、生产等不同场景。完整配置如下,关键参数附带详细解读:

# application.yaml
ai:
  # 模型选择策略(断路器核心配置)
  selection:
    failure-threshold: 2      # 连续失败2次,触发熔断(可根据模型稳定性调整)
    open-duration-ms: 30000   # 熔断持续30秒(期间拒绝调用,避免无效重试)
  
  # 聊天模型组(多模型候选配置)
  chat:
    default-model: qwen-plus      # 普通请求默认模型
    deep-thinking-model: qwen-max # 深度思考请求(复杂问题)专用模型
    
    # 候选模型列表(按 priority 排序,优先级1最高,依次兜底)
    candidates:
      - id: siliconflow-qwen
        provider: siliconflow
        model: Qwen/Qwen2.5-72B-Instruct
        url: https://api.siliconflow.cn/v1
        priority: 1          # 优先级最高,优先调用
        enabled: true        # 是否启用该模型
        supports-thinking: false # 是否支持深度思考模式(复杂问题)
        
      - id: dashscope-qwen
        provider: dashscope
        model: qwen-plus
        url: https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation
        priority: 2          # 备选模型,优先级次之
        enabled: true
        supports-thinking: true # 支持深度思考,可处理复杂问题
        
      - id: ollama-local
        provider: ollama
        model: llama3.1
        url: http://localhost:11434/api/generate
        priority: 3          # 兜底模型(本地部署,无网络依赖)
        enabled: true
        supports-thinking: false

配置核心要点:

  • 优先级配置:priority 数值越小,优先级越高,确保核心模型优先被调用,兜底模型最后生效。
  • 熔断配置:failure-threshold 控制触发熔断的失败次数,open-duration-ms 控制熔断冷却时间,避免模型故障时反复调用消耗资源。
  • 功能适配:supports-thinking 标记模型是否支持深度思考,用于区分普通请求和复杂请求的模型选择。

四、源码解析:逐组件拆解,读懂降级核心逻辑

以下结合完整源码,逐一对核心组件进行解析,重点讲解“模型选择、路由执行、断路器保护、首包探测”四大核心逻辑,让开发者能直接复用代码并理解底层原理。

4.1 服务入口:RoutingLLMService(统一接收请求)

该类是整个路由降级机制的入口,实现了LLMService接口,统一处理同步聊天和流式聊天请求,协调模型选择、路由执行等逻辑。

文件路径infra-ai/.../chat/RoutingLLMService.java

@Slf4j
@Service
@Primary  // 注入优先使用这个实现,覆盖其他LLMService实现
public class RoutingLLMService implements LLMService {

    // 注入三个核心组件
    private final ModelSelector selector;           // 模型选择器
    private final ModelHealthStore healthStore;     // 健康状态存储(断路器)
    private final ModelRoutingExecutor executor;     // 路由执行器
    private final Map<String, ChatClient> clientsByProvider;  // 各提供商客户端(key:provider,value:客户端)

    // 构造方法注入依赖
    public RoutingLLMService(ModelSelector selector, ModelHealthStore healthStore, 
                             ModelRoutingExecutor executor, Map<String, ChatClient> clientsByProvider) {
        this.selector = selector;
        this.healthStore = healthStore;
        this.executor = executor;
        this.clientsByProvider = clientsByProvider;
    }

    /**
     * 同步聊天请求(非流式)
     * 核心:调用executor.executeWithFallback,实现失败降级
     */
    @Override
    @RagTraceNode(name = "llm-chat-routing", type = "LLM_ROUTING")
    public String chat(ChatRequest request) {
        // 同步调用:使用 executeWithFallback 自动处理降级
        return executor.executeWithFallback(
                ModelCapability.CHAT,  // 能力类型(此处为聊天)
                // 选择候选模型:根据请求是否需要深度思考,筛选合适的模型
                selector.selectChatCandidates(request.getThinking()),  
                // 如何获取对应模型的客户端(根据provider匹配)
                target -> clientsByProvider.get(target.candidate().getProvider()),  
                // 如何执行模型调用(调用客户端的chat方法)
                (client, target) -> client.chat(request, target)      
        );
    }

    /**
     * 流式聊天请求(核心:首包探测,避免用户等待过久)
     * 核心:遍历候选模型,尝试调用,首包超时/失败则切换下一个
     */
    @Override
    @RagTraceNode(name = "llm-stream-routing", type = "LLM_ROUTING")
    public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback callback) {
        // 1. 选择候选模型列表
        List<ModelTarget> targets = selector.selectChatCandidates(request.getThinking());
        if (CollUtil.isEmpty(targets)) {
            throw new RemoteException(STREAM_NO_PROVIDER_MESSAGE);
        }

        // 2. 遍历候选模型,尝试流式调用
        for (ModelTarget target : targets) {
            // 获取当前模型的客户端
            ChatClient client = resolveClient(target, label);
            if (client == null) continue;

            // 3. 首包探测:等待首包响应(避免模型卡住,用户长时间等待)
            FirstPacketAwaiter awaiter = new FirstPacketAwaiter();
            // 缓冲回调:首包成功前,缓冲所有事件,避免失败模型的内容污染输出
            ProbeBufferingCallback wrapper = new ProbeBufferingCallback(callback, awaiter);

            // 4. 尝试发起流式调用
            StreamCancellationHandle handle = client.streamChat(request, wrapper, target);
            
            // 5. 等待首包(60秒超时,可配置)
            FirstPacketAwaiter.Result result = awaitFirstPacket(awaiter, handle, callback);

            // 6. 判断首包结果
            if (result.isSuccess()) {
                // 首包成功:标记模型健康,返回调用句柄(后续内容通过回调推送)
                healthStore.markSuccess(target.id());
                return handle;  
            }

            // 首包失败:标记模型失败,取消当前调用,切换下一个模型
            healthStore.markFailure(target.id());
            handle.cancel();
        }

        // 所有模型都失败了,抛出异常
        throw notifyAllFailed(callback, lastError);
    }

    // 辅助方法:解析模型客户端(省略实现)
    private ChatClient resolveClient(ModelTarget target, String label) {
        // ... 省略:根据target的provider,从clientsByProvider中获取客户端
    }
}

核心解读:

  • 同步调用:依赖ModelRoutingExecutorexecuteWithFallback方法,自动遍历候选模型,处理失败降级,无需手动干预。
  • 流式调用:增加了「首包探测」逻辑——如果60秒内未收到模型的首包响应,就取消当前调用,切换到下一个模型,避免用户长时间等待无响应。
  • 缓冲回调:ProbeBufferingCallback会在首包成功前缓冲所有输出事件,避免失败模型的错误内容推送给用户,保证体验。

4.2 模型选择器:ModelSelector(筛选候选模型)

该类负责根据请求类型(普通/深度思考)和模型配置,筛选出“可用、适配、按优先级排序”的候选模型列表,是路由的核心前提。

文件路径infra-ai/.../model/ModelSelector.java

@Component
@RequiredArgsConstructor
public class ModelSelector {

    private final AIModelProperties properties;  // 模型配置(对应application.yaml中的ai配置)
    private final ModelHealthStore healthStore;  // 模型健康状态,用于过滤不健康模型

    /**
     * 核心方法:选择聊天模型候选列表
     * @param deepThinking 是否需要深度思考(true:复杂问题,false:普通问题)
     * @return 按优先级排序的候选模型列表
     */
    public List<ModelTarget> selectChatCandidates(Boolean deepThinking) {
        AIModelProperties.ModelGroup group = properties.getChat();
        if (group == null) {
            return List.of();
        }

        // 1. 解析首选模型(根据是否需要深度思考,选择默认模型或深度思考模型)
        String firstChoiceModelId = resolveFirstChoiceModel(group, deepThinking);
        
        // 2. 构建候选列表(过滤、排序、调整首选模型位置)
        return selectCandidates(group, firstChoiceModelId, deepThinking);
    }

    /**
     * 辅助方法:根据请求模式,选择首选模型
     */
    private String resolveFirstChoiceModel(AIModelProperties.ModelGroup group, Boolean deepThinking) {
        // 如果是深度思考模式,且配置了深度思考模型,优先选择该模型
        if (Boolean.TRUE.equals(deepThinking)) {
            String deepModel = group.getDeepThinkingModel();
            if (StrUtil.isNotBlank(deepModel)) {
                return deepModel;
            }
        }
        // 否则选择普通默认模型
        return group.getDefaultModel();
    }

    /**
     * 核心方法:构建候选列表(过滤、排序、调整首选模型)
     */
    private List<ModelTarget> selectCandidates(
            AIModelProperties.ModelGroup group, 
            String firstChoiceModelId, 
            Boolean deepThinking) {
        
        if (group == null || group.getCandidates() == null) {
            return List.of();
        }

        List<AIModelProperties.ModelCandidate> candidates = group.getCandidates();

        // 1. 过滤可用模型(启用状态、支持深度思考模式)
        List<AIModelProperties.ModelCandidate> enabledCandidates = candidates.stream()
                .filter(c -> c != null && !Boolean.FALSE.equals(c.getEnabled()))  // 只保留启用的模型
                .filter(c -> !Boolean.TRUE.equals(deepThinking) || 
                            Boolean.TRUE.equals(c.getSupportsThinking()))  // 深度思考模式需筛选支持的模型
                .sorted(Comparator
                        .comparing(AIModelProperties.ModelCandidate::getPriority,
                                Comparator.nullsLast(Integer::compareTo)))  // 按优先级升序排序(1最高)
                .collect(Collectors.toCollection(ArrayList::new));

        // 2. 把首选模型放到最前面(确保首选模型优先被调用)
        if (firstChoiceModelId != null) {
            reorderFirstChoice(enabledCandidates, firstChoiceModelId);
        }

        // 3. 构建最终的ModelTarget列表(封装模型信息,供后续调用)
        return buildAvailableTargets(enabledCandidates);
    }

    /**
     * 辅助方法:将首选模型调整到候选列表第一位
     */
    private void reorderFirstChoice(List<AIModelProperties.ModelCandidate> candidates, String firstChoiceModelId) {
        for (int i = 0; i < candidates.size(); i++) {
            AIModelProperties.ModelCandidate candidate = candidates.get(i);
            if (firstChoiceModelId.equals(candidate.getId())) {
                // 移除当前位置,插入到第一位
                candidates.remove(i);
                candidates.add(0, candidate);
                break;
            }
        }
    }

    /**
     * 辅助方法:将ModelCandidate转换为ModelTarget(封装模型ID、候选信息等)
     */
    private List<ModelTarget> buildAvailableTargets(List<AIModelProperties.ModelCandidate> candidates) {
        return candidates.stream()
                .map(candidate -> ModelTarget.of(candidate.getId(), candidate))
                .collect(Collectors.toList());
    }
}

核心解读:

  • 筛选逻辑:先过滤“未启用”和“不支持深度思考”的模型,确保候选模型都是适配当前请求的。
  • 排序逻辑:按priority升序排序,优先级1的模型排在最前面,同时将“首选模型”(默认/深度思考模型)调整到第一位,确保核心模型优先被调用。
  • 灵活性:通过配置区分普通请求和深度思考请求,适配不同复杂度的业务场景。

4.3 路由执行器:ModelRoutingExecutor(执行降级逻辑)

该类是降级机制的核心执行组件,负责遍历候选模型,检查模型健康状态,执行调用逻辑,处理失败降级,确保“一个模型失败,自动切换下一个”。

文件路径infra-ai/.../model/ModelRoutingExecutor.java

@Component
@RequiredArgsConstructor
public class ModelRoutingExecutor {

    private final ModelHealthStore healthStore;  // 模型健康状态存储(断路器)

    /**
     * 核心方法:执行带降级的模型调用
     * @param capability 能力类型(CHAT/EMBEDDING/RERANK等)
     * @param targets 候选模型列表(按优先级排序)
     * @param clientResolver 如何根据ModelTarget获取对应的ChatClient
     * @param caller 如何执行模型调用(封装调用逻辑)
     * @return 模型调用结果
     */
    public <C, T> T executeWithFallback(
            ModelCapability capability,
            List<ModelTarget> targets,
            Function<ModelTarget, C> clientResolver,
            ModelCaller<C, T> caller) {
        
        String label = capability.getDisplayName();  // 能力名称(如"CHAT")
        Exception lastException = null;  // 记录最后一次失败异常

        // 遍历候选模型,依次尝试调用
        for (ModelTarget target : targets) {
            // 1. 获取当前模型的客户端(如siliconflow的客户端)
            C client = clientResolver.apply(target);
            if (client == null) {
                log.warn("Provider client missing: {}", target.candidate().getProvider());
                continue;  // 客户端不存在,跳过当前模型,切换下一个
            }

            // 2. 检查模型健康状态(断路器逻辑:是否允许调用)
            if (!healthStore.allowCall(target.id())) {
                log.info("Model {} is circuit-broken, skip", target.id());
                continue;  // 模型处于熔断状态,拒绝调用,切换下一个
            }

            try {
                // 3. 执行模型调用(调用caller的call方法,即客户端的chat方法)
                T response = caller.call(client, target);
                
                // 4. 调用成功:标记模型健康,返回结果(结束遍历)
                healthStore.markSuccess(target.id());
                return response;
                
            } catch (Exception e) {
                // 5. 调用失败:标记模型失败,记录异常,继续下一个模型
                healthStore.markFailure(target.id());
                lastException = e;
                log.warn("Model {} failed, fallback to next. Error: {}", 
                        target.id(), e.getMessage());
            }
        }

        // 所有模型都失败了,抛出异常(携带最后一次失败信息)
        throw new RemoteException("All " + label + " model candidates failed", lastException);
    }

    // 函数式接口:定义模型调用逻辑(客户端+目标模型 → 调用结果)
    @FunctionalInterface
    public interface ModelCaller<C, T> {
        T call(C client, ModelTarget target) throws Exception;
    }
}

核心解读:

  • 遍历逻辑:按候选模型列表的优先级,依次尝试调用,只要有一个模型调用成功,就返回结果,结束遍历。
  • 健康检查:调用前通过healthStore.allowCall检查模型状态,熔断中的模型直接跳过,避免无效调用。
  • 失败处理:调用失败后,通过healthStore.markFailure标记模型失败,累计失败次数,触发熔断逻辑,同时切换到下一个模型。
  • 通用性:通过函数式接口ModelCaller封装调用逻辑,适配不同类型的模型调用(CHAT/EMBEDDING等),通用性强。

4.4 健康状态存储:ModelHealthStore(断路器核心)

该类实现了「断路器模式」,维护每个模型的健康状态(CLOSED/OPEN/HALF_OPEN),控制模型是否允许被调用,避免模型故障时反复调用,保护服务资源。

文件路径infra-ai/.../model/ModelHealthStore.java

@Component
@RequiredArgsConstructor
public class ModelHealthStore {

    private final AIModelProperties properties;  // 配置参数(熔断阈值、冷却时间)
    // 存储每个模型的健康状态(key:模型id,value:ModelHealth),线程安全
    private final Map<String, ModelHealth> healthById = new ConcurrentHashMap<>();

    /**
     * 核心方法:检查模型是否允许被调用(断路器核心逻辑)
     * 三种状态:CLOSED(正常)、OPEN(熔断)、HALF_OPEN(半开,尝试恢复)
     */
    public boolean allowCall(String id) {
        if (id == null) return false;
        
        long now = System.currentTimeMillis();
        final boolean[] allowed = {false};  // 用于存储是否允许调用的结果
        
        // 原子操作:确保多线程环境下状态一致(ConcurrentHashMap的compute方法线程安全)
        healthById.compute(id, (k, v) -> {
            if (v == null) {
                v = new ModelHealth();  // 模型首次调用,初始化健康状态
            }
            
            // 状态1:OPEN(熔断中)
            if (v.state == State.OPEN) {
                if (v.openUntil > now) {
                    // 还在熔断冷却期,拒绝调用
                    return v;
                }
                // 熔断冷却期结束,切换到HALF_OPEN状态(尝试恢复)
                v.state = State.HALF_OPEN;
                v.halfOpenInFlight = true;  // HALF_OPEN状态下,只允许一个请求尝试
                allowed[0] = true;
                return v;
            }
            
            // 状态2:HALF_OPEN(半开,尝试恢复)
            if (v.state == State.HALF_OPEN) {
                if (v.halfOpenInFlight) {
                    // 已有一个请求在尝试恢复,拒绝其他请求
                    return v;
                }
                v.halfOpenInFlight = true;
                allowed[0] = true;
                return v;
            }
            
            // 状态3:CLOSED(正常),允许调用
            allowed[0] = true;
            return v;
        });
        
        return allowed[0];
    }

    /**
     * 标记模型调用成功:重置健康状态,切换到CLOSED
     */
    public void markSuccess(String id) {
        healthById.compute(id, (k, v) -> {
            if (v == null) {
                return new ModelHealth();
            }
            v.state = State.CLOSED;           // 重置为正常状态
            v.consecutiveFailures = 0;        // 重置连续失败计数
            v.halfOpenInFlight = false;       // 重置HALF_OPEN状态的请求标记
            return v;
        });
    }

    /**
     * 标记模型调用失败:累计失败次数,触发熔断(超过阈值则切换到OPEN)
     */
    public void markFailure(String id) {
        long now = System.currentTimeMillis();
        
        healthById.compute(id, (k, v) -> {
            if (v == null) {
                v = new ModelHealth();
            }
            
            // 如果是HALF_OPEN状态失败,立即重新熔断(恢复失败)
            if (v.state == State.HALF_OPEN) {
                v.state = State.OPEN;
                // 设置熔断结束时间(当前时间 + 冷却时间)
                v.openUntil = now + properties.getSelection().getOpenDurationMs();
                return v;
            }
            
            // 否则,累计连续失败次数
            v.consecutiveFailures++;
            
            // 连续失败次数超过阈值,触发熔断(切换到OPEN状态)
            if (v.consecutiveFailures >= properties.getSelection().getFailureThreshold()) {
                v.state = State.OPEN;
                v.openUntil = now + properties.getSelection().getOpenDurationMs();
                v.consecutiveFailures = 0;  // 重置失败计数
            }
            return v;
        });
    }

    // 内部类:封装模型的健康状态信息
    private static class ModelHealth {
        private int consecutiveFailures;  // 连续失败次数
        private long openUntil;            // 熔断结束时间(毫秒)
        private boolean halfOpenInFlight; // HALF_OPEN状态下,是否有请求正在尝试恢复
        private State state = State.CLOSED; // 初始状态:正常(CLOSED)
    }

    // 枚举:断路器三种状态
    private enum State {
        CLOSED,    // 正常:允许调用,累计失败次数
        OPEN,      // 熔断:拒绝调用,冷却一段时间
        HALF_OPEN  // 半开:冷却结束后,允许一个请求尝试恢复
    }
}

核心解读(断路器逻辑):

  • CLOSED(正常状态):允许模型调用,每次失败累计失败次数,成功则重置计数;连续失败次数达到failure-threshold(默认2次),切换到OPEN状态。
  • OPEN(熔断状态):拒绝所有调用,持续open-duration-ms(默认30秒);冷却时间结束后,切换到HALF_OPEN状态,尝试恢复。
  • HALF_OPEN(半开状态):只允许一个请求尝试调用模型;调用成功则切换到CLOSED状态(恢复正常),失败则重新切换到OPEN状态(继续熔断)。
  • 线程安全:使用ConcurrentHashMap存储健康状态,compute方法保证原子操作,避免多线程环境下的状态不一致。

4.5 首包探测:FirstPacketAwaiter + ProbeBufferingCallback

针对流式调用场景,为避免用户长时间等待无响应,设计了首包探测机制——等待模型60秒内返回首包,超时则切换模型;同时通过缓冲回调,避免失败模型的内容污染输出。

4.5.1 首包等待器:FirstPacketAwaiter

文件路径infra-ai/.../chat/FirstPacketAwaiter.java

public class FirstPacketAwaiter {

    // 倒计时锁存器:用于等待首包响应(1表示需要等待1个事件)
    private final CountDownLatch latch = new CountDownLatch(1);
    // 标记是否收到首包内容
    private final AtomicBoolean hasContent = new AtomicBoolean(false);
    // 标记首包是否出现错误
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /**
     * 等待首包响应(60秒超时,可配置)
     * @return 首包探测结果(成功/超时/错误/无内容)
     */
    public Result await(long timeout, TimeUnit unit) throws InterruptedException {
        // 等待首包,超时返回false
        boolean completed = latch.await(timeout, unit);

        // 有错误:返回错误结果
        if (error.get() != null) {
            return Result.error(error.get());
        }
        // 超时:返回超时结果
        if (!completed) {
            return Result.timeout();
        }
        // 无内容:返回无内容结果
        if (!hasContent.get()) {
            return Result.noContent();
        }
        // 成功:返回成功结果
        return Result.success();
    }

    // 标记收到首包内容
    public void markContent() {
        hasContent.set(true);
        latch.countDown();  // 触发锁存器,结束等待
    }

    // 标记首包出现错误
    public void markError(Throwable t) {
        error.set(t);
        latch.countDown();
    }

    // 标记首包无内容
    public void markNoContent() {
        hasContent.set(false);
        latch.countDown();
    }

    // 首包探测结果封装
    @Getter
    public static class Result {
        public enum Type { SUCCESS, ERROR, TIMEOUT, NO_CONTENT }
        private final Type type;
        private final Throwable error;

        // 私有构造,通过静态方法创建实例
        private Result(Type type, Throwable error) {
            this.type = type;
            this.error = error;
        }

        // 静态工厂方法:成功
        public static Result success() {
            return new Result(Type.SUCCESS, null);
        }

        // 静态工厂方法:错误
        public static Result error(Throwable error) {
            return new Result(Type.ERROR, error);
        }

        // 静态工厂方法:超时
        public static Result timeout() {
            return new Result(Type.TIMEOUT, null);
        }

        // 静态工厂方法:无内容
        public static Result noContent() {
            return new Result(Type.NO_CONTENT, null);
        }

        // 判断是否成功
        public boolean isSuccess() {
            return Type.SUCCESS.equals(this.type);
        }
    }
}
4.5.2 探测缓冲回调:ProbeBufferingCallback

该类是RoutingLLMService的内部类,用于缓冲首包探测期间的所有输出事件,避免失败模型的内容推送给用户。

// RoutingLLMService 内部类
private static final class ProbeBufferingCallback implements StreamCallback {

    private final StreamCallback downstream;  // 下游回调(最终推送给用户)
    private final FirstPacketAwaiter awaiter;  // 首包等待器
    private final List<BufferedEvent> bufferedEvents = new ArrayList<>();  // 事件缓冲列表
    private volatile boolean committed;  // 是否提交缓冲(首包成功后提交)
    private final Object lock = new Object();  // 锁,保证线程安全

    // 构造方法:传入下游回调和首包等待器
    public ProbeBufferingCallback(StreamCallback downstream, FirstPacketAwaiter awaiter) {
        this.downstream = downstream;
        this.awaiter = awaiter;
    }

    /**
     * 收到模型内容回调
     */
    @Override
    public void onContent(String content) {
        awaiter.markContent();  // 标记收到首包内容
        bufferOrDispatch(BufferedEvent.content(content));  // 缓冲或转发事件
    }

    /**
     * 收到模型思考过程回调(如函数调用前的思考)
     */
    @Override
    public void onThinking(String content) {
        bufferOrDispatch(BufferedEvent.thinking(content));
    }

    /**
     * 调用完成回调
     */
    @Override
    public void onComplete() {
        bufferOrDispatch(BufferedEvent.complete());
    }

    /**
     * 调用失败回调
     */
    @Override
    public void onError(Throwable throwable) {
        awaiter.markError(throwable);  // 标记首包错误
        downstream.onError(throwable);  // 直接转发错误(无需缓冲)
    }

    /**
     * 核心方法:缓冲事件或直接转发
     * 首包成功前:缓冲;首包成功后:直接转发
     */
    private void bufferOrDispatch(BufferedEvent event) {
        synchronized (lock) {
            if (committed) {
                // 首包成功,已提交,直接转发事件给下游
                dispatch(event);
            } else {
                // 首包未成功,缓冲事件
                bufferedEvents.add(event);
            }
        }
    }

    /**
     * 提交缓冲:首包成功后,回放所有缓冲事件
     */
    void commit() {
        synchronized (lock) {
            committed = true;
            // 按顺序回放缓冲的事件
            for (BufferedEvent event : bufferedEvents) {
                dispatch(event);
            }
            bufferedEvents.clear();  // 清空缓冲
        }
    }

    /**
     * 转发事件到下游回调
     */
    private void dispatch(BufferedEvent event) {
        switch (event.type) {
            case CONTENT -> downstream.onContent(event.content);
            case THINKING -> downstream.onThinking(event.content);
            case COMPLETE -> downstream.onComplete();
        }
    }

    // 内部类:缓冲事件封装
    private static class BufferedEvent {
        private final Type type;
        private final String content;

        // 私有构造
        private BufferedEvent(Type type, String content) {
            this.type = type;
            this.content = content;
        }

        // 静态工厂方法:内容事件
        public static BufferedEvent content(String content) {
            return new BufferedEvent(Type.CONTENT, content);
        }

        // 静态工厂方法:思考事件
        public static BufferedEvent thinking(String content) {
            return new BufferedEvent(Type.THINKING, content);
        }

        // 静态工厂方法:完成事件
        public static BufferedEvent complete() {
            return new BufferedEvent(Type.COMPLETE, null);
        }

        // 事件类型枚举
        private enum Type { CONTENT, THINKING, COMPLETE }
    }
}

核心解读:

  • 首包探测:FirstPacketAwaiter通过CountDownLatch等待首包事件,60秒超时则返回超时结果,触发模型切换。
  • 事件缓冲:ProbeBufferingCallback在首包成功前,将所有输出事件缓冲起来;首包成功后,再一次性回放所有事件,避免失败模型的内容推送给用户。
  • 失败处理:如果首包出现错误,直接转发错误给下游,同时标记模型失败,切换到下一个模型。

五、流程图解:直观理解降级和断路器工作流程

结合以下三张流程图,能更直观地理解整个模型路由降级机制的工作流程,包括同步调用、流式调用和断路器状态切换。

5.1 同步调用降级流程图

用户请求(同步聊天)
    ↓
RoutingLLMService.chat()
    ↓
┌─────────────────────────────────────────────────────────────────┐
│  ModelSelector.selectChatCandidates()                           │
│  → 按 priority 排序:[模型A, 模型B, 模型C]                       │
└─────────────────────────────────────────────────────────────────┘
    ↓
┌─────────────────────────────────────────────────────────────────┐
│  ModelRoutingExecutor.executeWithFallback()                     │
│  → 遍历候选模型,执行调用+降级逻辑                              │
│                                                                 │
│  ① 处理模型A:                                                  │
│     - healthStore.allowCall(A) → CLOSED → 允许调用              │
│     - 调用clientA.chat(request)                                │
│        ├─ 成功 → healthStore.markSuccess(A) → 返回结果 ✅        │
│        └─ 失败 → healthStore.markFailure(A) → consecutiveFailures=1 │
│              └─ 继续下一个模型                                   │
│                                                                 │
│  ② 处理模型B:                                                  │
│     - healthStore.allowCall(B) → CLOSED → 允许调用              │
│     - 调用clientB.chat(request)                                │
│        ├─ 成功 → healthStore.markSuccess(B) → 返回结果 ✅        │
│        └─ 失败 → healthStore.markFailure(B) → consecutiveFailures=1 │
│              └─ 继续下一个模型                                   │
│                                                                 │
│  ③ 处理模型C:                                                  │
│     - healthStore.allowCall(C) → CLOSED → 允许调用              │
│     - 调用clientC.chat(request)                                │
│        ├─ 成功 → healthStore.markSuccess(C) → 返回结果 ✅        │
│        └─ 失败 → healthStore.markFailure(C) → consecutiveFailures=1 │
│              └─ 所有模型都失败 → 抛异常 ❌                      │
└─────────────────────────────────────────────────────────────────┘

5.2 流式调用+首包探测流程图

用户请求(流式聊天)
    ↓
RoutingLLMService.streamChat()
    ↓
ModelSelector.selectChatCandidates() → [模型A, 模型B, 模型C]
    ↓
for 模型A in [A, B, C]:
    │
    ├─→ ① 调用clientA.streamChat(request) → 发起流式请求
    │    │
    │    ├─→ ② 创建FirstPacketAwaiter,等待首包(60秒超时)
    │    │    │
    │    │    ├─→ 收到首包 → ProbeBufferingCallback.commit() → 回放缓冲事件
    │    │    │    └─→ healthStore.markSuccess(A) → 返回StreamCancellationHandle ✅
    │    │    ├─→ 60秒超时 → 调用handle.cancel() → healthStore.markFailure(A) → 切换模型B
    │    │    └─→ 出现错误 → 调用handle.cancel() → healthStore.markFailure(A) → 切换模型B
    │    │
    │    └─→ 首包成功后,后续内容通过回调推送给用户...
    │
    ├─→ 模型B 执行上述相同逻辑,失败则切换到模型C
    └─→ 模型C 失败 → 所有模型都失败 → 抛异常 ❌

5.3 断路器状态机流程图

                              连续失败 >= 2次(failure-threshold=2)
                             ─────────────────
                            │                  │
                            ↓                  │
    ┌─────────────────────────────────────────┐ │
    │                                         │ │
    │              CLOSED                     │ │
    │           (正常状态)                     │ │
    │                                         │ │
    │   ┌───────────────────────────────┐     │ │
    │   │  consecutiveFailures: 0        │     │ │
    │   │  allowCall(): true            │     │ │
    │   │  收到成功 → 重置计数器         │     │ │
    │   │  收到失败 → 计数器+1           │─────┼── 超过阈值,触发熔断
    │   └───────────────────────────────┘     │ │
    │                                         │ │
    └─────────────────────────────────────────┘ │
                            ↑                  │
                            │                  │
                            │                  │
    ┌─────────────────────────────────────────┐ │
    │                                         │ │
    │                OPEN                     │ │
    │             (熔断状态)                    │ │
    │                                         │ │
    │   ┌───────────────────────────────┐     │ │
    │   │  openUntil: 30秒后(open-duration-ms=30000)│     │ │
    │   │  allowCall(): false (仍在熔断)│     │ │
    │   │                               │     │ │
    │   │         ┌─────────────┐       │     │ │
    │   │         │ 30秒后过期 │       │     │ │
    │   │         └──────┬──────┘       │     │ │
    │   │                ↓              │     │ │
    │   └───────────────────────────────┘     │ │
    │                    │                   │ │
    └────────────────────┼───────────────────┘ │
                         │                     │
                         │ openUntil 过期       │
                         │                     │
                         ↓                     │
    ┌─────────────────────────────────────────┐ │
    │                                         │ │
    │              HALF_OPEN                  │ │
    │            (半开尝试恢复)                 │ │
    │                                         │ │
    │   ┌───────────────────────────────┐     │ │
    │   │  halfOpenInFlight: 1          │     │ │
    │   │  allowCall(): true (限1个请求)│     │ │
    │   │                               │     │ │
    │   │  成功 → CLOSED ✅             │     │ │
    │   │  失败 → OPEN (重新熔断30秒) ❌ │─────┘ │
    │   └───────────────────────────────┘       │ │
    │                                         │ │
    └─────────────────────────────────────────┘ │
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐