大模型 API 编排:多模型路由与降级策略的工程实践
大模型 API 编排:多模型路由与降级策略的工程实践

一、单一模型的单点风险:当 GPT-4 宕机怎么办
依赖单一 LLM 供应商的应用面临两个风险:服务宕机和价格波动。2024 年 OpenAI 多次出现 API 故障,每次持续数小时,依赖 GPT-4 的应用直接瘫痪。同时,不同模型的价格差异巨大——GPT-4 约每百万 Token $30,而 Claude 3 Haiku 仅 $0.25,差距 120 倍。
多模型路由通过将请求分发到多个 LLM,解决了这两个问题:主模型不可用时自动切换到备用模型(降级),简单请求使用低成本模型(成本优化)。但多模型编排引入了新的复杂度——不同模型的 API 格式、能力边界和输出风格各异,需要统一的抽象层和智能路由策略。
flowchart TB
Request[用户请求] --> Router[模型路由器]
Router -->|复杂推理| GPT4[GPT-4<br/>$30/M tokens]
Router -->|中等任务| Claude[Claude Sonnet<br/>$3/M tokens]
Router -->|简单任务| Haiku[Claude Haiku<br/>$0.25/M tokens]
GPT4 -->|超时/错误| Fallback1[降级→Claude Sonnet]
Claude -->|超时/错误| Fallback2[降级→Haiku]
GPT4 --> Response[统一响应格式]
Claude --> Response
Haiku --> Response
二、多模型路由的核心机制
2.1 基于任务复杂度的路由
不同任务对模型能力的要求不同。简单分类(如情感分析)用轻量模型即可,复杂推理(如数学证明)需要旗舰模型。路由策略基于任务复杂度评分:评分低于阈值用轻量模型,高于阈值用旗舰模型。复杂度可以从输入长度、关键词和任务类型推断。
2.2 降级与重试策略
当主模型返回错误或超时时,自动切换到备用模型。降级策略需要考虑两个因素:降级模型的能力是否足够(不能从 GPT-4 降级到无法完成任务的模型),以及降级是否对用户透明(简单任务降级用户无感知,复杂任务降级可能影响输出质量)。
sequenceDiagram
participant Client as 客户端
participant Router as 模型路由器
participant GPT4 as GPT-4
participant Claude as Claude Sonnet
participant Haiku as Claude Haiku
Client->>Router: 复杂推理请求
Router->>GPT4: 转发请求
GPT4--xRouter: 超时(10秒无响应)
Router->>Claude: 降级转发
Claude-->>Router: 返回结果
Router->>Client: 返回结果(标记:降级自GPT-4)
Note over Router: 记录GPT4故障<br/>后续请求直接路由到Claude
三、生产级代码实现
3.1 统一模型接口与路由器
import asyncio
import time
import logging
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from enum import Enum
logger = logging.getLogger(__name__)
class ModelTier(Enum):
FLAGSHIP = "flagship" # 旗舰模型:GPT-4, Claude Opus
STANDARD = "standard" # 标准模型:Claude Sonnet, GPT-4o-mini
ECONOMY = "economy" # 经济模型:Claude Haiku, GPT-3.5
@dataclass
class ModelConfig:
"""模型配置"""
name: str
tier: ModelTier
provider: str
max_tokens: int
cost_per_million_input: float
cost_per_million_output: float
timeout_seconds: float = 30.0
is_available: bool = True
@dataclass
class UnifiedResponse:
"""统一响应格式"""
content: str
model: str
tier: ModelTier
is_degraded: bool = False # 是否经过降级
latency_ms: float = 0.0
cost_usd: float = 0.0
usage: Dict[str, int] = field(default_factory=dict)
class ModelRouter:
"""多模型路由器
设计考量:
- 基于任务复杂度的智能路由
- 自动降级:主模型故障时切换备用模型
- 成本追踪:记录每次请求的实际花费
- 熔断机制:连续失败的模型暂时移出路由
"""
def __init__(self):
self._models: Dict[str, ModelConfig] = {}
self._failure_counts: Dict[str, int] = {}
self._circuit_open: Dict[str, float] = {} # model -> 熔断开启时间
self._failure_threshold = 3 # 连续失败3次触发熔断
self._circuit_reset_seconds = 300 # 熔断5分钟后重试
def register_model(self, config: ModelConfig) -> None:
"""注册模型"""
self._models[config.name] = config
async def route(
self,
messages: List[Dict],
complexity: str = "medium", # low/medium/high
preferred_model: Optional[str] = None,
) -> UnifiedResponse:
"""路由请求到合适的模型"""
# 1. 确定目标层级
target_tier = self._complexity_to_tier(complexity)
# 2. 选择模型
if preferred_model and preferred_model in self._models:
model_name = preferred_model
else:
model_name = self._select_model(target_tier)
# 3. 调用模型(含降级)
response = await self._call_with_fallback(model_name, messages)
return response
def _complexity_to_tier(self, complexity: str) -> ModelTier:
"""将任务复杂度映射到模型层级"""
mapping = {
"low": ModelTier.ECONOMY,
"medium": ModelTier.STANDARD,
"high": ModelTier.FLAGSHIP,
}
return mapping.get(complexity, ModelTier.STANDARD)
def _select_model(self, target_tier: ModelTier) -> str:
"""选择指定层级中可用的模型"""
candidates = [
name for name, config in self._models.items()
if config.tier == target_tier
and config.is_available
and not self._is_circuit_open(name)
]
if not candidates:
# 降级到更低的层级
tier_order = [ModelTier.FLAGSHIP, ModelTier.STANDARD, ModelTier.ECONOMY]
current_idx = tier_order.index(target_tier)
for lower_tier in tier_order[current_idx + 1:]:
candidates = [
name for name, config in self._models.items()
if config.tier == lower_tier and config.is_available
]
if candidates:
break
if not candidates:
raise RuntimeError("所有模型均不可用")
# 优先选择成本最低的
return min(candidates, key=lambda n: self._models[n].cost_per_million_input)
async def _call_with_fallback(
self,
model_name: str,
messages: List[Dict],
) -> UnifiedResponse:
"""调用模型,失败时自动降级"""
fallback_chain = self._build_fallback_chain(model_name)
for i, name in enumerate(fallback_chain):
config = self._models[name]
start_time = time.time()
try:
# 模拟模型调用(实际实现使用 httpx/aiohttp)
content = await self._call_model(name, config, messages)
latency_ms = (time.time() - start_time) * 1000
# 成功:重置失败计数
self._failure_counts[name] = 0
cost = self._estimate_cost(config, messages, content)
return UnifiedResponse(
content=content,
model=name,
tier=config.tier,
is_degraded=(i > 0),
latency_ms=latency_ms,
cost_usd=cost,
)
except Exception as e:
logger.warning(f"模型 {name} 调用失败: {e}")
self._record_failure(name)
if i == len(fallback_chain) - 1:
raise RuntimeError(f"所有模型均失败,最后尝试: {name}")
raise RuntimeError("不应到达此处")
def _build_fallback_chain(self, primary: str) -> List[str]:
"""构建降级链:主模型 → 同级备用 → 低级模型"""
chain = [primary]
primary_tier = self._models[primary].tier
# 同级备用
for name, config in self._models.items():
if config.tier == primary_tier and name != primary and config.is_available:
chain.append(name)
# 低级模型
tier_order = [ModelTier.FLAGSHIP, ModelTier.STANDARD, ModelTier.ECONOMY]
current_idx = tier_order.index(primary_tier)
for lower_tier in tier_order[current_idx + 1:]:
for name, config in self._models.items():
if config.tier == lower_tier and config.is_available:
chain.append(name)
return chain
async def _call_model(
self,
name: str,
config: ModelConfig,
messages: List[Dict],
) -> str:
"""调用具体模型(模拟)"""
await asyncio.sleep(0.1) # 模拟网络延迟
return f"[{name}] 模型生成的回复内容"
def _estimate_cost(
self,
config: ModelConfig,
messages: List[Dict],
response: str,
) -> float:
"""估算请求成本"""
input_tokens = sum(len(m.get("content", "")) for m in messages) // 4
output_tokens = len(response) // 4
cost = (
input_tokens * config.cost_per_million_input / 1_000_000
+ output_tokens * config.cost_per_million_output / 1_000_000
)
return cost
def _record_failure(self, model_name: str) -> None:
"""记录模型失败,触发熔断"""
self._failure_counts[model_name] = self._failure_counts.get(model_name, 0) + 1
if self._failure_counts[model_name] >= self._failure_threshold:
self._circuit_open[model_name] = time.time()
logger.error(f"模型 {model_name} 触发熔断,{self._circuit_reset_seconds}秒后重试")
def _is_circuit_open(self, model_name: str) -> bool:
"""检查模型是否处于熔断状态"""
if model_name not in self._circuit_open:
return False
elapsed = time.time() - self._circuit_open[model_name]
if elapsed > self._circuit_reset_seconds:
# 熔断超时,允许重试
del self._circuit_open[model_name]
self._failure_counts[model_name] = 0
return False
return True
四、边界分析与架构权衡
4.1 降级对输出质量的影响
从旗舰模型降级到经济模型,输出质量必然下降。对于简单任务(如分类、摘要),降级影响可忽略;对于复杂推理(如数学证明、代码生成),降级可能导致输出不可用。解决方案是在降级响应中标记 is_degraded=True,让业务层决定是否接受降级结果。
4.2 模型输出的格式差异
不同模型的输出格式不完全一致——GPT-4 的 JSON 输出可能比 Claude 更可靠,Claude 的 Markdown 格式化可能更好。统一响应层需要做格式归一化,但这增加了维护成本。对于格式敏感的场景(如结构化数据提取),应指定特定模型而非依赖路由。
4.3 成本追踪的准确性
Token 计数和成本估算是近似的,实际费用以供应商账单为准。对于成本敏感的应用,应在请求前后记录 Token 用量,定期与供应商账单对账。
五、总结
多模型路由通过智能分发和自动降级,解决了单一模型依赖的风险。基于任务复杂度的路由策略在成本和质量之间找到平衡,熔断机制防止故障蔓延。统一的响应格式让业务层无需感知底层模型差异。
落地路线建议:第一步,注册 2-3 个不同层级的模型,实现基本的路由和降级;第二步,添加熔断机制,防止故障模型持续拖慢响应;第三步,实现成本追踪,监控每日模型调用费用;第四步,根据历史数据优化路由策略,将更多请求路由到低成本模型。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)