大模型 API 编排:多模型路由与降级策略的工程实践

cover

一、单一模型的单点风险:当 GPT-4 宕机怎么办

依赖单一 LLM 供应商的应用面临两个风险:服务宕机和价格波动。2024 年 OpenAI 多次出现 API 故障,每次持续数小时,依赖 GPT-4 的应用直接瘫痪。同时,不同模型的价格差异巨大——GPT-4 约每百万 Token $30,而 Claude 3 Haiku 仅 $0.25,差距 120 倍。

多模型路由通过将请求分发到多个 LLM,解决了这两个问题:主模型不可用时自动切换到备用模型(降级),简单请求使用低成本模型(成本优化)。但多模型编排引入了新的复杂度——不同模型的 API 格式、能力边界和输出风格各异,需要统一的抽象层和智能路由策略。

flowchart TB
    Request[用户请求] --> Router[模型路由器]
    Router -->|复杂推理| GPT4[GPT-4<br/>$30/M tokens]
    Router -->|中等任务| Claude[Claude Sonnet<br/>$3/M tokens]
    Router -->|简单任务| Haiku[Claude Haiku<br/>$0.25/M tokens]

    GPT4 -->|超时/错误| Fallback1[降级→Claude Sonnet]
    Claude -->|超时/错误| Fallback2[降级→Haiku]

    GPT4 --> Response[统一响应格式]
    Claude --> Response
    Haiku --> Response

二、多模型路由的核心机制

2.1 基于任务复杂度的路由

不同任务对模型能力的要求不同。简单分类(如情感分析)用轻量模型即可,复杂推理(如数学证明)需要旗舰模型。路由策略基于任务复杂度评分:评分低于阈值用轻量模型,高于阈值用旗舰模型。复杂度可以从输入长度、关键词和任务类型推断。

2.2 降级与重试策略

当主模型返回错误或超时时,自动切换到备用模型。降级策略需要考虑两个因素:降级模型的能力是否足够(不能从 GPT-4 降级到无法完成任务的模型),以及降级是否对用户透明(简单任务降级用户无感知,复杂任务降级可能影响输出质量)。

sequenceDiagram
    participant Client as 客户端
    participant Router as 模型路由器
    participant GPT4 as GPT-4
    participant Claude as Claude Sonnet
    participant Haiku as Claude Haiku

    Client->>Router: 复杂推理请求
    Router->>GPT4: 转发请求
    GPT4--xRouter: 超时(10秒无响应)

    Router->>Claude: 降级转发
    Claude-->>Router: 返回结果
    Router->>Client: 返回结果(标记:降级自GPT-4)

    Note over Router: 记录GPT4故障<br/>后续请求直接路由到Claude

三、生产级代码实现

3.1 统一模型接口与路由器

import asyncio
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum

logger = logging.getLogger(__name__)


class ModelTier(Enum):
    FLAGSHIP = "flagship"    # 旗舰模型:GPT-4, Claude Opus
    STANDARD = "standard"    # 标准模型:Claude Sonnet, GPT-4o-mini
    ECONOMY = "economy"      # 经济模型:Claude Haiku, GPT-3.5


@dataclass
class ModelConfig:
    """模型配置"""
    name: str
    tier: ModelTier
    provider: str
    max_tokens: int
    cost_per_million_input: float
    cost_per_million_output: float
    timeout_seconds: float = 30.0
    is_available: bool = True


@dataclass
class UnifiedResponse:
    """统一响应格式"""
    content: str
    model: str
    tier: ModelTier
    is_degraded: bool = False  # 是否经过降级
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    usage: Dict[str, int] = field(default_factory=dict)


