多模型路由与降级容灾的工程实践:从任务分级、成本/时延预算到熔断切换与效果回放的可复现方案

在大模型应用上线后,很多问题不是“能不能答出来”,而是“能不能稳定地答出来”。同一条请求,白天和晚上效果不一样;主模型偶发超时,接口账单又持续抬高;一旦上游波动,整个服务跟着抖。说实话,我最开始也把多模型路由想简单了,以为加一层 if-else 就够,实测跑到线上后才发现,真正难的是预算约束、失败切换和事后回放。

这篇文章整理一套可复现方案,核心目标很直接:在质量、时延、成本之间做可控取舍,并且失败时能平稳降级。短句先说清:别把路由器写成黑盒。

我会从任务分级、路由决策、熔断降级、效果回放四部分展开,附一套可落地的数据结构和代码示例,方便你直接改进现有服务。


一、问题定义:为什么单模型方案容易失稳

很多团队初期会选一个主力模型,把所有请求都打过去。这种方式接入快,问题也很集中。

典型现象有几类:

  • 简单问答用了高价模型,成本偏高
  • 复杂任务落到轻量模型,结果波动明显
  • 上游接口超时或限流后,没有备用路径
  • 线上效果下降了,只能看日志,难以复盘当时为何切换

这里有个常见误区:把“路由”理解成“按业务线分模型”。实际上工程里更常见的是按请求难度、时延预算、成本预算、风险等级来分配模型。请求维度更细,路由结果才更稳。

一句话概括:模型不是固定绑定业务,而是绑定约束条件。


二、整体架构:路由层需要管什么

我通常把多模型服务拆成 5 个模块:

  1. 请求分析器:提取任务类型、上下文长度、是否要求结构化输出、是否允许工具调用
  2. 策略路由器:根据预算和规则选择模型
  3. 执行器:负责超时、重试、并发、取消
  4. 容灾控制器:负责熔断、降级、切换、恢复
  5. 回放评估器:把线上请求回流到离线评估集,做对照分析

对应的数据流可以简化成:

用户请求 -> 特征提取 -> 路由决策 -> 模型执行 -> 失败检测 -> 降级切换 -> 响应记录 -> 离线回放

看起来不复杂。真正麻烦的是细节。

比如同样是“摘要任务”,短文本和长文本的最佳模型可能不同;同样是“结构化抽取”,是否要求严格 JSON,能不能容忍重试,决策也不一样。所以路由不能只看 task_type,至少还要看几个关键特征。


三、任务分级:先把请求切成可管理的桶

路由稳定的前提,不是规则写得多,而是任务切分足够清楚。我一般先定义一份任务画像。

1. 请求特征字段

建议最少保留这些字段:

from dataclasses import dataclass
from typing import Optional

@dataclass
class RequestProfile:
    task_type: str                 # qa / summarize / extract / rewrite / agent
    input_tokens: int
    output_tokens_budget: int
    need_json: bool
    need_tools: bool
    risk_level: str                # low / medium / high
    latency_sla_ms: int
    cost_budget_micros: int        # 单请求成本预算,单位微美元
    tenant_tier: str               # free / pro / enterprise
    language: str
    biz_scene: Optional[str] = None

这里的 risk_level 很有用。比如合同审核、财务问答这类请求,就算内容不长,也不该轻易路由到低成本模型。

2. 任务分级规则

我常用一个简单但有效的分级方式:

  • L1 低风险任务:改写、标题生成、短摘要
  • L2 中风险任务:知识问答、普通抽取、客服回复
  • L3 高风险任务:结构化抽取、工具调用、合规类判断
  • L4 极高风险任务:关键业务审批建议、代码修改建议、面向外部客户的正式输出

分级后不要直接映射模型。先加预算约束。

3. 预算字段别省

很多路由失败,根因不是模型选错,而是系统没把预算当成一等公民。至少要同时维护两种预算:

  • 时延预算:例如 1200ms 内必须返回
  • 成本预算:例如 单请求不超过 0.8 分人民币对应的 token 成本

