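RAG: Three-State Circuit Breaker + Priority Fallback Chain + Streaming First-Packet Probe in the AI Infrastructure Layer
Preface
In a RAG system, LLM inference is the core step. Once you integrate several model providers at the same time (Alibaba Cloud Bailian, SiliconFlow, a local Ollama, and so on), a few practical problems show up:
- Unstable models: a provider occasionally times out, rate-limits, or returns errors outright
- Single point of failure: if you depend on one model, everything goes down with it
- Stalled streaming calls: the connection is established but the model emits no tokens, so the call hangs without returning
You could handle all of this with try-catch in business code, but that quickly becomes hard to maintain. A better approach is to move health monitoring, automatic fallback, and stream probing into an infrastructure layer, so that business code is unaware of them.
This article walks through the design and implementation of that architecture, using code from a real project.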
1. Overall architecture
User request
  ↓
RoutingLLMService.streamChat() / ModelRoutingExecutor.executeWithFallback()
  ↓
ModelSelector.selectChatCandidates()
  ├─ filterAndSortCandidates() → sort by: first-choice model → priority ascending → id
  └─ buildAvailableTargets()
       └─ buildModelTarget()
            ├─ healthStore.isUnavailable()? → breaker tripped → return null (excluded)
            └─ provider missing and not NOOP? → return null (excluded)
  ↓
Returns List<ModelTarget> (pre-filtered list of healthy models)
  ↓
for each target:
  ├─ resolveClient() → clientsByProvider.get(provider)
  ├─ client == null? → continue
  ├─ healthStore.allowCall(id) → concurrency guard (another thread may trip the breaker between filtering and the call)
  └─ call the model API
       ├─ sync: ModelRoutingExecutor.executeWithFallback()
       │    └─ caller.call(client, target) → success: markSuccess / exception: markFailure + try the next one
       └─ streaming: RoutingLLMService.streamChat()
            ├─ client.streamChat(request, ProbeStreamBridge, target)
            ├─ handle == null? → markFailure + continue
            └─ ProbeStreamBridge.awaitFirstPacket(60s)
                 ├─ SUCCESS → markSuccess + return handle
                 ├─ ERROR → markFailure + cancel + continue
                 ├─ TIMEOUT → markFailure + cancel + continue
                 └─ NO_CONTENT → markFailure + cancel + continue
  ↓
All failed → throw RemoteException / callback.onError()
2. ModelSelector: sort by priority first, then filter out tripped models
infra-ai/.../model/ModelSelector.java
2.1 Entry point
public List<ModelTarget> selectChatCandidates(boolean deepThinking) {
    AIModelProperties.ModelGroup group = properties.getChat();
    if (group == null) {
        return List.of();
    }
    String firstChoiceModelId = resolveFirstChoiceModel(group, deepThinking);
    return selectCandidates(group, firstChoiceModelId, deepThinking);
}

private String resolveFirstChoiceModel(AIModelProperties.ModelGroup group, boolean deepThinking) {
    if (deepThinking) {
        String deepModel = group.getDeepThinkingModel();
        if (StrUtil.isNotBlank(deepModel)) {
            return deepModel;
        }
    }
    return group.getDefaultModel();
}
When deepThinking=true, the first choice is deepThinkingModel (if one is configured); otherwise it is defaultModel. The corresponding configuration:
ai:
  chat:
    default-model: qwen3-max
    deep-thinking-model: qwen3-max
    candidates:
      - id: qwen-plus
        priority: 1
      - id: qwen3-max
        priority: 3
        supports-thinking: true
      - id: glm-4.7
        priority: 0
      - id: qwen3-local
        priority: 2
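2.2 Sorting: filterAndSortCandidates
private List<AIModelProperties.ModelCandidate> filterAndSortCandidates(
        List<AIModelProperties.ModelCandidate> candidates,
        String firstChoiceModelId,
        boolean deepThinking) {
    List<AIModelProperties.ModelCandidate> enabled = candidates.stream()
            // drop null entries and candidates explicitly disabled
            .filter(c -> c != null && !Boolean.FALSE.equals(c.getEnabled()))
            // in deep-thinking mode keep only candidates that support it
            .filter(c -> !deepThinking || Boolean.TRUE.equals(c.getSupportsThinking()))
            .sorted(Comparator
                    // false sorts before true, so the first-choice model comes first
                    .comparing((AIModelProperties.ModelCandidate c) ->
                            !Objects.equals(resolveId(c), firstChoiceModelId))
                    .thenComparing(AIModelProperties.ModelCandidate::getPriority,
                            Comparator.nullsLast(Integer::compareTo))
                    .thenComparing(AIModelProperties.ModelCandidate::getId,
                            Comparator.nullsLast(String::compareTo)))
            .collect(Collectors.toList());
    if (deepThinking && enabled.isEmpty()) {
        log.warn("No candidate model available for deep-thinking mode");
    }
    return enabled;
}
Sort order: the first-choice model comes first → then priority ascending → then id in lexicographic order. Candidates that are explicitly disabled are dropped, and in deep-thinking mode only candidates with supports-thinking: true are kept. resolveId(c) returns c.getId(), or the concatenation provider::model when the id is blank.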
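To make the ordering concrete, here is a minimal, self-contained sketch that applies the same three-level comparator to the four candidates from the configuration in 2.1. The Candidate record and the CandidateOrderingDemo class are hypothetical stand-ins introduced only for this illustration; the project's ModelCandidate has more fields, and resolveId is simplified here to the plain id.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for AIModelProperties.ModelCandidate (ordering fields only).
record Candidate(String id, Integer priority) {}

final class CandidateOrderingDemo {

    // First-choice model first (false sorts before true), then priority ascending, then id.
    static List<String> order(List<Candidate> candidates, String firstChoiceModelId) {
        return candidates.stream()
                .sorted(Comparator
                        .comparing((Candidate c) -> !Objects.equals(c.id(), firstChoiceModelId))
                        .thenComparing(Candidate::priority, Comparator.nullsLast(Integer::compareTo))
                        .thenComparing(Candidate::id, Comparator.nullsLast(String::compareTo)))
                .map(Candidate::id)
                .toList();
    }

    public static void main(String[] args) {
        List<Candidate> candidates = List.of(
                new Candidate("qwen-plus", 1),
                new Candidate("qwen3-max", 3),
                new Candidate("glm-4.7", 0),
                new Candidate("qwen3-local", 2));
        // With a first choice, that model jumps the queue regardless of its priority.
        System.out.println(order(candidates, "qwen3-max")); // [qwen3-max, glm-4.7, qwen-plus, qwen3-local]
        // Without a matching first choice, only priority (then id) decides.
        System.out.println(order(candidates, null));        // [glm-4.7, qwen-plus, qwen3-local, qwen3-max]
    }
}
```

Note that the first-choice promotion overrides priority: qwen3-max has the largest priority value of the four, yet it is tried first whenever it is the configured default.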
2.3 Circuit-breaker filtering: buildModelTarget
The sorted list is passed to buildAvailableTargets(), which calls buildModelTarget() for each candidate:
private ModelTarget buildModelTarget(AIModelProperties.ModelCandidate candidate,
                                     Map<String, AIModelProperties.ProviderConfig> providers) {
    String modelId = resolveId(candidate);
    if (healthStore.isUnavailable(modelId)) {
        return null; // breaker tripped: exclude the model
    }
    AIModelProperties.ProviderConfig provider = providers.get(candidate.getProvider());
    if (provider == null && !ModelProvider.NOOP.matches(candidate.getProvider())) {
        log.warn("Provider config missing: provider={}, modelId={}", candidate.getProvider(), modelId);
        return null; // missing provider config: exclude as well (except NOOP)
    }
    return new ModelTarget(modelId, candidate, provider);
}
Two filter conditions:
- healthStore.isUnavailable(modelId) == true → breaker tripped, excluded
- provider == null and the provider is not NOOP → configuration missing, excluded
2.4 isUnavailable
public boolean isUnavailable(String id) {
    ModelHealth health = healthById.get(id);
    if (health == null) {
        return false;
    }
    if (health.state == State.OPEN && health.openUntil > System.currentTimeMillis()) {
        return true;
    }
    return health.state == State.HALF_OPEN && health.halfOpenInFlight;
}
OPEN with the recovery time not yet reached → unavailable; HALF_OPEN with a probe already in flight → unavailable. A model with no health record yet is treated as available.
3. Three-state circuit breaker
infra-ai/.../model/ModelHealthStore.java
3.1 State definitions
private enum State {
    CLOSED,    // closed: calls go through normally
    OPEN,      // open: requests are rejected
    HALF_OPEN  // half-open: a single probe request is let through
}
3.2 State transitions
CLOSED ──(consecutive failures >= failureThreshold)──→ OPEN
OPEN ──(openDurationMs elapsed, checked on the next allowCall)──→ HALF_OPEN
HALF_OPEN ──(probe succeeds)──→ CLOSED
HALF_OPEN ──(probe fails)──→ OPEN (wait time is reset)
3.3 allowCall: breaker check + automatic state transition
public boolean allowCall(String id) {
    if (id == null) {
        return false;
    }
    long now = System.currentTimeMillis();
    AtomicBoolean allowed = new AtomicBoolean(false);
    healthById.compute(id, (k, v) -> {
        if (v == null) {
            v = new ModelHealth();
        }
        if (v.state == State.OPEN) {
            if (v.openUntil > now) {
                return v;                  // open period not over yet: reject
            }
            v.state = State.HALF_OPEN;     // open period expired: move to half-open
            v.halfOpenInFlight = true;     // claim the probe slot
            allowed.set(true);
            return v;
        }
        if (v.state == State.HALF_OPEN) {
            if (v.halfOpenInFlight) {
                return v;                  // a probe is already running: reject
            }
            v.halfOpenInFlight = true;     // let this call through as the probe
            allowed.set(true);
            return v;
        }
        allowed.set(true);                 // CLOSED: allow as usual
        return v;
    });
    return allowed.get();
}
ConcurrentHashMap.compute runs the remapping function atomically for a given key, so the check and the state transition are safe under concurrent access. In HALF_OPEN only a single probe is let through; halfOpenInFlight guarantees this.
3.4 markFailure: failure counting + tripping the breaker
public void markFailure(String id) {
    if (id == null) {
        return;
    }
    long now = System.currentTimeMillis();
    healthById.compute(id, (k, v) -> {
        if (v == null) {
            v = new ModelHealth();
        }
        if (v.state == State.HALF_OPEN) {
            v.state = State.OPEN;          // probe failed: trip again
            v.openUntil = now + properties.getSelection().getOpenDurationMs();
            v.consecutiveFailures = 0;
            v.halfOpenInFlight = false;
            return v;
        }
        v.consecutiveFailures++;
        if (v.consecutiveFailures >= properties.getSelection().getFailureThreshold()) {
            v.state = State.OPEN;          // threshold reached: trip the breaker
            v.openUntil = now + properties.getSelection().getOpenDurationMs();
            v.consecutiveFailures = 0;
        }
        return v;
    });
}
3.5 markSuccess: back to CLOSED
public void markSuccess(String id) {
    if (id == null) {
        return;
    }
    healthById.compute(id, (k, v) -> {
        if (v == null) {
            return new ModelHealth();
        }
        v.state = State.CLOSED;
        v.consecutiveFailures = 0;
        v.openUntil = 0L;
        v.halfOpenInFlight = false;
        return v;
    });
}
3.6 Configuration parameters
AIModelProperties.java:
@Data
public static class Selection {
    private Integer failureThreshold = 2;   // 2 consecutive failures trip the breaker
    private Long openDurationMs = 30000L;   // wait 30 s after tripping before attempting recovery
}
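The state machine from 3.2 to 3.5 is easier to follow when it can be stepped through deterministically. The following is a condensed sketch of the same logic, not the project's ModelHealthStore: the class name BreakerSketch, the injectable clock, and the stateOf() helper are assumptions added so the transitions can be exercised without waiting 30 real seconds.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongSupplier;

// Condensed sketch of the three-state breaker. The injectable clock and the
// stateOf() helper are assumptions for illustration, not project code.
final class BreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private static final class Health {
        State state = State.CLOSED;
        int consecutiveFailures;
        long openUntil;
        boolean halfOpenInFlight;
    }

    private final ConcurrentHashMap<String, Health> healthById = new ConcurrentHashMap<>();
    private final int failureThreshold;
    private final long openDurationMs;
    private final LongSupplier clock;

    BreakerSketch(int failureThreshold, long openDurationMs, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openDurationMs = openDurationMs;
        this.clock = clock;
    }

    boolean allowCall(String id) {
        long now = clock.getAsLong();
        AtomicBoolean allowed = new AtomicBoolean(false);
        healthById.compute(id, (k, v) -> {
            if (v == null) v = new Health();
            if (v.state == State.OPEN && v.openUntil > now) return v;        // still open: reject
            if (v.state == State.HALF_OPEN && v.halfOpenInFlight) return v;  // probe in flight: reject
            if (v.state != State.CLOSED) {  // expired OPEN or idle HALF_OPEN: claim the probe slot
                v.state = State.HALF_OPEN;
                v.halfOpenInFlight = true;
            }
            allowed.set(true);
            return v;
        });
        return allowed.get();
    }

    void markFailure(String id) {
        long now = clock.getAsLong();
        healthById.compute(id, (k, v) -> {
            if (v == null) v = new Health();
            boolean probeFailed = v.state == State.HALF_OPEN;
            if (!probeFailed) v.consecutiveFailures++;
            if (probeFailed || v.consecutiveFailures >= failureThreshold) {
                v.state = State.OPEN;
                v.openUntil = now + openDurationMs;
                v.consecutiveFailures = 0;
                v.halfOpenInFlight = false;
            }
            return v;
        });
    }

    void markSuccess(String id) {
        healthById.put(id, new Health()); // a fresh record is CLOSED with zero failures
    }

    State stateOf(String id) {
        Health h = healthById.get(id);
        return h == null ? State.CLOSED : h.state;
    }

    public static void main(String[] args) {
        long[] now = {0L};                                  // fake clock in milliseconds
        BreakerSketch breaker = new BreakerSketch(2, 30_000L, () -> now[0]);
        breaker.markFailure("glm-4.7");
        breaker.markFailure("glm-4.7");
        System.out.println(breaker.stateOf("glm-4.7"));     // OPEN
        now[0] = 30_000L;                                   // open period has elapsed
        System.out.println(breaker.allowCall("glm-4.7"));   // true  (this call is the probe)
        System.out.println(breaker.allowCall("glm-4.7"));   // false (probe already in flight)
        breaker.markSuccess("glm-4.7");
        System.out.println(breaker.stateOf("glm-4.7"));     // CLOSED
    }
}
```

One property worth noticing: the move from OPEN to HALF_OPEN is not driven by a timer. It happens lazily inside allowCall, the first time a request arrives after openUntil has passed.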
4. Priority fallback chain for synchronous calls
infra-ai/.../model/ModelRoutingExecutor.java
@Component
@RequiredArgsConstructor
public class ModelRoutingExecutor {
    private final ModelHealthStore healthStore;

    public <C, T> T executeWithFallback(
            ModelCapability capability,
            List<ModelTarget> targets,
            Function<ModelTarget, C> clientResolver,
            ModelCaller<C, T> caller) {
        String label = capability.getDisplayName();
        if (targets == null || targets.isEmpty()) {
            throw new RemoteException("No " + label + " model candidates available");
        }
        Throwable last = null;
        for (ModelTarget target : targets) {                 // iterate over the pre-filtered healthy list
            C client = clientResolver.apply(target);
            if (client == null) {
                log.warn("{} provider client missing: provider={}, modelId={}",
                        label, target.candidate().getProvider(), target.id());
                continue;
            }
            if (!healthStore.allowCall(target.id())) {       // concurrency guard
                continue;
            }
            try {
                T response = caller.call(client, target);    // the actual call
                healthStore.markSuccess(target.id());        // success → back to CLOSED
                return response;
            } catch (Exception e) {
                last = e;
                healthStore.markFailure(target.id());        // failure → count it
                log.warn("{} model failed, fallback to next. modelId={}, provider={}",
                        label, target.id(), target.candidate().getProvider(), e);
                // move on to the next model (the current request is not aborted)
            }
        }
        throw new RemoteException(
                "All " + label + " model candidates failed: " + (last == null ? "unknown" : last.getMessage()),
                last,
                BaseErrorCode.REMOTE_ERROR
        );
    }
}
Key points:
- targets is the healthy list already pre-filtered by ModelSelector
- allowCall() is the concurrency guard (another thread may have tripped the breaker between filtering and the call)
- markFailure() affects the next ModelSelector filtering pass; the current request simply continues with the next candidate
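Stripped of the health store and client resolution, the fallback loop above reduces to "try each candidate in order, remember the last failure, return the first success". The sketch below shows only that skeleton; FallbackChainSketch, callWithFallback, and the failed list (which stands in for healthStore.markFailure) are hypothetical names chosen for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Skeleton of a priority fallback chain; names are illustrative, not project code.
final class FallbackChainSketch {
    // Stand-in for healthStore.markFailure: records which model ids failed.
    final List<String> failed = new ArrayList<>();

    // Tries each model id in order and returns the first successful result.
    <R> R callWithFallback(List<String> modelIds, Function<String, R> caller) {
        RuntimeException last = null;
        for (String id : modelIds) {
            try {
                return caller.apply(id);   // first success ends the loop
            } catch (RuntimeException e) {
                last = e;                  // remember the failure ...
                failed.add(id);            // ... record it for later requests ...
            }                              // ... and fall through to the next candidate
        }
        throw new IllegalStateException("All model candidates failed", last);
    }

    public static void main(String[] args) {
        FallbackChainSketch chain = new FallbackChainSketch();
        String answer = chain.callWithFallback(
                List.of("glm-4.7", "qwen-plus", "qwen3-local"),
                id -> {
                    if (id.equals("glm-4.7")) {
                        throw new RuntimeException("simulated timeout");
                    }
                    return "answer from " + id;
                });
        System.out.println(answer);        // answer from qwen-plus
        System.out.println(chain.failed);  // [glm-4.7]
    }
}
```

The caller never sees the failure of glm-4.7; it only surfaces if every candidate fails, in which case the last error is attached as the cause.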
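5. First-packet probing for streaming calls
Detecting failure in a synchronous call is simple: an exception is thrown and you catch it. Streaming calls are different: client.streamChat() may return a StreamCancellationHandle normally while the model then stalls and emits no tokens. A first-packet probe is needed to verify that the model is actually generating.
5.1 ProbeStreamBridge: first-packet probe + buffered forwarding
infra-ai/.../chat/ProbeStreamBridge.java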
final class ProbeStreamBridge implements StreamCallback {
    private final StreamCallback downstream;
    private final CompletableFuture<ProbeResult> probe = new CompletableFuture<>();
    private final Object lock = new Object();
    private final List<Runnable> buffer = new ArrayList<>();
    private volatile boolean committed;

    ProbeStreamBridge(StreamCallback downstream) {
        this.downstream = downstream;
    }

    @Override
    public void onContent(String content) {
        probe.complete(ProbeResult.success());      // first packet has arrived
        bufferOrDispatch(() -> downstream.onContent(content));
    }

    @Override
    public void onThinking(String content) {
        probe.complete(ProbeResult.success());      // thinking output also counts as a first packet
        bufferOrDispatch(() -> downstream.onThinking(content));
    }

    @Override
    public void onComplete() {
        probe.complete(ProbeResult.noContent());    // stream ended without any content
        bufferOrDispatch(downstream::onComplete);
    }

    @Override
    public void onError(Throwable t) {
        probe.complete(ProbeResult.error(t));       // stream failed
        bufferOrDispatch(() -> downstream.onError(t));
    }
The CompletableFuture<ProbeResult> probe is effectively completed only once: the first complete() call sets the result, and later calls return false and are ignored. This is why onComplete() after real content does not overwrite SUCCESS with NO_CONTENT.
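The "first signal wins" behaviour relies only on the documented contract of CompletableFuture.complete, which can be checked in isolation. A minimal sketch follows; FirstSignalWinsDemo is a hypothetical class name, and plain strings stand in for ProbeResult.

```java
import java.util.concurrent.CompletableFuture;

// Shows that only the first complete() call on a CompletableFuture takes effect.
final class FirstSignalWinsDemo {

    static String firstSignalWins() {
        CompletableFuture<String> probe = new CompletableFuture<>();
        boolean first = probe.complete("SUCCESS");      // sets the value, returns true
        boolean second = probe.complete("NO_CONTENT");  // already completed, returns false
        return probe.join() + "," + first + "," + second;
    }

    public static void main(String[] args) {
        System.out.println(firstSignalWins()); // SUCCESS,true,false
    }
}
```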
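Buffering: until the first packet is confirmed, callbacks are held back, so a model that later fails the probe never leaks partial output or errors to the downstream callback:
    private void bufferOrDispatch(Runnable action) {
        boolean dispatchNow;
        synchronized (lock) {
            dispatchNow = committed;
            if (!dispatchNow) {
                buffer.add(action);     // buffer until the first packet is confirmed
            }
        }
        if (dispatchNow) {
            action.run();               // already confirmed: forward directly
        }
    }

    private void commit() {
        synchronized (lock) {
            if (committed) return;
            committed = true;
            buffer.forEach(Runnable::run);  // release the whole buffer at once
        }
    }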
Waiting for the first packet and committing:
    ProbeResult awaitFirstPacket(long timeout, TimeUnit unit) throws InterruptedException {
        ProbeResult result;
        try {
            result = probe.get(timeout, unit);  // block until the first packet (or a terminal signal)
        } catch (TimeoutException e) {
            return ProbeResult.timeout();
        } catch (ExecutionException e) {
            return ProbeResult.error(e.getCause());
        }
        if (result.isSuccess()) {
            commit();                           // confirmed: flush the buffer downstream
        }
        return result;
    }
ProbeResult type:
    @Getter
    static class ProbeResult {
        enum Type { SUCCESS, ERROR, TIMEOUT, NO_CONTENT }

        private final Type type;
        private final Throwable error;

        // Constructor is not shown in the original excerpt; spelled out here so the factories compile.
        private ProbeResult(Type type, Throwable error) {
            this.type = type;
            this.error = error;
        }

        static ProbeResult success() { return new ProbeResult(Type.SUCCESS, null); }
        static ProbeResult error(Throwable t) { return new ProbeResult(Type.ERROR, t); }
        static ProbeResult timeout() { return new ProbeResult(Type.TIMEOUT, null); }
        static ProbeResult noContent() { return new ProbeResult(Type.NO_CONTENT, null); }

        boolean isSuccess() { return type == Type.SUCCESS; }
    }
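Putting the pieces of 5.1 together, the behaviour can be reproduced with a much smaller bridge. The sketch below is an illustration under simplifying assumptions, not the project's class: MiniProbeBridge is a hypothetical name, the downstream is a plain Consumer<String> instead of StreamCallback, the result is a boolean instead of ProbeResult, and an interrupt is folded into a false result rather than rethrown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Simplified first-packet probe: buffer chunks until the probe is confirmed, then flush.
final class MiniProbeBridge {
    private final Consumer<String> downstream;
    private final CompletableFuture<Boolean> probe = new CompletableFuture<>();
    private final List<String> buffer = new ArrayList<>();
    private boolean committed; // guarded by "this"

    MiniProbeBridge(Consumer<String> downstream) {
        this.downstream = downstream;
    }

    void onContent(String chunk) {
        probe.complete(true);                 // first chunk confirms the model is generating
        synchronized (this) {
            if (!committed) {
                buffer.add(chunk);            // not confirmed by the waiter yet: hold it back
                return;
            }
        }
        downstream.accept(chunk);             // confirmed: forward directly
    }

    void onComplete() {
        probe.complete(false);                // stream ended; ignored if content arrived first
    }

    // Returns true only if content arrived within the timeout; in that case the buffer is flushed.
    boolean awaitFirstPacket(long timeout, TimeUnit unit) {
        boolean ok;
        try {
            ok = probe.get(timeout, unit);
        } catch (TimeoutException | ExecutionException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (ok) {
            synchronized (this) {
                committed = true;
                buffer.forEach(downstream);   // release buffered chunks in arrival order
                buffer.clear();
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        MiniProbeBridge bridge = new MiniProbeBridge(received::add);
        bridge.onContent("Hello");
        System.out.println(received);                                        // []
        System.out.println(bridge.awaitFirstPacket(1, TimeUnit.SECONDS));    // true
        bridge.onContent(", world");
        System.out.println(received);                                        // [Hello, , world]
    }
}
```

If the probe fails (timeout or an empty stream), nothing is ever flushed, which is what lets the routing layer switch to the next model without the caller having seen anything from the failed one.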
5.2 RoutingLLMService: fallback chain for streaming calls
infra-ai/.../chat/RoutingLLMService.java
@Service
@Primary
public class RoutingLLMService implements LLMService {
    private static final int FIRST_PACKET_TIMEOUT_SECONDS = 60;
    private static final String STREAM_NO_PROVIDER_MESSAGE = "No LLM provider available";
    private static final String STREAM_START_FAILED_MESSAGE = "Failed to start streaming request";
    private static final String STREAM_TIMEOUT_MESSAGE = "Streaming first-packet timeout";
    private static final String STREAM_NO_CONTENT_MESSAGE = "Streaming request returned no content";
    private static final String STREAM_ALL_FAILED_MESSAGE = "LLM call failed, please try again later...";

    private final ModelSelector selector;
    private final ModelHealthStore healthStore;
    private final ModelRoutingExecutor executor;
    private final Map<String, ChatClient> clientsByProvider; // key = provider name

    public RoutingLLMService(..., List<ChatClient> clients) {
        this.clientsByProvider = clients.stream()
                .collect(Collectors.toMap(ChatClient::provider, Function.identity()));
    }
streamChat core logic:
    @Override
    @RagTraceNode(name = "llm-stream-routing", type = "LLM_ROUTING")
    public StreamCancellationHandle streamChat(ChatRequest request, StreamCallback callback) {
        List<ModelTarget> targets = selector.selectChatCandidates(
                Boolean.TRUE.equals(request.getThinking()));
        if (CollUtil.isEmpty(targets)) {
            throw new RemoteException(STREAM_NO_PROVIDER_MESSAGE);
        }
        String label = ModelCapability.CHAT.getDisplayName();
        Throwable lastError = null;
        for (ModelTarget target : targets) {                 // iterate over the pre-filtered healthy list
            ChatClient client = resolveClient(target, label);
            if (client == null) {
                continue;
            }
            if (!healthStore.allowCall(target.id())) {       // concurrency guard
                continue;
            }
            ProbeStreamBridge bridge = new ProbeStreamBridge(callback);
            StreamCancellationHandle handle;
            try {
                handle = client.streamChat(request, bridge, target);
            } catch (Exception e) {
                healthStore.markFailure(target.id());        // failed before the stream even started
                lastError = e;
                log.warn("{} streaming request failed to start, switching to next model. modelId={}, provider={}",
                        label, target.id(), target.candidate().getProvider(), e);
                continue;
            }
            if (handle == null) {                            // no cancellation handle returned
                healthStore.markFailure(target.id());
                lastError = new RemoteException(
                        STREAM_START_FAILED_MESSAGE, BaseErrorCode.REMOTE_ERROR);
                log.warn("{} streaming request returned no cancellation handle, switching to next model. modelId={}, provider={}",
                        label, target.id(), target.candidate().getProvider());
                continue;
            }
            ProbeResult result = awaitFirstPacket(bridge, handle, callback);
            if (result.isSuccess()) {
                healthStore.markSuccess(target.id());        // first packet received: stick with this model
                return handle;
            }
            // first packet failed: count towards the breaker, cancel, try the next model
            healthStore.markFailure(target.id());
            handle.cancel();
            lastError = buildLastErrorAndLog(result, target, label);
        }
        throw notifyAllFailed(callback, lastError);
    }
awaitFirstPacket wrapper:
    private ProbeResult awaitFirstPacket(ProbeStreamBridge bridge,
                                         StreamCancellationHandle handle,
                                         StreamCallback callback) {
        try {
            return bridge.awaitFirstPacket(FIRST_PACKET_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the interrupt flag
            handle.cancel();
            RemoteException ex = new RemoteException(
                    "Streaming request was interrupted", e, BaseErrorCode.REMOTE_ERROR);
            callback.onError(ex);
            throw ex;
        }
    }
Error classification and logging:
    private Throwable buildLastErrorAndLog(ProbeResult result, ModelTarget target, String label) {
        return switch (result.getType()) {
            case ERROR -> {
                Throwable error = result.getError() != null
                        ? result.getError()
                        : new RemoteException("Streaming request failed", BaseErrorCode.REMOTE_ERROR);
                log.warn("{} model failed: modelId={}, provider={}, reason: streaming request failed, switching to next model",
                        label, target.id(), target.candidate().getProvider(), error);
                yield error;
            }
            case TIMEOUT -> {
                log.warn("{} model failed: modelId={}, provider={}, reason: streaming request timed out, switching to next model",
                        label, target.id(), target.candidate().getProvider());
                yield new RemoteException(STREAM_TIMEOUT_MESSAGE, BaseErrorCode.REMOTE_ERROR);
            }
            case NO_CONTENT -> {
                log.warn("{} model failed: modelId={}, provider={}, reason: streaming request completed with no content, switching to next model",
                        label, target.id(), target.candidate().getProvider());
                yield new RemoteException(STREAM_NO_CONTENT_MESSAGE, BaseErrorCode.REMOTE_ERROR);
            }
            default -> {
                log.warn("{} model failed: modelId={}, provider={}, reason: streaming request failed (unknown type), switching to next model",
                        label, target.id(), target.candidate().getProvider());
                yield new RemoteException("Streaming request failed", BaseErrorCode.REMOTE_ERROR);
            }
        };
    }

    private RemoteException notifyAllFailed(StreamCallback callback, Throwable lastError) {
        RemoteException finalException = new RemoteException(
                STREAM_ALL_FAILED_MESSAGE, lastError, BaseErrorCode.REMOTE_ERROR);
        callback.onError(finalException);
        return finalException;
    }
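6. End-to-end example
Assume failureThreshold=2, openDurationMs=30000, and the candidate chain glm-4.7 → qwen-plus → qwen3-local → qwen3-max. This chain is in pure priority order, i.e. it assumes no first-choice model is promoted; with default-model: qwen3-max as in the configuration from 2.1, qwen3-max would be tried first. Times are in seconds.
T0: user asks a question (streaming)
T0+0: ModelSelector.selectChatCandidates()
  ├─ filterAndSortCandidates: sorted → [glm-4.7, qwen-plus, qwen3-local, qwen3-max]
  └─ buildAvailableTargets: assume all are healthy, so all are returned
  → targets = [glm-4.7, qwen-plus, qwen3-local, qwen3-max]
T0+0: try glm-4.7
  ├─ resolveClient → finds the siliconflow ChatClient
  ├─ allowCall → CLOSED, allowed
  ├─ client.streamChat(request, bridge, target) → returns handle
  └─ awaitFirstPacket(60s)... times out!
     → markFailure (consecutiveFailures=1) → cancel → continue
T0+60: try qwen-plus
  ├─ allowCall → CLOSED, allowed
  ├─ client.streamChat → returns handle
  └─ awaitFirstPacket(60s)... onContent received!
     → markSuccess → stick with qwen-plus
     → commit() releases the buffered content → later onContent calls are forwarded directly to the frontend
Shortly afterwards, another request also fails on glm-4.7 and the threshold is reached:
T0+70: another request tries glm-4.7
  ├─ allowCall → CLOSED, allowed
  ├─ client.streamChat → throws
  └─ markFailure (consecutiveFailures=2 == threshold)
     → state = OPEN, openUntil = now + 30s (T0+100)
     → for the next request, ModelSelector.buildModelTarget() sees
       isUnavailable → return null → glm-4.7 is not in the candidate list
Half-open recovery after 30 s:
T0+100 or later: some request tries glm-4.7
  ├─ allowCall: state=OPEN, openUntil has passed
  │    → moves to HALF_OPEN, halfOpenInFlight=true → allowed
  ├─ client.streamChat → returns normally → first packet SUCCESS
  └─ markSuccess → state=CLOSED, back to normal
If the probe fails instead:
  → markFailure(HALF_OPEN) → back to OPEN, openUntil = now + 30s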
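The overall flow, roughly:
User sends a message
  ↓
Select the model list (deep thinking / normal)
  ↓
Iterate over the models
  ↓
Resolve the client → skip if null
  ↓
Breaker check: allowCall
  ↓
Not allowed → skip
Allowed → continue
  ↓
Create the probe bridge (ProbeStreamBridge)
  ↓
Call the model: streamChat
  ↓
Start failed / handle is null → markFailure → next model
  ↓
Wait for the first packet (60 s)
  ↓
Success?
  ├─── yes → markSuccess → stream to the caller → return
  └─── no → markFailure → cancel → next model
All models failed → error callback to the user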
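Summary
From the outside, this module exposes a single interface (LLMService in the code above). Model selection, circuit breaking, fallback, and first-packet probing are all hidden behind it, so business code does not have to change when a provider misbehaves.
This part is not easy to follow at first, and for backend developers moving into RAG it is probably one of the hardest pieces. With some help from AI, the author worked through it and came away with a solid initial understanding; hopefully this write-up helps others do the same.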