class ModelRouter:
    """多模型路由器

    设计考量:
    - 基于任务复杂度的智能路由
    - 自动降级:主模型故障时切换备用模型
    - 成本追踪:记录每次请求的实际花费
    - 熔断机制:连续失败的模型暂时移出路由
    """

    def __init__(self):
        self._models: Dict[str, ModelConfig] = {}
        self._failure_counts: Dict[str, int] = {}
        self._circuit_open: Dict[str, float] = {}  # model -> 熔断开启时间
        self._failure_threshold = 3  # 连续失败3次触发熔断
        self._circuit_reset_seconds = 300  # 熔断5分钟后重试

    def register_model(self, config: ModelConfig) -> None:
        """注册模型"""
        self._models[config.name] = config

    async def route(
        self,
        messages: List[Dict],
        complexity: str = "medium",  # low/medium/high
        preferred_model: Optional[str] = None,
    ) -> UnifiedResponse:
        """路由请求到合适的模型"""
        # 1. 确定目标层级
        target_tier = self._complexity_to_tier(complexity)

        # 2. 选择模型
        if preferred_model and preferred_model in self._models:
            model_name = preferred_model
        else:
            model_name = self._select_model(target_tier)

        # 3. 调用模型(含降级)
        response = await self._call_with_fallback(model_name, messages)

        return response

    def _complexity_to_tier(self, complexity: str) -> ModelTier:
        """将任务复杂度映射到模型层级"""
        mapping = {
            "low": ModelTier.ECONOMY,
            "medium": ModelTier.STANDARD,
            "high": ModelTier.FLAGSHIP,
        }
        return mapping.get(complexity, ModelTier.STANDARD)

    def _select_model(self, target_tier: ModelTier) -> str:
        """选择指定层级中可用的模型"""
        candidates = [
            name for name, config in self._models.items()
            if config.tier == target_tier
            and config.is_available
            and not self._is_circuit_open(name)
        ]

        if not candidates:
            # 降级到更低的层级
            tier_order = [ModelTier.FLAGSHIP, ModelTier.STANDARD, ModelTier.ECONOMY]
            current_idx = tier_order.index(target_tier)
            for lower_tier in tier_order[current_idx + 1:]:
                candidates = [
                    name for name, config in self._models.items()
                    if config.tier == lower_tier and config.is_available
                ]
                if candidates:
                    break

        if not candidates:
            raise RuntimeError("所有模型均不可用")

        # 优先选择成本最低的
        return min(candidates, key=lambda n: self._models[n].cost_per_million_input)

    async def _call_with_fallback(
        self,
        model_name: str,
        messages: List[Dict],
    ) -> UnifiedResponse:
        """调用模型,失败时自动降级"""
        fallback_chain = self._build_fallback_chain(model_name)

        for i, name in enumerate(fallback_chain):
            config = self._models[name]
            start_time = time.time()

            try:
                # 模拟模型调用(实际实现使用 httpx/aiohttp)
                content = await self._call_model(name, config, messages)
                latency_ms = (time.time() - start_time) * 1000

                # 成功:重置失败计数
                self._failure_counts[name] = 0

                cost = self._estimate_cost(config, messages, content)

                return UnifiedResponse(
                    content=content,
                    model=name,
                    tier=config.tier,
                    is_degraded=(i > 0),
                    latency_ms=latency_ms,
                    cost_usd=cost,
                )

            except Exception as e:
                logger.warning(f"模型 {name} 调用失败: {e}")
                self._record_failure(name)

                if i == len(fallback_chain) - 1:
                    raise RuntimeError(f"所有模型均失败,最后尝试: {name}")

        raise RuntimeError("不应到达此处")

    def _build_fallback_chain(self, primary: str) -> List[str]:
        """构建降级链:主模型 → 同级备用 → 低级模型"""
        chain = [primary]
        primary_tier = self._models[primary].tier

        # 同级备用
        for name, config in self._models.items():
            if config.tier == primary_tier and name != primary and config.is_available:
                chain.append(name)

        # 低级模型
        tier_order = [ModelTier.FLAGSHIP, ModelTier.STANDARD, ModelTier.ECONOMY]
        current_idx = tier_order.index(primary_tier)
        for lower_tier in tier_order[current_idx + 1:]:
            for name, config in self._models.items():
                if config.tier == lower_tier and config.is_available:
                    chain.append(name)

        return chain

    async def _call_model(
        self,
        name: str,
        config: ModelConfig,
        messages: List[Dict],
    ) -> str:
        """调用具体模型(模拟)"""
        await asyncio.sleep(0.1)  # 模拟网络延迟
        return f"[{name}] 模型生成的回复内容"

    def _estimate_cost(
        self,
        config: ModelConfig,
        messages: List[Dict],
        response: str,
    ) -> float:
        """估算请求成本"""
        input_tokens = sum(len(m.get("content", "")) for m in messages) // 4
        output_tokens = len(response) // 4
        cost = (
            input_tokens * config.cost_per_million_input / 1_000_000
            + output_tokens * config.cost_per_million_output / 1_000_000
        )
        return cost

    def _record_failure(self, model_name: str) -> None:
        """记录模型失败,触发熔断"""
        self._failure_counts[model_name] = self._failure_counts.get(model_name, 0) + 1
        if self._failure_counts[model_name] >= self._failure_threshold:
            self._circuit_open[model_name] = time.time()
            logger.error(f"模型 {model_name} 触发熔断,{self._circuit_reset_seconds}秒后重试")

    def _is_circuit_open(self, model_name: str) -> bool:
        """检查模型是否处于熔断状态"""
        if model_name not in self._circuit_open:
            return False
        elapsed = time.time() - self._circuit_open[model_name]
        if elapsed > self._circuit_reset_seconds:
            # 熔断超时,允许重试
            del self._circuit_open[model_name]
            self._failure_counts[model_name] = 0
            return False
        return True

四、边界分析与架构权衡

4.1 降级对输出质量的影响

从旗舰模型降级到经济模型,输出质量必然下降。对于简单任务(如分类、摘要),降级影响可忽略;对于复杂推理(如数学证明、代码生成),降级可能导致输出不可用。解决方案是在降级响应中标记 is_degraded=True,让业务层决定是否接受降级结果。

4.2 模型输出的格式差异

不同模型的输出格式不完全一致——GPT-4 的 JSON 输出可能比 Claude 更可靠,Claude 的 Markdown 格式化可能更好。统一响应层需要做格式归一化,但这增加了维护成本。对于格式敏感的场景(如结构化数据提取),应指定特定模型而非依赖路由。

4.3 成本追踪的准确性

Token 计数和成本估算是近似的,实际费用以供应商账单为准。对于成本敏感的应用,应在请求前后记录 Token 用量,定期与供应商账单对账。

五、总结

多模型路由通过智能分发和自动降级,解决了单一模型依赖的风险。基于任务复杂度的路由策略在成本和质量之间找到平衡,熔断机制防止故障蔓延。统一的响应格式让业务层无需感知底层模型差异。

落地路线建议:第一步,注册 2-3 个不同层级的模型,实现基本的路由和降级;第二步,添加熔断机制,防止故障模型持续拖慢响应;第三步,实现成本追踪,监控每日模型调用费用;第四步,根据历史数据优化路由策略,将更多请求路由到低成本模型。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