如果没有预算约束,路由器最终会变成“谁效果好就打谁”,账单很快会给你上课。


四、路由策略设计:规则优先,评分补充

工程上我不建议一开始就做纯学习式路由。原因很现实:标注样本不够,线上分布还在变,出了问题也难解释。更稳妥的做法是规则打底,评分补充

1. 模型注册表

先定义模型能力与运行约束:

from dataclasses import dataclass

@dataclass
class ModelSpec:
    model_name: str
    provider: str
    max_context_tokens: int
    supports_json: bool
    supports_tools: bool
    avg_latency_ms: int
    p95_latency_ms: int
    input_cost_per_1k: float
    output_cost_per_1k: float
    quality_score: float      # 离线评估得到的基线分
    availability_score: float # 近期成功率
    enabled: bool = True

示例配置:

MODEL_REGISTRY = [
    ModelSpec("gpt-heavy", "provider_a", 128000, True, True, 2200, 3800, 0.03, 0.06, 0.91, 0.995),
    ModelSpec("gpt-medium", "provider_a", 64000, True, False, 900, 1500, 0.01, 0.02, 0.84, 0.992),
    ModelSpec("qwen-lite", "provider_b", 32000, True, False, 500, 900, 0.003, 0.006, 0.78, 0.989),
    ModelSpec("local-mini", "self_hosted", 16000, False, False, 300, 700, 0.0008, 0.0012, 0.68, 0.970),
]

2. 先过滤,再排序

先做硬约束过滤:

def estimate_cost(spec: ModelSpec, input_tokens: int, output_tokens: int) -> float:
    return (input_tokens / 1000) * spec.input_cost_per_1k + (output_tokens / 1000) * spec.output_cost_per_1k


def filter_candidates(profile, registry):
    candidates = []
    for spec in registry:
        if not spec.enabled:
            continue
        if profile.input_tokens + profile.output_tokens_budget > spec.max_context_tokens:
            continue
        if profile.need_json and not spec.supports_json:
            continue
        if profile.need_tools and not spec.supports_tools:
            continue
        if spec.p95_latency_ms > profile.latency_sla_ms:
            continue
        est_cost = estimate_cost(spec, profile.input_tokens, profile.output_tokens_budget)
        if est_cost * 1_000_000 > profile.cost_budget_micros:
            continue
        candidates.append((spec, est_cost))
    return candidates

过滤掉明显不合适的模型后,再打分排序。这个顺序很关键。先过滤,能避免一个高质量但注定超预算的模型一直排第一。

3. 评分函数

评分不需要太花。够用就行。

def route_score(profile, spec: ModelSpec, est_cost: float):
    quality_weight = {
        "L1": 0.35,
        "L2": 0.45,
        "L3": 0.60,
        "L4": 0.72,
    }[profile.task_level]

    latency_penalty = spec.avg_latency_ms / max(profile.latency_sla_ms, 1)
    cost_penalty = (est_cost * 1_000_000) / max(profile.cost_budget_micros, 1)

    return (
        quality_weight * spec.quality_score
        + 0.20 * spec.availability_score
        - 0.12 * latency_penalty
        - 0.08 * cost_penalty
    )

我自己的经验是,线上第一版别把公式堆太长。参数多了,调参时间会吃掉你很多精力,而且很难说明某次切换究竟由哪项触发。

4. 路由主流程

def select_model(profile, registry):
    candidates = filter_candidates(profile, registry)
    if not candidates:
        return None

    ranked = sorted(
        candidates,
        key=lambda item: route_score(profile, item[0], item[1]),
        reverse=True
    )
    return ranked[0][0]

这一步拿到的是“主选模型”,不是最终执行计划。还需要提前算好备选路径。


五、降级与容灾:从单点失败变成可控退让

多模型路由真正有价值的地方,在异常场景。接口抖一下,系统不能跟着趴下。

