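Preface

In a RAG system, LLM inference is the core step. When you integrate several model providers at once (Alibaba Cloud Bailian, SiliconFlow, a local Ollama, and so on), you run into a few practical problems:

  • Unstable models: a provider occasionally times out, rate-limits you, or returns an error outright
  • Single point of failure: if you depend on one model, everything goes down when it does
  • Idle streaming calls: the connection is established, but the model emits no tokens and the call hangs without returning

You can of course handle all of this with try-catch in business code, but that quickly makes the code hard to maintain. A better approach is to pull health monitoring, automatic fallback, and stream probing into an infrastructure layer, so that business code never has to know about them.

This article walks through the design and implementation of that architecture, based on code from a real project.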

1. Overall Architecture

User request
  ↓
RoutingLLMService.streamChat() / ModelRoutingExecutor.executeWithFallback()
  ↓
ModelSelector.selectChatCandidates()
  ├─ filterAndSortCandidates()   → sort by first-choice model → ascending priority → id
  └─ buildAvailableTargets()
       └─ buildModelTarget()
            ├─ healthStore.isUnavailable()? → breaker open → return null (excluded)
            └─ provider missing and not NOOP? → return null (excluded)
  ↓
Returns List<ModelTarget> (the pre-filtered list of healthy models)
  ↓
for each target:
  ├─ resolveClient() → clientsByProvider.get(provider)
  ├─ client == null? → continue
  ├─ healthStore.allowCall(id) → concurrency guard (another thread may trip the breaker between selection and the call)
  ├─ call the model API
  │   ├─ synchronous: ModelRoutingExecutor.executeWithFallback()
  │   │   └─ caller.call(client, target) → success: markSuccess / exception: markFailure + try the next one
  │   └─ streaming: RoutingLLMService.streamChat()
  │       ├─ client.streamChat(request, ProbeStreamBridge, target)
  │       ├─ handle == null? → markFailure + continue
  │       └─ ProbeStreamBridge.awaitFirstPacket(60s)
  │           ├─ SUCCESS   → markSuccess + return handle
  │           ├─ ERROR     → markFailure + cancel + continue
  │           ├─ TIMEOUT   → markFailure + cancel + continue
  │           └─ NO_CONTENT → markFailure + cancel + continue
  ↓
All failed → throw RemoteException / callback.onError()

2. ModelSelector: Sort by Priority, Then Filter Out Tripped Models

infra-ai/.../model/ModelSelector.java

2.1 Entry point

public List<ModelTarget> selectChatCandidates(boolean deepThinking) {
    AIModelProperties.ModelGroup group = properties.getChat();
    if (group == null) {
        return List.of();
    }
    String firstChoiceModelId = resolveFirstChoiceModel(group, deepThinking);
    return selectCandidates(group, firstChoiceModelId, deepThinking);
}

private String resolveFirstChoiceModel(AIModelProperties.ModelGroup group, boolean deepThinking) {
    if (deepThinking) {
        String deepModel = group.getDeepThinkingModel();
        if (StrUtil.isNotBlank(deepModel)) {
            return deepModel;
        }
    }
    return group.getDefaultModel();
}

When deepThinking=true, the first choice is deepThinkingModel (if it is set); otherwise defaultModel is used. The corresponding configuration:

ai:
  chat:
    default-model: qwen3-max
    deep-thinking-model: qwen3-max
    candidates:
      - id: qwen-plus
        priority: 1
      - id: qwen3-max
        priority: 3
        supports-thinking: true
      - id: glm-4.7
        priority: 0
      - id: qwen3-local
        priority: 2

2.2 Sorting: filterAndSortCandidates

private List<AIModelProperties.ModelCandidate> filterAndSortCandidates(
        List<AIModelProperties.ModelCandidate> candidates,
        String firstChoiceModelId,
        boolean deepThinking) {

    List<AIModelProperties.ModelCandidate> enabled = candidates.stream()
            .filter(c -> c != null && !Boolean.FALSE.equals(c.getEnabled()))
            .filter(c -> !deepThinking || Boolean.TRUE.equals(c.getSupportsThinking()))
            .sorted(Comparator
                    .comparing((AIModelProperties.ModelCandidate c) ->
                            !Objects.equals(resolveId(c), firstChoiceModelId))
                    .thenComparing(AIModelProperties.ModelCandidate::getPriority,
                            Comparator.nullsLast(Integer::compareTo))
                    .thenComparing(AIModelProperties.ModelCandidate::getId,
                            Comparator.nullsLast(String::compareTo)))
            .collect(Collectors.toList());

    if (deepThinking && enabled.isEmpty()) {
        log.warn("深度思考模式没有可用候选模型");
    }
    return enabled;
}

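Sort order: first-choice model first → ascending priority → lexicographic id. resolveId(c) returns c.getId() and, when that is empty, falls back to the concatenation provider::model. Note that when deepThinking is true, the second filter keeps only candidates with supports-thinking: true, so with the configuration above only qwen3-max remains.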
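To make the three-level ordering concrete, here is a minimal, self-contained sketch that applies the same comparator to the four candidates from the section 2.1 configuration. SketchCandidate and CandidateOrderSketch are hypothetical stand-ins for illustration (the real code works on AIModelProperties.ModelCandidate), and the enabled / supports-thinking filters are left out.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

// Hypothetical stand-in for AIModelProperties.ModelCandidate; only the fields the sort reads.
final class SketchCandidate {
    final String id;
    final Integer priority;

    SketchCandidate(String id, Integer priority) {
        this.id = id;
        this.priority = priority;
    }
}

final class CandidateOrderSketch {

    // Candidates from the section 2.1 configuration, in declaration order.
    static List<SketchCandidate> sampleCandidates() {
        return Arrays.asList(
                new SketchCandidate("qwen-plus", 1),
                new SketchCandidate("qwen3-max", 3),
                new SketchCandidate("glm-4.7", 0),
                new SketchCandidate("qwen3-local", 2));
    }

    // Same three-level ordering as filterAndSortCandidates (filters omitted in this sketch).
    static List<String> sortedIds(List<SketchCandidate> candidates, String firstChoiceModelId) {
        return candidates.stream()
                .sorted(Comparator
                        // Boolean key: false sorts before true, so the first-choice model leads
                        .comparing((SketchCandidate c) -> !Objects.equals(c.id, firstChoiceModelId))
                        .thenComparing((SketchCandidate c) -> c.priority,
                                Comparator.nullsLast(Integer::compareTo))
                        .thenComparing((SketchCandidate c) -> c.id,
                                Comparator.nullsLast(String::compareTo)))
                .map(c -> c.id)
                .collect(Collectors.toList());
    }
}
```

With firstChoiceModelId = "qwen3-max" the result is [qwen3-max, glm-4.7, qwen-plus, qwen3-local]; if no candidate matches the first choice, the list falls back to plain priority order, [glm-4.7, qwen-plus, qwen3-local, qwen3-max].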

2.3 Circuit-breaker filtering: buildModelTarget

The sorted list then goes to buildAvailableTargets(), which calls buildModelTarget() on each candidate:

private ModelTarget buildModelTarget(AIModelProperties.ModelCandidate candidate,
                                      Map<String, AIModelProperties.ProviderConfig> providers) {
    String modelId = resolveId(candidate);

    if (healthStore.isUnavailable(modelId)) {
        return null;  // ← breaker is open: exclude this model
    }

    AIModelProperties.ProviderConfig provider = providers.get(candidate.getProvider());
    if (provider == null && !ModelProvider.NOOP.matches(candidate.getProvider())) {
        log.warn("Provider配置缺失: provider={}, modelId={}", candidate.getProvider(), modelId);
        return null;  // ← provider configuration missing: exclude as well (except NOOP)
    }

    return new ModelTarget(modelId, candidate, provider);
}

Two filter conditions:

  • healthStore.isUnavailable(modelId) == true → breaker is open, excluded
  • provider == null and the provider is not NOOP → configuration missing, excluded

2.4 isUnavailable

public boolean isUnavailable(String id) {
    ModelHealth health = healthById.get(id);
    if (health == null) {
        return false;
    }
    if (health.state == State.OPEN && health.openUntil > System.currentTimeMillis()) {
        return true;
    }
    return health.state == State.HALF_OPEN && health.halfOpenInFlight;
}

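OPEN and the recovery time has not been reached → unavailable; HALF_OPEN with a probe already in flight → unavailable. An OPEN entry whose openUntil has passed is reported as available, which is what lets the next allowCall() move it to HALF_OPEN.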


3. Three-State Circuit Breaker

infra-ai/.../model/ModelHealthStore.java

3.1 State definitions

private enum State {
    CLOSED,     // closed: calls pass through normally
    OPEN,       // open: requests are rejected
    HALF_OPEN   // half-open: a single probe request is let through
}

3.2 State transitions

CLOSED ──(consecutive failures >= failureThreshold)──→ OPEN ──(after openDurationMs)──→ HALF_OPEN
  ↑                                                                                         │
  └────────────────────────────────────(probe succeeds)─────────────────────────────────────┘

HALF_OPEN ──(probe fails)──→ OPEN (the wait time is reset)
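The arrows in the diagram can also be written down as a pure transition function. The following is a small sketch under that reading, not the project's implementation: BreakerTransitionSketch and its Event enum are hypothetical names, and it models only the four arrows shown above (the real ModelHealthStore folds counting and timing into allowCall, markFailure, and markSuccess).

```java
final class BreakerTransitionSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    // Hypothetical event names, one per arrow in the diagram.
    enum Event { THRESHOLD_REACHED, OPEN_DURATION_ELAPSED, PROBE_SUCCEEDED, PROBE_FAILED }

    // Returns the next state; events that do not apply to the current state leave it unchanged.
    static State next(State current, Event event) {
        switch (current) {
            case CLOSED:
                return event == Event.THRESHOLD_REACHED ? State.OPEN : State.CLOSED;
            case OPEN:
                return event == Event.OPEN_DURATION_ELAPSED ? State.HALF_OPEN : State.OPEN;
            case HALF_OPEN:
                if (event == Event.PROBE_SUCCEEDED) {
                    return State.CLOSED;
                }
                if (event == Event.PROBE_FAILED) {
                    return State.OPEN;
                }
                return State.HALF_OPEN;
            default:
                throw new IllegalStateException("unknown state: " + current);
        }
    }
}
```

Walking CLOSED through THRESHOLD_REACHED, OPEN_DURATION_ELAPSED, and PROBE_SUCCEEDED returns to CLOSED, while PROBE_FAILED from HALF_OPEN goes back to OPEN.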

3.3 allowCall: breaker check + automatic state transition

public boolean allowCall(String id) {
    if (id == null) {
        return false;
    }
    long now = System.currentTimeMillis();
    AtomicBoolean allowed = new AtomicBoolean(false);
    healthById.compute(id, (k, v) -> {
        if (v == null) {
            v = new ModelHealth();
        }
        if (v.state == State.OPEN) {
            if (v.openUntil > now) {
                return v;           // still within the open window: reject
            }
            v.state = State.HALF_OPEN;       // window expired: move to half-open
            v.halfOpenInFlight = true;        // claim the probe slot
            allowed.set(true);
            return v;
        }
        if (v.state == State.HALF_OPEN) {
            if (v.halfOpenInFlight) {
                return v;           // a probe is already in flight: reject
            }
            v.halfOpenInFlight = true;        // let the probe through
            allowed.set(true);
            return v;
        }
        allowed.set(true);           // CLOSED: allow as usual
        return v;
    });
    return allowed.get();
}

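ConcurrentHashMap.compute runs the remapping function atomically for a given key, so the check and the state change cannot interleave across threads. The halfOpenInFlight flag ensures that HALF_OPEN lets only one probe through.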
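The single-probe guarantee can be checked with a small concurrency experiment. This is a reduced sketch, not the project's class: HalfOpenGateSketch and Slot are hypothetical names that keep only the probe slot, and many threads race for it through compute.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class HalfOpenGateSketch {

    // Hypothetical reduced health record: only the probe slot matters here.
    static final class Slot {
        boolean halfOpenInFlight;
    }

    private final ConcurrentHashMap<String, Slot> slots = new ConcurrentHashMap<>();

    // Check-and-claim happens inside compute, so it is atomic per key.
    boolean tryAcquireProbe(String id) {
        AtomicBoolean allowed = new AtomicBoolean(false);
        slots.compute(id, (k, v) -> {
            if (v == null) {
                v = new Slot();
            }
            if (!v.halfOpenInFlight) {
                v.halfOpenInFlight = true;
                allowed.set(true);
            }
            return v;
        });
        return allowed.get();
    }

    // Releases all workers at once and counts how many of them obtained the probe slot.
    static int countAdmitted(int threads) {
        HalfOpenGateSketch gate = new HalfOpenGateSketch();
        AtomicInteger admitted = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (gate.tryAcquireProbe("glm-4.7")) {
                    admitted.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return admitted.get();
    }
}
```

However many threads are used, countAdmitted should report exactly one admitted caller, which is the property the half-open state relies on.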

3.4 markFailure: failure counting + tripping the breaker

public void markFailure(String id) {
    if (id == null) {
        return;
    }
    long now = System.currentTimeMillis();
    healthById.compute(id, (k, v) -> {
        if (v == null) {
            v = new ModelHealth();
        }
        if (v.state == State.HALF_OPEN) {
            v.state = State.OPEN;                                          // probe failed: trip again
            v.openUntil = now + properties.getSelection().getOpenDurationMs();
            v.consecutiveFailures = 0;
            v.halfOpenInFlight = false;
            return v;
        }
        v.consecutiveFailures++;
        if (v.consecutiveFailures >= properties.getSelection().getFailureThreshold()) {
            v.state = State.OPEN;                                          // threshold reached: trip the breaker
            v.openUntil = now + properties.getSelection().getOpenDurationMs();
            v.consecutiveFailures = 0;
        }
        return v;
    });
}

3.5 markSuccess: back to CLOSED

public void markSuccess(String id) {
    if (id == null) {
        return;
    }
    healthById.compute(id, (k, v) -> {
        if (v == null) {
            return new ModelHealth();
        }
        v.state = State.CLOSED;
        v.consecutiveFailures = 0;
        v.openUntil = 0L;
        v.halfOpenInFlight = false;
        return v;
    });
}

3.6 Configuration parameters

AIModelProperties.java

@Data
public static class Selection {
    private Integer failureThreshold = 2;       // two consecutive failures trip the breaker
    private Long openDurationMs = 30000L;       // after tripping, wait 30s before attempting recovery
}

4. Priority Fallback Chain for Synchronous Calls

infra-ai/.../model/ModelRoutingExecutor.java

@Component
@RequiredArgsConstructor
public class ModelRoutingExecutor {

    private final ModelHealthStore healthStore;

    public <C, T> T executeWithFallback(
            ModelCapability capability,
            List<ModelTarget> targets,
            Function<ModelTarget, C> clientResolver,
            ModelCaller<C, T> caller) {

        String label = capability.getDisplayName();
        if (targets == null || targets.isEmpty()) {
            throw new RemoteException("No " + label + " model candidates available");
        }

        Throwable last = null;
        for (ModelTarget target : targets) {                   // iterate over the pre-filtered healthy list
            C client = clientResolver.apply(target);
            if (client == null) {
                log.warn("{} provider client missing: provider={}, modelId={}",
                        label, target.candidate().getProvider(), target.id());
                continue;
            }
            if (!healthStore.allowCall(target.id())) {         // concurrency guard
                continue;
            }

            try {
                T response = caller.call(client, target);      // the actual call
                healthStore.markSuccess(target.id());          // success → back to CLOSED
                return response;
            } catch (Exception e) {
                last = e;
                healthStore.markFailure(target.id());          // failure → count it
                log.warn("{} model failed, fallback to next. modelId={}, provider={}",
                        label, target.id(), target.candidate().getProvider(), e);
                // move on to the next model (the current request is not aborted)
            }
        }

        throw new RemoteException(
                "All " + label + " model candidates failed: " + (last == null ? "unknown" : last.getMessage()),
                last,
                BaseErrorCode.REMOTE_ERROR
        );
    }
}

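Key points:

  • targets is the healthy list already pre-filtered by ModelSelector
  • allowCall() is the concurrency guard (another thread may have tripped the breaker between selection and the call)
  • markFailure() affects the next ModelSelector pass; the current request simply moves on to the next target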
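Stripped of health tracking and logging, the fallback loop comes down to "try each target in order, remember the last error, return the first success". The sketch below shows that shape in isolation; FallbackLoopSketch, firstSuccess, and the simulated failure are assumptions made for illustration, and the failed list merely stands in for healthStore.markFailure().

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

final class FallbackLoopSketch {

    // Tries each target in order and returns the first successful result.
    static <T, R> R firstSuccess(List<T> targets, Function<T, R> call, List<T> failed) {
        RuntimeException last = null;
        for (T target : targets) {
            try {
                return call.apply(target);
            } catch (RuntimeException e) {
                last = e;
                failed.add(target); // stands in for healthStore.markFailure(target.id())
            }
        }
        throw new IllegalStateException("All candidates failed", last);
    }

    // Simulates a chain where the first model fails and the second one answers.
    static String demo(List<String> failed) {
        List<String> chain = Arrays.asList("glm-4.7", "qwen-plus", "qwen3-local");
        return firstSuccess(chain, id -> {
            if (id.equals("glm-4.7")) {
                throw new IllegalStateException("simulated provider error");
            }
            return "answer from " + id;
        }, failed);
    }
}
```

In this run the caller receives the answer from qwen-plus and never sees the glm-4.7 failure, which is recorded on the side; qwen3-local is not called at all.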

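5. First-Packet Probing for Streaming Calls

Detecting failure in a synchronous call is simple: an exception is thrown and you catch it. Streaming is different: client.streamChat() may return a StreamCancellationHandle normally while the model hangs and emits no tokens. A first-packet probe is needed to verify that the model is actually generating.

5.1 ProbeStreamBridge: first-packet probe + buffered forwarding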

infra-ai/.../chat/ProbeStreamBridge.java

final class ProbeStreamBridge implements StreamCallback {

    private final StreamCallback downstream;
    private final CompletableFuture<ProbeResult> probe = new CompletableFuture<>();
    private final Object lock = new Object();
    private final List<Runnable> buffer = new ArrayList<>();
    private volatile boolean committed;

    ProbeStreamBridge(StreamCallback downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onContent(String content) {
        probe.complete(ProbeResult.success());            // first packet has arrived
        bufferOrDispatch(() -> downstream.onContent(content));
    }

    @Override
    public void onThinking(String content) {
        probe.complete(ProbeResult.success());            // thinking content also counts as a first packet
        bufferOrDispatch(() -> downstream.onThinking(content));
    }

    @Override
    public void onComplete() {
        probe.complete(ProbeResult.noContent());          // stream ended without any content
        bufferOrDispatch(downstream::onComplete);
    }

    @Override
    public void onError(Throwable t) {
        probe.complete(ProbeResult.error(t));             // stream failed
        bufferOrDispatch(() -> downstream.onError(t));
    }

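The CompletableFuture<ProbeResult> probe is effectively completed only once: complete() returns false and leaves the result unchanged when the future is already completed, so every call after the first is ignored. This is why an onComplete() that follows real content does not turn the result into NO_CONTENT.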
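The first-call-wins behaviour of CompletableFuture.complete() is easy to verify in isolation. A minimal sketch (CompleteOnceSketch is a hypothetical name, and plain strings stand in for ProbeResult):

```java
import java.util.concurrent.CompletableFuture;

final class CompleteOnceSketch {

    // Completes the same future twice and reports what each call returned and which value stuck.
    static String firstWins() {
        CompletableFuture<String> probe = new CompletableFuture<>();
        boolean first = probe.complete("SUCCESS");      // true: this call completed the future
        boolean second = probe.complete("NO_CONTENT");  // false: already completed, value unchanged
        return first + "/" + second + "/" + probe.join();
    }
}
```

firstWins() returns "true/false/SUCCESS": the second complete() is a no-op, so whichever callback fires first decides the probe result.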

Buffering mechanism:

    private void bufferOrDispatch(Runnable action) {
        boolean dispatchNow;
        synchronized (lock) {
            dispatchNow = committed;
            if (!dispatchNow) {
                buffer.add(action);           // buffer until the first packet is confirmed
            }
        }
        if (dispatchNow) {
            action.run();                     // already committed: forward directly
        }
    }

    private void commit() {
        synchronized (lock) {
            if (committed) return;
            committed = true;
            buffer.forEach(Runnable::run);    // flush the buffer in one go
        }
    }
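The effect of buffer-then-commit is easiest to see on a single thread. The sketch below copies the two methods into a standalone class (BufferThenCommitSketch is a hypothetical name, and clearing the buffer after the flush is an addition of this sketch) and records what reaches the downstream list before and after commit().

```java
import java.util.ArrayList;
import java.util.List;

final class BufferThenCommitSketch {

    private final Object lock = new Object();
    private final List<Runnable> buffer = new ArrayList<>();
    private boolean committed; // guarded by lock

    void bufferOrDispatch(Runnable action) {
        boolean dispatchNow;
        synchronized (lock) {
            dispatchNow = committed;
            if (!dispatchNow) {
                buffer.add(action); // held back until commit()
            }
        }
        if (dispatchNow) {
            action.run();
        }
    }

    void commit() {
        synchronized (lock) {
            if (committed) {
                return;
            }
            committed = true;
            buffer.forEach(Runnable::run); // replay in arrival order while holding the lock
            buffer.clear();                // sketch-only addition: drop references after the flush
        }
    }

    // Two tokens arrive before commit, one after; the first entry records what was visible pre-commit.
    static List<String> demo() {
        List<String> downstream = new ArrayList<>();
        BufferThenCommitSketch bridge = new BufferThenCommitSketch();
        bridge.bufferOrDispatch(() -> downstream.add("Hel"));
        bridge.bufferOrDispatch(() -> downstream.add("lo"));
        int seenBeforeCommit = downstream.size();
        bridge.commit();
        bridge.bufferOrDispatch(() -> downstream.add("!"));
        downstream.add(0, "seenBeforeCommit=" + seenBeforeCommit);
        return downstream;
    }
}
```

Nothing is delivered before commit(), and afterwards the tokens arrive in their original order. Because the flush runs while the lock is held, an event arriving during the flush waits for the lock and is dispatched only after the buffered ones.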

Waiting for the first packet and committing:

    ProbeResult awaitFirstPacket(long timeout, TimeUnit unit) throws InterruptedException {
        ProbeResult result;
        try {
            result = probe.get(timeout, unit);             // block until the first packet
        } catch (TimeoutException e) {
            return ProbeResult.timeout();
        } catch (ExecutionException e) {
            return ProbeResult.error(e.getCause());
        }

        if (result.isSuccess()) {
            commit();                                       // confirmed: release the buffer
        }
        }
        return result;
    }
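How a timed get() maps onto the probe outcomes can be tried out with plain futures. This is a sketch with hypothetical names (FirstPacketWaitSketch, string results instead of ProbeResult). In the bridge shown above the probe is only ever completed normally, so the ExecutionException branch is defensive.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FirstPacketWaitSketch {

    // Waits up to timeoutMs for the probe and classifies the outcome as a string.
    static String await(CompletableFuture<String> probe, long timeoutMs) {
        try {
            return probe.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";            // nothing arrived in time: the silent-model case
        } catch (ExecutionException e) {
            return "ERROR";              // the future was completed exceptionally
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";        // the sketch reports it; the real code rethrows
        }
    }
}
```

A future that is never completed yields "TIMEOUT" once the wait expires, while an already completed one returns its value immediately.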

The ProbeResult type:

    @Getter
    static class ProbeResult {
        enum Type { SUCCESS, ERROR, TIMEOUT, NO_CONTENT }

        private final Type type;
        private final Throwable error;

        // Private constructor (not shown in the original excerpt); instances come from the factories below.
        private ProbeResult(Type type, Throwable error) {
            this.type = type;
            this.error = error;
        }

        static ProbeResult success()    { return new ProbeResult(Type.SUCCESS, null); }
        static ProbeResult error(Throwable t) { return new ProbeResult(Type.ERROR, t); }
        static ProbeResult timeout()    { return new ProbeResult(Type.TIMEOUT, null); }
        static ProbeResult noContent()  { return new ProbeResult(Type.NO_CONTENT, null); }

        boolean isSuccess() { return type == Type.SUCCESS; }
    }

5.2 RoutingLLMService: streaming fallback chain

infra-ai/.../chat/RoutingLLMService.java

@Service
@Primary
public class RoutingLLMService implements LLMService {

    private static final int FIRST_PACKET_TIMEOUT_SECONDS = 60;
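    private static final String STREAM_NO_PROVIDER_MESSAGE = "无可用大模型提供者";          // "No LLM provider available"
    private static final String STREAM_START_FAILED_MESSAGE = "流式请求启动失败";          // "Failed to start the streaming request"
    private static final String STREAM_TIMEOUT_MESSAGE = "流式首包超时";                   // "Timed out waiting for the first packet"
    private static final String STREAM_NO_CONTENT_MESSAGE = "流式请求未返回内容";          // "Streaming request returned no content"
    private static final String STREAM_ALL_FAILED_MESSAGE = "大模型调用失败,请稍后再试..."; // "LLM call failed, please try again later..."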

    private final ModelSelector selector;
    private final ModelHealthStore healthStore;
    private final ModelRoutingExecutor executor;
    private final Map<String, ChatClient> clientsByProvider;  // key=provider name

    public RoutingLLMService(..., List<ChatClient> clients) {
        this.clientsByProvider = clients.stream()
                .collect(Collectors.toMap(ChatClient::provider, Function.identity()));
    }
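One property of this constructor is worth knowing: the two-argument Collectors.toMap throws IllegalStateException when two elements map to the same key, so two ChatClient beans reporting the same provider name would fail at startup instead of silently shadowing each other. A minimal sketch of that behaviour, with strings standing in for clients (ProviderIndexSketch and the provider names are illustrative assumptions):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ProviderIndexSketch {

    // Indexes hypothetical clients by provider name, the same way clientsByProvider is built.
    static Map<String, String> index(List<String> providers) {
        return providers.stream()
                .collect(Collectors.toMap(Function.identity(), p -> "client-for-" + p));
    }

    // Returns true if a duplicate provider name is rejected while building the map.
    static boolean rejectsDuplicates() {
        try {
            index(Arrays.asList("siliconflow", "ollama", "siliconflow"));
            return false;
        } catch (IllegalStateException e) {
            return true; // toMap without a merge function refuses duplicate keys
        }
    }
}
```

If duplicates were expected, the three-argument toMap with an explicit merge function would be the place to decide which client wins.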

Core logic of streamChat:

    @Override
    @RagTraceNode(name = "llm-stream-routing", type = "LLM_ROUTING")
    public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback callback) {
        List<ModelTarget> targets = selector.selectChatCandidates(
                Boolean.TRUE.equals(request.getThinking()));
        if (CollUtil.isEmpty(targets)) {
            throw new RemoteException(STREAM_NO_PROVIDER_MESSAGE);
        }

        String label = ModelCapability.CHAT.getDisplayName();
        Throwable lastError = null;

        for (ModelTarget target : targets) {                     // iterate over the pre-filtered healthy list
            ChatClient client = resolveClient(target, label);
            if (client == null) {
                continue;
            }
            if (!healthStore.allowCall(target.id())) {           // concurrency guard
                continue;
            }

            ProbeStreamBridge bridge = new ProbeStreamBridge(callback);

            StreamCancellationHandle handle;
            try {
                handle = client.streamChat(request, bridge, target);
            } catch (Exception e) {
                healthStore.markFailure(target.id());            // exception thrown at startup
                lastError = e;
                log.warn("{} 流式请求启动失败,切换下一个模型。modelId={},provider={}",
                        label, target.id(), target.candidate().getProvider(), e);
                continue;
            }
            if (handle == null) {                                // no cancellation handle returned
                healthStore.markFailure(target.id());
                lastError = new RemoteException(
                        STREAM_START_FAILED_MESSAGE, BaseErrorCode.REMOTE_ERROR);
                log.warn("{} 流式请求未返回取消句柄,切换下一个模型。modelId={},provider={}",
                        label, target.id(), target.candidate().getProvider());
                continue;
            }

            ProbeResult result = awaitFirstPacket(bridge, handle, callback);

            if (result.isSuccess()) {
                healthStore.markSuccess(target.id());            // first packet arrived: stick with this model
                return handle;
            }

            // first packet failed: count toward the breaker + cancel + try the next one
            healthStore.markFailure(target.id());
            handle.cancel();
            lastError = buildLastErrorAndLog(result, target, label);
        }

        throw notifyAllFailed(callback, lastError);
    }

The awaitFirstPacket wrapper:

    private ProbeResult awaitFirstPacket(ProbeStreamBridge bridge,
                                          StreamCancellationHandle handle,
                                          StreamCallback callback) {
        try {
            return bridge.awaitFirstPacket(FIRST_PACKET_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            handle.cancel();
            RemoteException ex = new RemoteException(
                    "流式请求被中断", e, BaseErrorCode.REMOTE_ERROR);
            callback.onError(ex);
            throw ex;
        }
    }

Error classification and logging:

    private Throwable buildLastErrorAndLog(ProbeResult result, ModelTarget target, String label) {
        return switch (result.getType()) {
            case ERROR -> {
                Throwable error = result.getError() != null
                        ? result.getError()
                        : new RemoteException("流式请求失败", BaseErrorCode.REMOTE_ERROR);
                log.warn("{} 失败模型: modelId={}, provider={},原因: 流式请求失败,切换下一个模型",
                        label, target.id(), target.candidate().getProvider(), error);
                yield error;
            }
            case TIMEOUT -> {
                log.warn("{} 失败模型: modelId={}, provider={},原因: 流式请求超时,切换下一个模型",
                        label, target.id(), target.candidate().getProvider());
                yield new RemoteException(STREAM_TIMEOUT_MESSAGE, BaseErrorCode.REMOTE_ERROR);
            }
            case NO_CONTENT -> {
                log.warn("{} 失败模型: modelId={}, provider={},原因: 流式请求无内容完成,切换下一个模型",
                        label, target.id(), target.candidate().getProvider());
                yield new RemoteException(STREAM_NO_CONTENT_MESSAGE, BaseErrorCode.REMOTE_ERROR);
            }
            default -> {
                log.warn("{} 失败模型: modelId={}, provider={},原因: 流式请求失败(未知类型),切换下一个模型",
                        label, target.id(), target.candidate().getProvider());
                yield new RemoteException("流式请求失败", BaseErrorCode.REMOTE_ERROR);
            }
        };
    }

    private RemoteException notifyAllFailed(StreamCallback callback, Throwable lastError) {
        RemoteException finalException = new RemoteException(
                STREAM_ALL_FAILED_MESSAGE, lastError, BaseErrorCode.REMOTE_ERROR);
        callback.onError(finalException);
        return finalException;
    }
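Putting the pieces of this section together, the streaming fallback can be reduced to: start the stream, wait a bounded time for the first token, and move to the next model if nothing arrives. The following sketch shows only that skeleton under simplifying assumptions: SketchClient, the model ids, and the 300 ms timeout are made up for illustration, and cancellation, buffering, and health tracking are left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class StreamFallbackSketch {

    // Hypothetical streaming client: pushes tokens into the consumer, possibly never.
    interface SketchClient {
        void stream(Consumer<String> onToken);
    }

    // Returns the id of the first model whose first token arrives within the timeout, or null.
    static String pickFirstResponsive(Map<String, SketchClient> clients, long timeoutMs, List<String> failed) {
        for (Map.Entry<String, SketchClient> entry : clients.entrySet()) {
            CompletableFuture<Boolean> firstToken = new CompletableFuture<>();
            try {
                entry.getValue().stream(token -> firstToken.complete(true));
                firstToken.get(timeoutMs, TimeUnit.MILLISECONDS); // the first-packet probe
                return entry.getKey();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            } catch (Exception e) {
                failed.add(entry.getKey()); // start failure or probe timeout: try the next model
            }
        }
        return null;
    }

    // The first client "connects" but stays silent; the second emits a token asynchronously.
    static String demo(List<String> failed) {
        Map<String, SketchClient> clients = new LinkedHashMap<>();
        clients.put("glm-4.7", onToken -> { });
        clients.put("qwen-plus", onToken -> CompletableFuture.runAsync(() -> onToken.accept("Hi")));
        return pickFirstResponsive(clients, 300, failed);
    }
}
```

Here the silent model costs one full timeout before the loop moves on, which is the same trade-off the 60-second FIRST_PACKET_TIMEOUT_SECONDS makes in the real service.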

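6. End-to-End Walkthrough

Configuration: failureThreshold=2, openDurationMs=30000. For simplicity the candidate chain below is ordered by priority alone: glm-4.7 → qwen-plus → qwen3-local → qwen3-max. (With default-model: qwen3-max as configured in section 2.1, the first-choice rule would actually put qwen3-max at the head of the list.)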

T0:    User asks a question (streaming)
T0+0:  ModelSelector.selectChatCandidates()
        ├─ filterAndSortCandidates: sort → [glm-4.7, qwen-plus, qwen3-local, qwen3-max]
        └─ buildAvailableTargets: assume all are healthy, so all are returned
       → targets = [glm-4.7, qwen-plus, qwen3-local, qwen3-max]

T0+0:  Try glm-4.7
        ├─ resolveClient → finds the siliconflow ChatClient
        ├─ allowCall → CLOSED, allowed
        ├─ client.streamChat(request, bridge, target) → returns handle
        └─ awaitFirstPacket(60s)... timeout!
        → markFailure (consecutiveFailures=1) → cancel → continue

T0+60: Try qwen-plus
        ├─ allowCall → CLOSED, allowed
        ├─ client.streamChat → returns handle
        └─ awaitFirstPacket(60s)... onContent received!
        → commit() releases the buffered content → later onContent calls are forwarded straight to the frontend
        → markSuccess → qwen-plus is locked in for this request

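Shortly afterwards, another request also fails once on glm-4.7 and the threshold is reached (the first failure was only recorded at T0+60, when the first-packet wait timed out):

T0+70: Another request tries glm-4.7
        ├─ allowCall → CLOSED, allowed
        ├─ client.streamChat → exception
        └─ markFailure (consecutiveFailures=2 == threshold)
        → state = OPEN, openUntil = now + 30s
        → when the next request comes in, ModelSelector.buildModelTarget() sees
          isUnavailable → return null → glm-4.7 is not in the candidate list

Half-open recovery once the 30s have elapsed:

T0+100: Some request tries glm-4.7
        ├─ allowCall: state=OPEN, openUntil has passed
        │  → moves to HALF_OPEN automatically, halfOpenInFlight=true → allowed
        ├─ client.streamChat → returns normally → first packet SUCCESS
        └─ markSuccess → state=CLOSED, back to normal

        If the probe fails:
        → markFailure(HALF_OPEN) → back to OPEN, openUntil = now + 30s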
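The breaker part of this walkthrough can be replayed deterministically by passing the clock in as a parameter. The class below is a single-threaded sketch with the thresholds from section 3.6 hard-coded; BreakerTimelineSketch is a hypothetical name, the timestamps are chosen for illustration, and the real ModelHealthStore keeps one such record per model inside ConcurrentHashMap.compute.

```java
import java.util.ArrayList;
import java.util.List;

final class BreakerTimelineSketch {

    enum State { CLOSED, OPEN, HALF_OPEN }

    static final int FAILURE_THRESHOLD = 2;
    static final long OPEN_DURATION_MS = 30_000L;

    State state = State.CLOSED;
    int consecutiveFailures;
    long openUntil;
    boolean halfOpenInFlight;

    boolean allowCall(long now) {
        if (state == State.OPEN) {
            if (openUntil > now) {
                return false;              // still inside the open window
            }
            state = State.HALF_OPEN;       // window expired: admit one probe
            halfOpenInFlight = true;
            return true;
        }
        if (state == State.HALF_OPEN) {
            if (halfOpenInFlight) {
                return false;              // a probe is already running
            }
            halfOpenInFlight = true;
            return true;
        }
        return true;                       // CLOSED
    }

    void markFailure(long now) {
        if (state == State.HALF_OPEN || ++consecutiveFailures >= FAILURE_THRESHOLD) {
            state = State.OPEN;
            openUntil = now + OPEN_DURATION_MS;
            consecutiveFailures = 0;
            halfOpenInFlight = false;
        }
    }

    void markSuccess() {
        state = State.CLOSED;
        consecutiveFailures = 0;
        openUntil = 0L;
        halfOpenInFlight = false;
    }

    // Replays two failures, a rejected call, a half-open probe, and a successful recovery.
    static List<String> replay() {
        BreakerTimelineSketch b = new BreakerTimelineSketch();
        List<String> trace = new ArrayList<>();
        b.markFailure(60_000L);
        trace.add("t=60s " + b.state);
        b.markFailure(70_000L);
        trace.add("t=70s " + b.state);
        trace.add("t=80s allow=" + b.allowCall(80_000L));
        trace.add("t=100s allow=" + b.allowCall(100_000L) + " " + b.state);
        trace.add("t=100s second=" + b.allowCall(100_000L));
        b.markSuccess();
        trace.add("after probe " + b.state);
        return trace;
    }
}
```

The trace reads: CLOSED after the first failure, OPEN after the second, a rejected call at 80s, one admitted probe at 100s with a second caller rejected, and CLOSED again after the probe succeeds.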

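The overall flow, roughly:

User sends a message
   ↓
Select the model list (deep-thinking / normal)
   ↓
Iterate over the models
   ↓
Resolve the client → skip if null
   ↓
Breaker check: allowCall
   ↓
Not allowed → skip
Allowed → continue
   ↓
Create the probe bridge (ProbeStreamBridge)
   ↓
Call the model: streamChat
   ↓
Start failed / handle is null → markFailure → next
   ↓
Wait for the first packet (60 seconds)
   ↓
   Success?
   ├─── yes → markSuccess → stream to the caller → return
   └─── no → markFailure → cancel → next
All models failed → report the error to the user through the callback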

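Summary

In the end this module exposes a single interface to the outside (LLMService, implemented by RoutingLLMService): callers just invoke it, while candidate selection, circuit breaking, fallback, and first-packet probing all happen underneath.
Overall, this part is hard to follow at first. For backend developers moving into RAG it is probably the hardest module, and it felt unfamiliar and uncomfortable to me as well. With help from AI I managed to work through it and came away with a solid initial understanding; I hope this write-up helps you too.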
