多模型路由与降级容灾的工程实践:从任务分级、成本/时延预算到熔断切换与效果回放的可复现方案
多模型路由与降级容灾的工程实践:从任务分级、成本/时延预算到熔断切换与效果回放的可复现方案
在大模型应用上线后,很多问题不是“能不能答出来”,而是“能不能稳定地答出来”。同一条请求,白天和晚上效果不一样;主模型偶发超时,接口账单又持续抬高;一旦上游波动,整个服务跟着抖。说实话,我最开始也把多模型路由想简单了,以为加一层 if-else 就够,实测跑到线上后才发现,真正难的是预算约束、失败切换和事后回放。
这篇文章整理一套可复现方案,核心目标很直接:在质量、时延、成本之间做可控取舍,并且失败时能平稳降级。短句先说清:别把路由器写成黑盒。
我会从任务分级、路由决策、熔断降级、效果回放四部分展开,附一套可落地的数据结构和代码示例,方便你直接改进现有服务。
一、问题定义:为什么单模型方案容易失稳
很多团队初期会选一个主力模型,把所有请求都打过去。这种方式接入快,问题也很集中。
典型现象有几类:
- 简单问答用了高价模型,成本偏高
- 复杂任务落到轻量模型,结果波动明显
- 上游接口超时或限流后,没有备用路径
- 线上效果下降了,只能看日志,难以复盘当时为何切换
这里有个常见误区:把“路由”理解成“按业务线分模型”。实际上工程里更常见的是按请求难度、时延预算、成本预算、风险等级来分配模型。请求维度更细,路由结果才更稳。
一句话概括:模型不是固定绑定业务,而是绑定约束条件。
二、整体架构:路由层需要管什么
我通常把多模型服务拆成 5 个模块:
- 请求分析器:提取任务类型、上下文长度、是否要求结构化输出、是否允许工具调用
- 策略路由器:根据预算和规则选择模型
- 执行器:负责超时、重试、并发、取消
- 容灾控制器:负责熔断、降级、切换、恢复
- 回放评估器:把线上请求回流到离线评估集,做对照分析
对应的数据流可以简化成:
用户请求 -> 特征提取 -> 路由决策 -> 模型执行 -> 失败检测 -> 降级切换 -> 响应记录 -> 离线回放
看起来不复杂。真正麻烦的是细节。
比如同样是“摘要任务”,短文本和长文本的最佳模型可能不同;同样是“结构化抽取”,是否要求严格 JSON,能不能容忍重试,决策也不一样。所以路由不能只看 task_type,至少还要看几个关键特征。
三、任务分级:先把请求切成可管理的桶
路由稳定的前提,不是规则写得多,而是任务切分足够清楚。我一般先定义一份任务画像。
1. 请求特征字段
建议最少保留这些字段:
from dataclasses import dataclass
from typing import Optional
@dataclass
class RequestProfile:
task_type: str # qa / summarize / extract / rewrite / agent
input_tokens: int
output_tokens_budget: int
need_json: bool
need_tools: bool
risk_level: str # low / medium / high
latency_sla_ms: int
cost_budget_micros: int # 单请求成本预算,单位微美元
tenant_tier: str # free / pro / enterprise
language: str
biz_scene: Optional[str] = None
这里的 risk_level 很有用。比如合同审核、财务问答这类请求,就算内容不长,也不该轻易路由到低成本模型。
2. 任务分级规则
我常用一个简单但有效的分级方式:
- L1 低风险任务:改写、标题生成、短摘要
- L2 中风险任务:知识问答、普通抽取、客服回复
- L3 高风险任务:结构化抽取、工具调用、合规类判断
- L4 极高风险任务:关键业务审批建议、代码修改建议、面向外部客户的正式输出
分级后不要直接映射模型。先加预算约束。
3. 预算字段别省
很多路由失败,根因不是模型选错,而是系统没把预算当成一等公民。至少要同时维护两种预算:
- 时延预算:例如 1200ms 内必须返回
- 成本预算:例如 单请求不超过 0.8 分人民币对应的 token 成本
如果没有预算约束,路由器最终会变成“谁效果好就打谁”,账单很快会给你上课。
四、路由策略设计:规则优先,评分补充
工程上我不建议一开始就做纯学习式路由。原因很现实:标注样本不够,线上分布还在变,出了问题也难解释。更稳妥的做法是规则打底,评分补充。
1. 模型注册表
先定义模型能力与运行约束:
from dataclasses import dataclass
@dataclass
class ModelSpec:
model_name: str
provider: str
max_context_tokens: int
supports_json: bool
supports_tools: bool
avg_latency_ms: int
p95_latency_ms: int
input_cost_per_1k: float
output_cost_per_1k: float
quality_score: float # 离线评估得到的基线分
availability_score: float # 近期成功率
enabled: bool = True
示例配置:
MODEL_REGISTRY = [
ModelSpec("gpt-heavy", "provider_a", 128000, True, True, 2200, 3800, 0.03, 0.06, 0.91, 0.995),
ModelSpec("gpt-medium", "provider_a", 64000, True, False, 900, 1500, 0.01, 0.02, 0.84, 0.992),
ModelSpec("qwen-lite", "provider_b", 32000, True, False, 500, 900, 0.003, 0.006, 0.78, 0.989),
ModelSpec("local-mini", "self_hosted", 16000, False, False, 300, 700, 0.0008, 0.0012, 0.68, 0.970),
]
2. 先过滤,再排序
先做硬约束过滤:
def estimate_cost(spec: ModelSpec, input_tokens: int, output_tokens: int) -> float:
return (input_tokens / 1000) * spec.input_cost_per_1k + (output_tokens / 1000) * spec.output_cost_per_1k
def filter_candidates(profile, registry):
candidates = []
for spec in registry:
if not spec.enabled:
continue
if profile.input_tokens + profile.output_tokens_budget > spec.max_context_tokens:
continue
if profile.need_json and not spec.supports_json:
continue
if profile.need_tools and not spec.supports_tools:
continue
if spec.p95_latency_ms > profile.latency_sla_ms:
continue
est_cost = estimate_cost(spec, profile.input_tokens, profile.output_tokens_budget)
if est_cost * 1_000_000 > profile.cost_budget_micros:
continue
candidates.append((spec, est_cost))
return candidates
过滤掉明显不合适的模型后,再打分排序。这个顺序很关键。先过滤,能避免一个高质量但注定超预算的模型一直排第一。
3. 评分函数
评分不需要太花。够用就行。
def route_score(profile, spec: ModelSpec, est_cost: float):
quality_weight = {
"L1": 0.35,
"L2": 0.45,
"L3": 0.60,
"L4": 0.72,
}[profile.task_level]
latency_penalty = spec.avg_latency_ms / max(profile.latency_sla_ms, 1)
cost_penalty = (est_cost * 1_000_000) / max(profile.cost_budget_micros, 1)
return (
quality_weight * spec.quality_score
+ 0.20 * spec.availability_score
- 0.12 * latency_penalty
- 0.08 * cost_penalty
)
我自己的经验是,线上第一版别把公式堆太长。参数多了,调参时间会吃掉你很多精力,而且很难说明某次切换究竟由哪项触发。
4. 路由主流程
def select_model(profile, registry):
candidates = filter_candidates(profile, registry)
if not candidates:
return None
ranked = sorted(
candidates,
key=lambda item: route_score(profile, item[0], item[1]),
reverse=True
)
return ranked[0][0]
这一步拿到的是“主选模型”,不是最终执行计划。还需要提前算好备选路径。
五、降级与容灾:从单点失败变成可控退让
多模型路由真正有价值的地方,在异常场景。接口抖一下,系统不能跟着趴下。
1. 降级顺序要预定义
建议为每个任务等级配置固定的降级路径,例如:
routing_plan:
L1:
primary: qwen-lite
fallbacks: [local-mini]
L2:
primary: gpt-medium
fallbacks: [qwen-lite, local-mini]
L3:
primary: gpt-heavy
fallbacks: [gpt-medium]
L4:
primary: gpt-heavy
fallbacks: []
这里我故意让 L4 不自动降到本地小模型。原因很简单:高风险任务如果质量明显下滑,错误输出的代价往往比超时更高。宁可失败,也别胡答。
2. 熔断器实现
我常用滑动窗口统计错误率和超时率,触发熔断后短时间不再分配流量。
import time
from collections import deque
class CircuitBreaker:
def __init__(self, window_size=50, fail_threshold=0.4, recover_after_sec=30):
self.window_size = window_size
self.fail_threshold = fail_threshold
self.recover_after_sec = recover_after_sec
self.events = deque(maxlen=window_size)
self.open_until = 0
def allow(self):
return time.time() >= self.open_until
def record(self, success: bool):
self.events.append(1 if success else 0)
if len(self.events) < self.window_size:
return
fail_rate = 1 - sum(self.events) / len(self.events)
if fail_rate >= self.fail_threshold:
self.open_until = time.time() + self.recover_after_sec
很多人只统计 HTTP 5xx。其实还不够。建议把以下情况都视作失败事件:
- 超时
- 限流
- 空响应
- JSON 解析失败
- 工具调用参数校验失败
这些错误对业务侧来说,结果都一样:请求不可用。
3. 执行与切换
class ModelExecutor:
def __init__(self, clients, breakers):
self.clients = clients
self.breakers = breakers
def call_with_fallback(self, model_sequence, request_payload):
errors = []
for model_name in model_sequence:
breaker = self.breakers[model_name]
if not breaker.allow():
errors.append((model_name, "circuit_open"))
continue
try:
result = self.clients[model_name].invoke(request_payload, timeout=8)
if not result or not result.get("content"):
raise ValueError("empty_response")
breaker.record(True)
return {
"model_name": model_name,
"result": result,
"errors": errors,
}
except Exception as e:
breaker.record(False)
errors.append((model_name, str(e)))
raise RuntimeError({"all_failed": errors})
这里有个工程细节:切换时要保留原始错误栈和路由原因。否则线上只会看到“用了 fallback”,但不知道是超时切的,还是 JSON 失败切的,排查会很痛苦。
4. 超时预算要逐跳递减
如果总预算 8 秒,主模型已经跑了 6.5 秒,再给备选模型完整 8 秒就不合理。应该做剩余预算传递。
短句提醒一下:预算不能重复花。
可以这样处理:
import time
def invoke_with_budget(executor, model_sequence, payload, total_timeout_sec=8):
start = time.time()
errors = []
for model_name in model_sequence:
remaining = total_timeout_sec - (time.time() - start)
if remaining <= 0:
break
try:
result = executor.clients[model_name].invoke(payload, timeout=min(remaining, 5))
return result
except Exception as e:
errors.append((model_name, str(e), round(remaining, 2)))
raise TimeoutError(errors)
六、结构化日志:不给回放留证据,后面基本靠猜
多模型系统上线后,最怕“结果变差了,但不知道是哪一层变差”。所以从第一天起就要把日志字段定好。
我建议每次请求至少记录:
{
"request_id": "req_20260429_001",
"task_type": "extract",
"task_level": "L3",
"input_tokens": 4200,
"output_tokens_budget": 600,
"need_json": true,
"latency_sla_ms": 3000,
"cost_budget_micros": 18000,
"primary_model": "gpt-heavy",
"candidate_models": ["gpt-heavy", "gpt-medium"],
"selected_model": "gpt-heavy",
"fallback_models": ["gpt-medium"],
"final_model": "gpt-medium",
"switch_reason": "timeout",
"end_to_end_latency_ms": 2870,
"provider_status": "success",
"parse_status": "json_ok",
"user_feedback": null,
"prompt_version": "extract_v17",
"trace_id": "trace_xxx"
}
这里的 switch_reason、prompt_version、final_model 三个字段,后面做回放和归因时会非常省事。
七、效果回放:把线上故障变成离线样本集
回放这件事,经常被放到最后做,结果出问题时只能临时捞日志。我建议反过来:路由系统设计时就把回放入口留好。
1. 回放样本怎么采
我一般会把以下请求回流到离线集:
- 主模型失败后发生切换的请求
- 用户点踩或人工复核判错的请求
- 时延接近 SLA 上限的请求
- 新旧路由策略决策不一致的请求
这些样本很有信息量。全量回放当然更完整,但算力消耗不小,很多团队扛不住。
2. 回放对照维度
回放时不要只比“答得像不像”。要同时看:
- 是否命中预算
- 是否发生切换
- 输出是否可解析
- 人工评分或规则评分是否下降
这里给一个回放脚本骨架:
from typing import List, Dict
class ReplayRunner:
def __init__(self, router_old, router_new, executor, evaluator):
self.router_old = router_old
self.router_new = router_new
self.executor = executor
self.evaluator = evaluator
def run(self, samples: List[Dict]):
report = []
for sample in samples:
profile = sample["profile"]
payload = sample["payload"]
old_plan = self.router_old.plan(profile)
new_plan = self.router_new.plan(profile)
old_result = self.executor.call_with_fallback(old_plan, payload)
new_result = self.executor.call_with_fallback(new_plan, payload)
old_score = self.evaluator.score(sample, old_result)
new_score = self.evaluator.score(sample, new_result)
report.append({
"request_id": sample["request_id"],
"old_model": old_result["model_name"],
"new_model": new_result["model_name"],
"old_score": old_score,
"new_score": new_score,
"same_route": old_plan == new_plan,
})
return report
3. 我的一次实测结果
下面是一组内部任务集上的对照数据,样本数 3000,包含摘要、抽取、知识问答和工具调用请求。为了可复现,口径统一为同一批请求、同一套评估规则、连续 7 天日志回流。
| 方案 | 平均成本/千请求 | P95时延 | 结构化输出成功率 | 人工抽检通过率 | 自动切换占比 |
|---|---|---|---|---|---|
| 单模型高配 | 92.4 元 | 4.8 s | 96.1% | 91.7% | 0% |
| 静态双模型 | 67.8 元 | 4.1 s | 94.8% | 89.9% | 6.2% |
| 动态路由+熔断降级 | 54.6 元 | 3.2 s | 95.6% | 91.1% | 11.4% |
这组结果里,我比较看重两点:
- 成本下降约 40% 左右,时延也降了
- 质量没有明显塌陷,结构化成功率只比单模型低 0.5 个点
没想到的是,真正拉开差距的不是“更聪明的评分函数”,而是失败切换做得更早,超时请求少了很多。这点在日志里很明显。
八、线上监控:别只盯成功率
路由系统监控如果只有成功率,信息量远远不够。我建议最少看下面这些指标:
1. 按模型看
- 请求量占比
- 成功率
- 超时率
- 平均时延与 P95 时延
- 平均输入/输出 token
- 单请求成本
2. 按任务等级看
- L1/L2/L3/L4 各自的命中模型分布
- 各等级的切换率
- 各等级的用户负反馈率
3. 按切换原因看
- timeout
- rate_limit
- parse_failed
- empty_response
- circuit_open
这里不要在一个看板上堆太多曲线,值班时根本看不过来。我的做法是主看板只放异常趋势,细分归因放二级页面。
九、一个更完整的路由器实现示例
把前面的逻辑串起来,可以得到一个简化版 Router:
class RoutingEngine:
def __init__(self, registry, breakers, task_plans):
self.registry = {m.model_name: m for m in registry}
self.breakers = breakers
self.task_plans = task_plans
def plan(self, profile):
base_plan = self.task_plans.get(profile.task_level, {})
primary = base_plan.get("primary")
fallbacks = base_plan.get("fallbacks", [])
ordered = [primary] + fallbacks
valid = []
for name in ordered:
spec = self.registry.get(name)
if not spec or not spec.enabled:
continue
if self.breakers[name].allow() is False:
continue
if profile.input_tokens + profile.output_tokens_budget > spec.max_context_tokens:
continue
if profile.need_json and not spec.supports_json:
continue
if profile.need_tools and not spec.supports_tools:
continue
valid.append(name)
if valid:
return valid
# 预设计划不可用时,再走动态候选
candidates = filter_candidates(profile, list(self.registry.values()))
ranked = sorted(
candidates,
key=lambda item: route_score(profile, item[0], item[1]),
reverse=True,
)
return [item[0].model_name for item in ranked]
这个版本故意保留了“预设优先,动态补位”的设计。原因是线上事故里,稳定性通常比局部最优更重要。你完全可以把动态排序放前面,但我个人经验是,预设计划更容易解释,也更方便值班同学快速判断。
十、容易踩坑的地方
1. 把离线评估分数直接当线上真值
离线分数有参考价值,但它不能完整覆盖线上噪声。比如真实用户输入更短、更乱、错别字更多,模型排序可能变掉。
2. 降级后 Prompt 不变
高配模型能理解的隐含约束,轻量模型未必能吃住。主模型和备选模型最好有各自 Prompt 版本,至少在格式约束和步骤提示上做适配。
3. 只做模型切换,不做结果校验
如果降级到轻量模型后结构化输出更容易坏,最好在响应后加一层 schema 校验。不然用户侧看到的是成功返回,业务侧拿到的是坏数据。
4. 熔断恢复太激进
刚恢复就全量放流量,容易反复抖。更稳一点的方式是半开状态只放小流量,连续成功后再恢复。
5. 所有租户共用一套预算
企业客户、内部运营、免费用户,预算策略很难完全一致。最好按租户等级配置不同成本上限和时延上限。
这里我承认一个局限:如果你的业务高度依赖复杂推理,低成本模型的可替代空间会比较小,这时路由系统对成本的改善幅度不会像表格里那么大。
十一、落地建议:从小范围开始,不要一口气全改
如果你准备把现有单模型系统升级成多模型路由,我建议按这个顺序推进:
先补日志。没有日志,后面的优化基本没法验。
接着给请求打上任务等级和预算字段,哪怕一开始只有粗分类,也比完全没有强。等这一步跑稳,再加静态主备切换;静态方案稳定后,再引入动态评分和更细的回放评估。
我的经验是,第一阶段只要做对两件事,收益就很明显:
- 高风险请求不要盲目降级
- 超时和解析失败要立刻切备选
很多时候,工程收益来自这些很朴素的规则,而不是多复杂的算法。简单,但管用。
十二、总结
多模型路由不是为了“把所有模型都接上”,而是为了让系统在质量、时延、成本三者之间可控运行。一个能上线扛流量的方案,至少要包含下面几个点:
- 请求分级,明确风险与预算
- 路由可解释,先过滤再打分
- 降级有边界,高风险任务谨慎退让
- 熔断可恢复,失败原因要留痕
- 线上可回放,策略迭代能对照
如果你现在的服务已经遇到“主模型贵、慢、偶发不稳”这类问题,这套方案很适合先做一版简化实现。先把主备切换和日志回放跑起来,再慢慢补动态路由。别急。系统稳下来后,优化空间会比你预期的大。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)