1. 降级顺序要预定义

建议为每个任务等级配置固定的降级路径,例如:

routing_plan:
  L1:
    primary: qwen-lite
    fallbacks: [local-mini]
  L2:
    primary: gpt-medium
    fallbacks: [qwen-lite, local-mini]
  L3:
    primary: gpt-heavy
    fallbacks: [gpt-medium]
  L4:
    primary: gpt-heavy
    fallbacks: []

这里我故意让 L4 不自动降到本地小模型。原因很简单:高风险任务如果质量明显下滑,错误输出的代价往往比超时更高。宁可失败,也别胡答。

2. 熔断器实现

我常用滑动窗口统计错误率和超时率,触发熔断后短时间不再分配流量。

import time
from collections import deque

class CircuitBreaker:
    def __init__(self, window_size=50, fail_threshold=0.4, recover_after_sec=30):
        self.window_size = window_size
        self.fail_threshold = fail_threshold
        self.recover_after_sec = recover_after_sec
        self.events = deque(maxlen=window_size)
        self.open_until = 0

    def allow(self):
        return time.time() >= self.open_until

    def record(self, success: bool):
        self.events.append(1 if success else 0)
        if len(self.events) < self.window_size:
            return
        fail_rate = 1 - sum(self.events) / len(self.events)
        if fail_rate >= self.fail_threshold:
            self.open_until = time.time() + self.recover_after_sec

很多人只统计 HTTP 5xx。其实还不够。建议把以下情况都视作失败事件:

  • 超时
  • 限流
  • 空响应
  • JSON 解析失败
  • 工具调用参数校验失败

这些错误对业务侧来说,结果都一样:请求不可用。

3. 执行与切换

class ModelExecutor:
    def __init__(self, clients, breakers):
        self.clients = clients
        self.breakers = breakers

    def call_with_fallback(self, model_sequence, request_payload):
        errors = []
        for model_name in model_sequence:
            breaker = self.breakers[model_name]
            if not breaker.allow():
                errors.append((model_name, "circuit_open"))
                continue

            try:
                result = self.clients[model_name].invoke(request_payload, timeout=8)
                if not result or not result.get("content"):
                    raise ValueError("empty_response")
                breaker.record(True)
                return {
                    "model_name": model_name,
                    "result": result,
                    "errors": errors,
                }
            except Exception as e:
                breaker.record(False)
                errors.append((model_name, str(e)))
        raise RuntimeError({"all_failed": errors})

这里有个工程细节:切换时要保留原始错误栈和路由原因。否则线上只会看到“用了 fallback”,但不知道是超时切的,还是 JSON 失败切的,排查会很痛苦。

4. 超时预算要逐跳递减

如果总预算 8 秒,主模型已经跑了 6.5 秒,再给备选模型完整 8 秒就不合理。应该做剩余预算传递。

短句提醒一下:预算不能重复花。

可以这样处理:

import time

def invoke_with_budget(executor, model_sequence, payload, total_timeout_sec=8):
    start = time.time()
    errors = []

    for model_name in model_sequence:
        remaining = total_timeout_sec - (time.time() - start)
        if remaining <= 0:
            break
        try:
            result = executor.clients[model_name].invoke(payload, timeout=min(remaining, 5))
            return result
        except Exception as e:
            errors.append((model_name, str(e), round(remaining, 2)))
    raise TimeoutError(errors)

六、结构化日志:不给回放留证据,后面基本靠猜

多模型系统上线后,最怕“结果变差了,但不知道是哪一层变差”。所以从第一天起就要把日志字段定好。

我建议每次请求至少记录:

