FastAPI搭多模型路由:自动切换DeepSeek/Qwen/GPT
·
FastAPI搭多模型路由:自动切换DeepSeek/Qwen/GPT
文章信息
- 标题:FastAPI搭多模型路由:自动切换DeepSeek/Qwen/GPT
- 字数:4000字
- 预估阅读时间:18分钟
- 难度:⭐⭐⭐⭐☆
- 环境要求:Python 3.10+(代码中使用了
int | None、list[str]等 3.10+ 语法)
一、为什么需要多模型路由
2026年,大模型已经不是"选择一个"的问题。企业级AI应用通常需要同时接入多个模型,原因很实际:
- 成本优化:DeepSeek-V3输入约$0.27/M tokens,GPT-4o输入约$2.50/M tokens,日常对话用DeepSeek,复杂推理才切GPT-4o
- 能力分层:简单问答 → Qwen-Turbo,快要失败 → DeepSeek-R1,复杂推理 → GPT-4o
- 容错兜底:某个模型API临时不可用时自动降级到备选,不影响服务
- 合规需求:境内数据走通义/Qwen,出境数据走DeepSeek或GPT
本文用FastAPI构建一套完整的多模型路由系统,支持自动路由、成本控制、降级熔断、负载均衡。
二、多模型价格与能力对比
2026年5月各模型官方定价(大模型价格变动频繁,以下为参考价格,实际价格请务必查阅官方最新定价:DeepSeek定价、通义千问定价、OpenAI定价):
| 模型 | 输入价格($/M) | 输出价格($/M) | 上下文 | 优势场景 | 延迟参考 |
|---|---|---|---|---|---|
| DeepSeek-V3 | ~0.27 | ~1.10 | 64K | 日常对话、代码生成 | ~500ms |
| DeepSeek-R1 | ~0.55 | ~2.19 | 64K | 复杂推理、思维链 | ~1200ms |
| Qwen-Turbo | ~0.50 | ~2.00 | 128K | 国内合规、快速响应 | ~300ms |
| Qwen-Max | ~2.00 | ~6.00 | 32K | 复杂推理、高质量生成 | ~800ms |
| GPT-4o | ~2.50 | ~10.00 | 128K | 复杂推理、多模态 | ~1000ms |
| GPT-4o-mini | ~0.15 | ~0.60 | 128K | 简单任务、低成本 | ~400ms |
路由策略建议:
- 成本优先:DeepSeek-V3(日常)→ GPT-4o-mini(降级)→ Qwen-Turbo(兜底)
- 速度优先:Qwen-Turbo(国内)→ DeepSeek-V3 → GPT-4o-mini
- 质量优先:GPT-4o(复杂推理)→ DeepSeek-R1 → Qwen-Max
三、项目结构
fastapi-multi-model-router/
├── app/
│ ├── __init__.py
│ ├── main.py
│ ├── config.py
│ ├── models.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── model_registry.py # 模型注册表
│ │ ├── router.py # 路由核心逻辑
│ │ ├── fallback.py # 降级熔断器
│ │ └── load_balancer.py # 负载均衡
│ ├── providers/
│ │ ├── __init__.py
│ │ ├── base.py # Provider基类
│ │ ├── deepseek.py # DeepSeek Provider
│ │ ├── qwen.py # 阿里通义 Provider
│ │ └── openai.py # OpenAI/GPT Provider
│ ├── routes/
│ │ ├── __init__.py
│ │ └── chat.py # 聊天路由
│ └── middleware/
│ ├── __init__.py
│ └── rate_limiter.py # 模型级限流
├── config/
│ └── models.yaml # 模型配置
├── tests/
│ └── test_router.py
├── .env
└── requirements.txt
requirements.txt:
fastapi>=0.115.0
uvicorn[standard]>=0.34.0
pydantic>=2.0
pydantic-settings>=2.0
pyyaml>=6.0
httpx>=0.27.0
四、模型配置(YAML)
用 YAML 文件集中管理所有模型的配置——价格、能力、端点等。新增模型只需加一段配置,不用改代码。
# config/models.yaml
models:
# DeepSeek系列
deepseek-chat:
provider: deepseek
model_name: deepseek-chat
display_name: "DeepSeek V3"
api_base: "https://api.deepseek.com"
api_key_env: DEEPSEEK_API_KEY
input_price: 0.27 # $/M tokens
output_price: 1.10
max_tokens: 4096
context_window: 64000
latency_p50: 500 # ms
capabilities:
- chat
- function_call
tier: cheap
region: auto
deepseek-reasoner:
provider: deepseek
model_name: deepseek-reasoner
display_name: "DeepSeek R1"
api_base: "https://api.deepseek.com"
api_key_env: DEEPSEEK_API_KEY
input_price: 0.55
output_price: 2.19
max_tokens: 8192
context_window: 64000
latency_p50: 1200
capabilities:
- chat
- reasoning
- chain_of_thought
supports_temperature: false # R1推理模型不支持temperature参数
tier: reasoning
region: auto
# 阿里通义系列
qwen-turbo:
provider: qwen
model_name: qwen-turbo
display_name: "通义千问 Turbo"
api_base: "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_key_env: DASHSCOPE_API_KEY
input_price: 0.50
output_price: 2.00
max_tokens: 8192
context_window: 128000
latency_p50: 300
capabilities:
- chat
- function_call
tier: fast
region: cn
qwen-max:
provider: qwen
model_name: qwen-max
display_name: "通义千问 Max"
api_base: "https://dashscope.aliyuncs.com/compatible-mode/v1"
api_key_env: DASHSCOPE_API_KEY
input_price: 2.00
output_price: 6.00
max_tokens: 8192
context_window: 32000
latency_p50: 800
capabilities:
- chat
- advanced_reasoning
tier: premium
region: cn
# OpenAI系列
gpt-4o-mini:
provider: openai
model_name: gpt-4o-mini
display_name: "GPT-4o Mini"
api_base: "https://api.openai.com/v1"
api_key_env: OPENAI_API_KEY
input_price: 0.15
output_price: 0.60
max_tokens: 16384
context_window: 128000
latency_p50: 400
capabilities:
- chat
- function_call
- vision
tier: cheap
region: us
gpt-4o:
provider: openai
model_name: gpt-4o
display_name: "GPT-4o"
api_base: "https://api.openai.com/v1"
api_key_env: OPENAI_API_KEY
input_price: 2.50
output_price: 10.00
max_tokens: 16384
context_window: 128000
latency_p50: 1000
capabilities:
- chat
- function_call
- vision
- advanced_reasoning
tier: premium
region: us
# 路由策略配置
routing:
default_strategy: cost_optimized
strategies:
cost_optimized:
order:
- deepseek-chat # 首选,便宜
- gpt-4o-mini # 降级1
- qwen-turbo # 降级2
- gpt-4o # 兜底
fallback_cooldown: 300 # 模型失败后5分钟不再路由
speed_priority:
order:
- qwen-turbo
- deepseek-chat
- gpt-4o-mini
quality_priority:
order:
- gpt-4o
- deepseek-reasoner
- qwen-max
# 限流配置(每分钟请求数)
rate_limits:
deepseek-chat: 60
deepseek-reasoner: 30
qwen-turbo: 60
gpt-4o-mini: 60
gpt-4o: 20
五、配置加载
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
from pathlib import Path
from dataclasses import dataclass, field
import yaml
class ModelConfig(BaseSettings):
model_name: str
display_name: str
provider: str
api_base: str
api_key_env: str
input_price: float
output_price: float
max_tokens: int
context_window: int
latency_p50: int
capabilities: list[str]
tier: str
region: str
supports_temperature: bool = True # 推理模型(如DeepSeek-R1)不支持
class Config:
extra = "ignore" # 忽略YAML中未声明的字段(如rate_limits中的条目)
@dataclass
class StrategyConfig:
order: list[str] = field(default_factory=list)
fallback_cooldown: int = 300 # 默认5分钟冷却
@dataclass
class RoutingConfig:
default_strategy: str = "cost_optimized"
strategies: dict[str, StrategyConfig] = field(default_factory=dict)
@classmethod
def from_yaml(cls, data: dict) -> "RoutingConfig":
"""从YAML的routing段解析,处理strategies嵌套结构"""
strategies = {}
raw_strategies = data.get("strategies", {})
for name, cfg in raw_strategies.items():
strategies[name] = StrategyConfig(
order=cfg.get("order", []),
fallback_cooldown=cfg.get("fallback_cooldown", 300),
)
return cls(
default_strategy=data.get("default_strategy", "cost_optimized"),
strategies=strategies,
)
class Settings(BaseSettings):
class Config:
env_file = ".env"
def load_model_configs() -> tuple[dict[str, ModelConfig], RoutingConfig]:
"""加载YAML模型配置"""
config_path = Path(__file__).parent.parent / "config" / "models.yaml"
with open(config_path, "r", encoding="utf-8") as f:
raw = yaml.safe_load(f)
models = {}
for name, cfg in raw["models"].items():
models[name] = ModelConfig(model_name=name, **cfg)
routing_cfg = RoutingConfig.from_yaml(raw["routing"])
return models, routing_cfg
@lru_cache
def get_model_configs() -> tuple[dict[str, ModelConfig], RoutingConfig]:
return load_model_configs()
六、Provider基类与实现
# app/providers/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Literal
@dataclass
class ModelResponse:
content: str
model: str
input_tokens: int
output_tokens: int
latency_ms: int
finish_reason: str = "stop"
error: str | None = None
@dataclass
class ChatMessage:
role: Literal["system", "user", "assistant"]
content: str
class BaseModelProvider(ABC):
"""模型Provider基类"""
def __init__(self, api_key: str, api_base: str, model_name: str):
self.api_key = api_key
self.api_base = api_base
self.model_name = model_name
@abstractmethod
async def chat(
self,
messages: list[ChatMessage],
temperature: float = 0.7,
max_tokens: int | None = None,
**kwargs,
) -> ModelResponse:
"""发送聊天请求"""
pass
@abstractmethod
async def health_check(self) -> bool:
"""健康检查"""
pass
# app/providers/deepseek.py
import httpx
import os
import time
from app.providers.base import BaseModelProvider, ModelResponse, ChatMessage
class DeepSeekProvider(BaseModelProvider):
"""DeepSeek模型Provider"""
def __init__(self, model_name: str = "deepseek-chat", supports_temperature: bool = True):
api_key = os.environ.get("DEEPSEEK_API_KEY", "")
super().__init__(
api_key=api_key,
api_base="https://api.deepseek.com",
model_name=model_name,
)
self._supports_temperature = supports_temperature
async def chat(
self,
messages: list[ChatMessage],
temperature: float = 0.7,
max_tokens: int | None = None,
**kwargs,
) -> ModelResponse:
start = time.perf_counter()
# 构建请求体,推理模型跳过temperature参数
payload = {
"model": self.model_name,
"messages": [{"role": m.role, "content": m.content} for m in messages],
"max_tokens": max_tokens or 2048,
**kwargs,
}
if self._supports_temperature:
payload["temperature"] = temperature
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
f"{self.api_base}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json=payload,
)
data = response.json()
latency_ms = int((time.perf_counter() - start) * 1000)
if response.status_code != 200:
return ModelResponse(
content="",
model=self.model_name,
input_tokens=0,
output_tokens=0,
latency_ms=latency_ms,
error=data.get("error", {}).get("message", "Unknown error"),
)
usage = data.get("usage", {})
return ModelResponse(
content=data["choices"][0]["message"]["content"],
model=self.model_name,
input_tokens=usage.get("prompt_tokens", 0),
output_tokens=usage.get("completion_tokens", 0),
latency_ms=latency_ms,
finish_reason=data["choices"][0].get("finish_reason", "stop"),
)
async def health_check(self) -> bool:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.post(
f"{self.api_base}/chat/completions",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"model": self.model_name,
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
return r.status_code == 200
except Exception:
return False
# app/providers/qwen.py
import httpx
import os
import time
from app.providers.base import BaseModelProvider, ModelResponse, ChatMessage
class QwenProvider(BaseModelProvider):
"""阿里通义千问Provider"""
def __init__(self, model_name: str = "qwen-turbo"):
api_key = os.environ.get("DASHSCOPE_API_KEY", "")
super().__init__(
api_key=api_key,
api_base="https://dashscope.aliyuncs.com/compatible-mode/v1",
model_name=model_name,
)
async def chat(
self,
messages: list[ChatMessage],
temperature: float = 0.7,
max_tokens: int | None = None,
**kwargs,
) -> ModelResponse:
start = time.perf_counter()
async with httpx.AsyncClient(timeout=60.0) as client:
response = await client.post(
f"{self.api_base}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json={
"model": self.model_name,
"messages": [{"role": m.role, "content": m.content} for m in messages],
"temperature": temperature,
"max_tokens": max_tokens or 2048,
},
)
data = response.json()
latency_ms = int((time.perf_counter() - start) * 1000)
if response.status_code != 200:
return ModelResponse(
content="",
model=self.model_name,
input_tokens=0,
output_tokens=0,
latency_ms=latency_ms,
error=str(data),
)
usage = data.get("usage", {})
return ModelResponse(
content=data["choices"][0]["message"]["content"],
model=self.model_name,
input_tokens=usage.get("prompt_tokens", 0),
output_tokens=usage.get("completion_tokens", 0),
latency_ms=latency_ms,
finish_reason=data["choices"][0].get("finish_reason", "stop"),
)
async def health_check(self) -> bool:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.post(
f"{self.api_base}/chat/completions",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"model": self.model_name,
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
return r.status_code == 200
except Exception:
return False
# app/providers/openai.py
import httpx
import os
import time
from app.providers.base import BaseModelProvider, ModelResponse, ChatMessage
class OpenAIProvider(BaseModelProvider):
"""OpenAI/GPT Provider"""
def __init__(self, model_name: str = "gpt-4o-mini"):
api_key = os.environ.get("OPENAI_API_KEY", "")
super().__init__(
api_key=api_key,
api_base="https://api.openai.com/v1",
model_name=model_name,
)
async def chat(
self,
messages: list[ChatMessage],
temperature: float = 0.7,
max_tokens: int | None = None,
**kwargs,
) -> ModelResponse:
start = time.perf_counter()
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(
f"{self.api_base}/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
},
json={
"model": self.model_name,
"messages": [{"role": m.role, "content": m.content} for m in messages],
"temperature": temperature,
"max_tokens": max_tokens or 2048,
},
)
data = response.json()
latency_ms = int((time.perf_counter() - start) * 1000)
if response.status_code != 200:
return ModelResponse(
content="",
model=self.model_name,
input_tokens=0,
output_tokens=0,
latency_ms=latency_ms,
error=data.get("error", {}).get("message", "Unknown error"),
)
usage = data.get("usage", {})
return ModelResponse(
content=data["choices"][0]["message"]["content"],
model=self.model_name,
input_tokens=usage.get("prompt_tokens", 0),
output_tokens=usage.get("completion_tokens", 0),
latency_ms=latency_ms,
finish_reason=data["choices"][0].get("finish_reason", "stop"),
)
async def health_check(self) -> bool:
try:
async with httpx.AsyncClient(timeout=5.0) as client:
r = await client.post(
f"{self.api_base}/chat/completions",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"model": self.model_name,
"messages": [{"role": "user", "content": "hi"}],
"max_tokens": 5,
},
)
return r.status_code == 200
except Exception:
return False
七、模型注册表与路由核心
模型注册表(ModelRegistry)管理所有 Provider 实例,路由核心(MultiModelRouter)根据策略选择模型并处理降级。
# app/core/model_registry.py
from app.providers.base import BaseModelProvider, ModelResponse, ChatMessage
from app.providers.deepseek import DeepSeekProvider
from app.providers.qwen import QwenProvider
from app.providers.openai import OpenAIProvider
from app.config import get_model_configs
from functools import lru_cache
from typing import Literal
class ModelRegistry:
"""全局模型注册表"""
_providers: dict[str, BaseModelProvider] = {}
_configs: dict = {}
@classmethod
def init(cls):
"""初始化所有Provider"""
cls._configs, routing_cfg = get_model_configs()
# 从配置中读取supports_temperature(默认True)
deepseek_v3_cfg = cls._configs.get("deepseek-chat")
deepseek_r1_cfg = cls._configs.get("deepseek-reasoner")
cls._providers["deepseek-chat"] = DeepSeekProvider(
"deepseek-chat",
supports_temperature=deepseek_v3_cfg.supports_temperature if deepseek_v3_cfg else True,
)
cls._providers["deepseek-reasoner"] = DeepSeekProvider(
"deepseek-reasoner",
supports_temperature=deepseek_r1_cfg.supports_temperature if deepseek_r1_cfg else False,
)
cls._providers["qwen-turbo"] = QwenProvider("qwen-turbo")
cls._providers["qwen-max"] = QwenProvider("qwen-max")
cls._providers["gpt-4o-mini"] = OpenAIProvider("gpt-4o-mini")
cls._providers["gpt-4o"] = OpenAIProvider("gpt-4o")
@classmethod
def get_provider(cls, model_name: str) -> BaseModelProvider | None:
return cls._providers.get(model_name)
@classmethod
def list_models(cls) -> list[dict]:
return [
{
"name": name,
"display_name": cfg.display_name,
"tier": cfg.tier,
"region": cfg.region,
"latency_p50": cfg.latency_p50,
}
for name, cfg in cls._configs.items()
]
# 全局初始化
ModelRegistry.init()
八、路由核心逻辑
# app/core/router.py
from app.core.model_registry import ModelRegistry
from app.providers.base import ModelResponse, ChatMessage
from app.config import get_model_configs
from dataclasses import dataclass, field
import asyncio
import time
from typing import Literal
@dataclass
class RoutingDecision:
selected_model: str
strategy: str
fallback_chain: list[str]
cost_estimate: float | None = None
class FallbackState:
"""熔断状态记录"""
def __init__(self, cooldown: int = 300):
self._failed_models: dict[str, float] = {}
self._cooldown = cooldown
def mark_failed(self, model: str) -> None:
self._failed_models[model] = time.time()
def is_cooling(self, model: str) -> bool:
if model not in self._failed_models:
return False
elapsed = time.time() - self._failed_models[model]
return elapsed < self._cooldown
def clear_model(self, model: str) -> None:
self._failed_models.pop(model, None)
class MultiModelRouter:
"""多模型路由核心"""
def __init__(self, strategy: str = "cost_optimized"):
self._configs, routing_cfg = get_model_configs()
strategy_cfg = routing_cfg.strategies.get(strategy)
self._fallback_state = FallbackState(
cooldown=strategy_cfg.fallback_cooldown if strategy_cfg else 300
)
self._strategy = strategy
self._strategy_order = strategy_cfg.order if strategy_cfg else []
async def route_and_chat(
self,
messages: list[ChatMessage],
temperature: float = 0.7,
max_tokens: int | None = None,
force_model: str | None = None,
) -> tuple[ModelResponse, RoutingDecision]:
"""
路由选择并执行chat,自动降级
返回:(响应, 路由决策)
"""
if force_model:
chain = [force_model]
else:
# 过滤掉正在冷却的模型
chain = [
m for m in self._strategy_order
if not self._fallback_state.is_cooling(m)
]
if not chain:
# 所有模型都在冷却,强制使用第一个
chain = [self._strategy_order[0]]
last_error = None
for model_name in chain:
provider = ModelRegistry.get_provider(model_name)
if not provider:
continue
try:
response = await provider.chat(
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
)
if response.error:
self._fallback_state.mark_failed(model_name)
last_error = response.error
continue
decision = RoutingDecision(
selected_model=model_name,
strategy=self._strategy,
fallback_chain=chain,
cost_estimate=self._estimate_cost(response, model_name),
)
return response, decision
except Exception as e:
# 请求异常(超时、网络错误等)自动降级到下一个模型
self._fallback_state.mark_failed(model_name)
last_error = str(e)
continue
# 所有模型都失败了
return ModelResponse(
content="",
model="",
input_tokens=0,
output_tokens=0,
latency_ms=0,
error=f"All models failed. Last error: {last_error}",
), RoutingDecision(
selected_model="none",
strategy=self._strategy,
fallback_chain=chain,
)
def _estimate_cost(self, response: ModelResponse, model_name: str) -> float:
"""计算单次请求费用(美元),统一在此处计算"""
cfg = self._configs.get(model_name)
if not cfg:
return 0.0
input_cost = response.input_tokens / 1_000_000 * cfg.input_price
output_cost = response.output_tokens / 1_000_000 * cfg.output_price
return round(input_cost + output_cost, 6)
九、FastAPI路由
# app/models.py
from pydantic import BaseModel, Field
from typing import Literal, Optional
class MessageInput(BaseModel):
role: Literal["system", "user", "assistant"]
content: str
class ChatRequest(BaseModel):
messages: list[MessageInput] = Field(min_length=1)
model: Optional[str] = Field(default=None, description="指定模型,为空则自动路由")
strategy: Literal["cost_optimized", "speed_priority", "quality_priority"] = "cost_optimized"
temperature: float = Field(default=0.7, ge=0, le=2)
max_tokens: Optional[int] = Field(default=None, ge=100, le=16384)
class RoutingInfo(BaseModel):
selected_model: str
strategy: str
fallback_chain: list[str]
cost_estimate_usd: Optional[float]
class UsageInfo(BaseModel):
input_tokens: int
output_tokens: int
total_tokens: int
latency_ms: int
cost_usd: Optional[float]
class ChatResponse(BaseModel):
content: str
finish_reason: str
model: str
routing: RoutingInfo
usage: UsageInfo
error: Optional[str] = None
class ModelListResponse(BaseModel):
models: list[dict]
# app/routes/chat.py
from fastapi import APIRouter, HTTPException, Request
from app.models import ChatRequest, ChatResponse, RoutingInfo, UsageInfo, ModelListResponse
from app.core.router import MultiModelRouter
from app.core.model_registry import ModelRegistry
from app.providers.base import ChatMessage
router = APIRouter(prefix="/chat", tags=["多模型路由"])
@router.post("/", response_model=ChatResponse)
async def chat(request: ChatRequest, fastapi_request: Request):
"""多模型路由聊天,支持自动降级"""
try:
# 使用app级单例Router,保持熔断状态在请求间共享
router_instances = fastapi_request.app.state.router_instances
strategy = request.strategy
if strategy not in router_instances:
router_instances[strategy] = MultiModelRouter(strategy=strategy)
router_instance = router_instances[strategy]
# 转换为Provider格式
messages = [
ChatMessage(role=m.role, content=m.content)
for m in request.messages
]
response, decision = await router_instance.route_and_chat(
messages=messages,
temperature=request.temperature,
max_tokens=request.max_tokens,
force_model=request.model,
)
if response.error and not response.content:
raise HTTPException(status_code=502, detail=f"所有模型均不可用: {response.error}")
return ChatResponse(
content=response.content,
finish_reason=response.finish_reason,
model=response.model,
routing=RoutingInfo(
selected_model=decision.selected_model,
strategy=decision.strategy,
fallback_chain=decision.fallback_chain,
cost_estimate_usd=decision.cost_estimate,
),
usage=UsageInfo(
input_tokens=response.input_tokens,
output_tokens=response.output_tokens,
total_tokens=response.input_tokens + response.output_tokens,
latency_ms=response.latency_ms,
cost_usd=decision.cost_estimate, # 复用Router中已计算的费用
),
)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/models", response_model=ModelListResponse)
async def list_models():
"""列出所有可用模型"""
return ModelListResponse(models=ModelRegistry.list_models())
# app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routes import chat
from app.core.router import MultiModelRouter
app = FastAPI(title="多模型路由API", version="1.0.0")
# 在app级保存Router单例,保持熔断状态在请求间共享
app.state.router_instances = {}
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(chat.router)
@app.get("/health")
async def health():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)
十、负载均衡(轮询+加权)
# app/core/load_balancer.py
from collections import deque
from dataclasses import dataclass
import threading
import time
@dataclass
class LoadStats:
requests: int = 0
errors: int = 0
avg_latency_ms: float = 0.0
total_latency_ms: float = 0.0
class HealthCache:
"""健康检查缓存,避免每次请求都做health_check"""
def __init__(self, check_interval: int = 60):
self._check_interval = check_interval # 检查间隔(秒)
self._cache: dict[str, tuple[bool, float]] = {} # model -> (healthy, timestamp)
def is_healthy(self, model: str) -> bool | None:
"""返回缓存的健康状态,None表示无缓存"""
if model not in self._cache:
return None
healthy, ts = self._cache[model]
if time.time() - ts > self._check_interval:
return None # 缓存过期
return healthy
def update(self, model: str, healthy: bool) -> None:
self._cache[model] = (healthy, time.time())
class LoadBalancer:
"""带权重的轮询负载均衡"""
def __init__(self, models: list[str]):
self._models = models
self._index = 0
self._lock = threading.Lock()
self._stats: dict[str, LoadStats] = {
m: LoadStats() for m in models
}
def select(self) -> str:
"""选择一个模型(轮询)"""
with self._lock:
model = self._models[self._index]
self._index = (self._index + 1) % len(self._models)
return model
def record_request(self, model: str, latency_ms: int, error: bool = False) -> None:
"""记录请求结果"""
with self._lock:
stat = self._stats[model]
stat.requests += 1
stat.total_latency_ms += latency_ms
if error:
stat.errors += 1
stat.avg_latency_ms = stat.total_latency_ms / stat.requests
def get_stats(self) -> dict:
return {
model: {
"requests": stat.requests,
"errors": stat.errors,
"avg_latency_ms": round(stat.avg_latency_ms, 1),
"error_rate": round(stat.errors / stat.requests, 3) if stat.requests else 0,
}
for model, stat in self._stats.items()
}
十一、踩坑记录总结
| 坑 | 现象 | 解决方案 |
|---|---|---|
| API Key为空导致静默失败 | 模型返回空content但error字段也为空 | 在Provider中先检查API Key是否为空 |
| 降级循环 | 模型A失败→B,B失败→A来回切换 | 加fallback_cooldown冷却时间 |
| DeepSeek和Qwen的Message格式不同 | 格式不兼容时返回400 | 用统一的ChatMessage dataclass做转换 |
| 费用估算不准确 | 账单金额与估算差太多 | 用实际usage字段,不依赖max_tokens估算 |
| 国内访问OpenAI超时 | GPT请求超时 | 设置合理的timeout(60-120s),超时会触发except分支自动降级到下一模型 |
| 路由决策无记录 | 不知道哪个模型回答了问题 | 返回RoutingInfo,包含selected_model和fallback_chain |
| 限流未区分模型 | 共享限流导致某个模型频繁被拒 | 模型级限流(rate_limits配置),每个模型独立计数 |
| DeepSeek-R1发送temperature报错 | R1推理模型不支持temperature参数 | 在YAML中配置supports_temperature: false,Provider中按此字段决定是否传temperature |
十二、总结
本文构建了一套完整的FastAPI多模型路由系统:
- YAML配置驱动:模型列表、定价、能力、限流全在config/models.yaml中管理,改配置不改代码
- Provider模式:DeepSeek/Qwen/OpenAI各自封装,互不耦合,新加Claude/其他模型只加一个类
- 路由策略:成本优先/速度优先/质量优先三种策略可切换
- 自动降级:模型失败自动切下一级,带冷却时间防止震荡;超时/网络错误等异常同样触发降级
- 费用追踪:每次请求返回cost_usd,方便做成本监控
- 健康检查:通过HealthCache缓存健康状态,定期异步检查而非每次请求都检查
关键是配置即策略:改models.yaml里的order就能改变路由优先级,无需改代码。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)