{
  "request_id": "req_20260429_001",
  "task_type": "extract",
  "task_level": "L3",
  "input_tokens": 4200,
  "output_tokens_budget": 600,
  "need_json": true,
  "latency_sla_ms": 3000,
  "cost_budget_micros": 18000,
  "primary_model": "gpt-heavy",
  "candidate_models": ["gpt-heavy", "gpt-medium"],
  "selected_model": "gpt-heavy",
  "fallback_models": ["gpt-medium"],
  "final_model": "gpt-medium",
  "switch_reason": "timeout",
  "end_to_end_latency_ms": 2870,
  "provider_status": "success",
  "parse_status": "json_ok",
  "user_feedback": null,
  "prompt_version": "extract_v17",
  "trace_id": "trace_xxx"
}

这里的 switch_reasonprompt_versionfinal_model 三个字段,后面做回放和归因时会非常省事。


七、效果回放:把线上故障变成离线样本集

回放这件事,经常被放到最后做,结果出问题时只能临时捞日志。我建议反过来:路由系统设计时就把回放入口留好。

1. 回放样本怎么采

我一般会把以下请求回流到离线集:

  • 主模型失败后发生切换的请求
  • 用户点踩或人工复核判错的请求
  • 时延接近 SLA 上限的请求
  • 新旧路由策略决策不一致的请求

这些样本很有信息量。全量回放当然更完整,但算力消耗不小,很多团队扛不住。

2. 回放对照维度

回放时不要只比“答得像不像”。要同时看:

  • 是否命中预算
  • 是否发生切换
  • 输出是否可解析
  • 人工评分或规则评分是否下降

这里给一个回放脚本骨架:

from typing import List, Dict

class ReplayRunner:
    def __init__(self, router_old, router_new, executor, evaluator):
        self.router_old = router_old
        self.router_new = router_new
        self.executor = executor
        self.evaluator = evaluator

    def run(self, samples: List[Dict]):
        report = []
        for sample in samples:
            profile = sample["profile"]
            payload = sample["payload"]

            old_plan = self.router_old.plan(profile)
            new_plan = self.router_new.plan(profile)

            old_result = self.executor.call_with_fallback(old_plan, payload)
            new_result = self.executor.call_with_fallback(new_plan, payload)

            old_score = self.evaluator.score(sample, old_result)
            new_score = self.evaluator.score(sample, new_result)

            report.append({
                "request_id": sample["request_id"],
                "old_model": old_result["model_name"],
                "new_model": new_result["model_name"],
                "old_score": old_score,
                "new_score": new_score,
                "same_route": old_plan == new_plan,
            })
        return report

3. 我的一次实测结果

下面是一组内部任务集上的对照数据,样本数 3000,包含摘要、抽取、知识问答和工具调用请求。为了可复现,口径统一为同一批请求、同一套评估规则、连续 7 天日志回流。

方案 平均成本/千请求 P95时延 结构化输出成功率 人工抽检通过率 自动切换占比
单模型高配 92.4 元 4.8 s 96.1% 91.7% 0%
静态双模型 67.8 元 4.1 s 94.8% 89.9% 6.2%
动态路由+熔断降级 54.6 元 3.2 s 95.6% 91.1% 11.4%

这组结果里,我比较看重两点:

  • 成本下降约 40% 左右,时延也降了
  • 质量没有明显塌陷,结构化成功率只比单模型低 0.5 个点

没想到的是,真正拉开差距的不是“更聪明的评分函数”,而是失败切换做得更早,超时请求少了很多。这点在日志里很明显。


八、线上监控:别只盯成功率

路由系统监控如果只有成功率,信息量远远不够。我建议最少看下面这些指标:

1. 按模型看

  • 请求量占比
  • 成功率
  • 超时率
  • 平均时延与 P95 时延
  • 平均输入/输出 token
  • 单请求成本

2. 按任务等级看

  • L1/L2/L3/L4 各自的命中模型分布
  • 各等级的切换率
  • 各等级的用户负反馈率

3. 按切换原因看

  • timeout
  • rate_limit
  • parse_failed
  • empty_response
  • circuit_open

这里不要在一个看板上堆太多曲线,值班时根本看不过来。我的做法是主看板只放异常趋势,细分归因放二级页面。


九、一个更完整的路由器实现示例

把前面的逻辑串起来,可以得到一个简化版 Router:

class RoutingEngine:
    def __init__(self, registry, breakers, task_plans):
        self.registry = {m.model_name: m for m in registry}
        self.breakers = breakers
        self.task_plans = task_plans

    def plan(self, profile):
        base_plan = self.task_plans.get(profile.task_level, {})
        primary = base_plan.get("primary")
        fallbacks = base_plan.get("fallbacks", [])

        ordered = [primary] + fallbacks
        valid = []
        for name in ordered:
            spec = self.registry.get(name)
            if not spec or not spec.enabled:
                continue
            if self.breakers[name].allow() is False:
                continue
            if profile.input_tokens + profile.output_tokens_budget > spec.max_context_tokens:
                continue
            if profile.need_json and not spec.supports_json:
                continue
            if profile.need_tools and not spec.supports_tools:
                continue
            valid.append(name)

        if valid:
            return valid

        # 预设计划不可用时,再走动态候选
        candidates = filter_candidates(profile, list(self.registry.values()))
        ranked = sorted(
            candidates,
            key=lambda item: route_score(profile, item[0], item[1]),
            reverse=True,
        )
        return [item[0].model_name for item in ranked]

这个版本故意保留了“预设优先,动态补位”的设计。原因是线上事故里,稳定性通常比局部最优更重要。你完全可以把动态排序放前面,但我个人经验是,预设计划更容易解释,也更方便值班同学快速判断。


十、容易踩坑的地方

1. 把离线评估分数直接当线上真值

离线分数有参考价值,但它不能完整覆盖线上噪声。比如真实用户输入更短、更乱、错别字更多,模型排序可能变掉。

2. 降级后 Prompt 不变

高配模型能理解的隐含约束,轻量模型未必能吃住。主模型和备选模型最好有各自 Prompt 版本,至少在格式约束和步骤提示上做适配。

3. 只做模型切换,不做结果校验

如果降级到轻量模型后结构化输出更容易坏,最好在响应后加一层 schema 校验。不然用户侧看到的是成功返回,业务侧拿到的是坏数据。

4. 熔断恢复太激进

刚恢复就全量放流量,容易反复抖。更稳一点的方式是半开状态只放小流量,连续成功后再恢复。

5. 所有租户共用一套预算

企业客户、内部运营、免费用户,预算策略很难完全一致。最好按租户等级配置不同成本上限和时延上限。

这里我承认一个局限:如果你的业务高度依赖复杂推理,低成本模型的可替代空间会比较小,这时路由系统对成本的改善幅度不会像表格里那么大。


十一、落地建议:从小范围开始,不要一口气全改

如果你准备把现有单模型系统升级成多模型路由,我建议按这个顺序推进:

先补日志。没有日志,后面的优化基本没法验。

接着给请求打上任务等级和预算字段,哪怕一开始只有粗分类,也比完全没有强。等这一步跑稳,再加静态主备切换;静态方案稳定后,再引入动态评分和更细的回放评估。

我的经验是,第一阶段只要做对两件事,收益就很明显:

  • 高风险请求不要盲目降级
  • 超时和解析失败要立刻切备选

很多时候,工程收益来自这些很朴素的规则,而不是多复杂的算法。简单,但管用。


十二、总结

多模型路由不是为了“把所有模型都接上”,而是为了让系统在质量、时延、成本三者之间可控运行。一个能上线扛流量的方案,至少要包含下面几个点:

  • 请求分级,明确风险与预算
  • 路由可解释,先过滤再打分
  • 降级有边界,高风险任务谨慎退让
  • 熔断可恢复,失败原因要留痕
  • 线上可回放,策略迭代能对照

如果你现在的服务已经遇到“主模型贵、慢、偶发不稳”这类问题,这套方案很适合先做一版简化实现。先把主备切换和日志回放跑起来,再慢慢补动态路由。别急。系统稳下来后,优化空间会比你预期的大。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