前言:你是在用 AI,还是在伺候 AI?

相信不少独立开发者或技术从业者都有过类似的经历:

早上打开电脑,先登录 OpenAI 查 API 额度,切换浏览器去 Anthropic 调 Claude 写代码,再开一个标签页测 Gemini 的长文本处理能力。三个平台,三套 API Key,三份账单,三种 SDK 写法。

你以为自己在提效,实际上大量时间消耗在了工具切换上下文管理上。

这篇文章要解决的,就是这个问题:如何用一套统一的代码架构,根据任务类型自动调度不同的 AI 模型,实现真正意义上的多模型协作工作流。


一、问题拆解:多模型调度的核心挑战

在构建多模型调度系统之前,先明确我们要解决的三个核心工程问题:

1.1 接口异构性

不同厂商的 API 设计规范不一致。OpenAI 用 chat.completions.create,Anthropic 有自己的 messages.create,Google 又是另一套。每次切换模型,都意味着重写调用逻辑。

1.2 上下文割裂

在不同平台的聊天窗口间复制粘贴,不仅打断开发心流,还会丢失关键的上下文信息。对于需要多轮对话的任务,这个问题尤为突出。

1.3 成本不透明

多平台订阅叠加,Token 消耗难以统一统计,很难做到精细化的成本管控。


二、解决思路:统一入口 + 智能路由

解决上述问题的工程思路其实很清晰:引入一个兼容 OpenAI SDK 的统一 API 网关,在应用层做模型路由,而不是在基础设施层反复折腾。

这样做的好处:

  • 代码复用:全程使用 OpenAI SDK,只需修改 base_urlmodel 参数
  • 路由可控:在业务代码层面定义"什么任务用什么模型",逻辑清晰可维护
  • 成本聚合:Token 消耗统一记录,方便做成本分析

本文使用 88API 作为统一接入层,它兼容 OpenAI SDK,支持 Claude、GPT、Gemini、DeepSeek 等主流模型,适合用来演示多模型调度架构。


三、环境准备

# 安装依赖
pip install openai

无需安装多个 SDK,openai 这一个包就够了。


四、核心代码:构建多模型调度器

下面是一个完整的多模型调度器实现,包含模型映射表、任务分发函数、错误降级逻辑和成本日志记录。

import os
import time
from openai import OpenAI

# ============================================================
# 第一步:初始化统一 API 客户端
# 只需修改 base_url,其余调用方式与官方 OpenAI SDK 完全一致
# ============================================================
client = OpenAI(
    api_key=os.environ.get("API_KEY", "your-api-key-here"),
    base_url="<https://api.88api.shop/v1>"  # 统一入口,支持多模型路由
)

# ============================================================
# 第二步:定义模型映射表(AI 团队分工)
# 根据任务类型选择最适合的模型,兼顾效果与成本
# ============================================================
MODEL_ROUTER = {
    "coder":    "claude-sonnet-4-20250514",  # 代码任务:Claude 在编程基准测试中表现优异
    "writer":   "gpt-4o",                    # 创意写作:GPT 系列在文案生成上更灵活
    "analyst":  "gemini-2.5-pro",            # 长文本分析:Gemini 支持超长上下文窗口
    "intern":   "deepseek-chat",             # 简单任务:DeepSeek 性价比极高,适合批量处理
}

# ============================================================
# 第三步:任务调度核心函数
# 支持自动降级:主模型失败时,自动切换到备用模型
# ============================================================
def dispatch_task(task_type: str, prompt: str, fallback_model: str = "gpt-4o") -> dict:
    """
    多模型任务调度函数

    Args:
        task_type (str): 任务类型,对应 MODEL_ROUTER 中的 key
        prompt    (str): 用户输入的任务描述
        fallback_model (str): 主模型失败时的降级模型

    Returns:
        dict: 包含模型输出内容、使用的模型名称、耗时等信息
    """
    model_id = MODEL_ROUTER.get(task_type, fallback_model)
    print(f"[调度] 任务类型: {task_type} → 分配模型: {model_id}")

    start_time = time.time()

    try:
        response = client.chat.completions.create(
            model=model_id,
            messages=[
                {
                    "role": "system",
                    "content": "你是一个专业的技术助手,请给出准确、简洁的回答。"
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            temperature=0.7,
            max_tokens=1024
        )

        elapsed = round(time.time() - start_time, 2)
        content = response.choices[0].message.content

        # 记录 Token 消耗(用于成本分析)
        usage = response.usage
        print(f"[完成] 模型: {model_id} | 耗时: {elapsed}s | "
              f"Tokens: {usage.prompt_tokens} in / {usage.completion_tokens} out")

        return {
            "success": True,
            "model": model_id,
            "content": content,
            "elapsed": elapsed,
            "tokens_in": usage.prompt_tokens,
            "tokens_out": usage.completion_tokens,
        }

    except Exception as e:
        print(f"[错误] 模型 {model_id} 调用失败: {e}")
        print(f"[降级] 自动切换到备用模型: {fallback_model}")

        # 自动降级逻辑:主模型异常时切换到备用模型重试
        try:
            response = client.chat.completions.create(
                model=fallback_model,
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7
            )
            content = response.choices[0].message.content
            return {
                "success": True,
                "model": fallback_model,
                "content": content,
                "fallback_used": True,
            }
        except Exception as fallback_error:
            return {
                "success": False,
                "error": str(fallback_error),
                "content": None
            }

五、实战演示:三类典型任务的调度场景

有了调度器,下面演示三个实际开发场景中的使用方式:

# ============================================================
# 场景一:代码生成任务 → 派给 Claude
# 适用于:写函数、调试代码、生成单元测试等
# ============================================================
result_code = dispatch_task(
    task_type="coder",
    prompt="用 Python 写一个异步爬取 Hacker News 首页标题的脚本,使用 aiohttp 库"
)

if result_code["success"]:
    print("\\n=== 代码生成结果 ===")
    print(result_code["content"][:300], "...\\n")

# ============================================================
# 场景二:创意文案任务 → 派给 GPT
# 适用于:营销文案、社交媒体内容、产品描述等
# ============================================================
result_copy = dispatch_task(
    task_type="writer",
    prompt="为一款机械键盘写一段小红书风格的种草文案,200字以内,多用 emoji"
)

if result_copy["success"]:
    print("=== 文案生成结果 ===")
    print(result_copy["content"], "\\n")

# ============================================================
# 场景三:批量翻译任务 → 派给 DeepSeek(高性价比)
# 适用于:大批量简单任务,如翻译、数据清洗、格式转换等
# ============================================================
texts_to_translate = [
    "Hello, World!",
    "Open source is the future.",
    "Build in public."
]

print("=== 批量翻译结果 ===")
for text in texts_to_translate:
    result_trans = dispatch_task(
        task_type="intern",
        prompt=f"将以下英文翻译成中文,只输出译文:{text}"
    )
    if result_trans["success"]:
        print(f"  原文: {text}")
        print(f"  译文: {result_trans['content'].strip()}\\n")

六、进阶:成本统计与任务日志

在生产环境中,你还需要对每次 API 调用做成本追踪。下面是一个简单的成本聚合示例:

# 各模型的 Token 单价(美元/1K tokens,仅供示意,实际以平台为准)
TOKEN_PRICE = {
    "claude-sonnet-4-20250514": {"in": 0.003,  "out": 0.015},
    "gpt-4o":                   {"in": 0.005,  "out": 0.015},
    "gemini-2.5-pro":           {"in": 0.00125,"out": 0.005},
    "deepseek-chat":            {"in": 0.00014,"out": 0.00028},
}

def calculate_cost(model: str, tokens_in: int, tokens_out: int) -> float:
    """
    根据 Token 消耗计算本次调用费用(美元)

    Args:
        model     (str): 使用的模型名称
        tokens_in (int): 输入 Token 数量
        tokens_out(int): 输出 Token 数量

    Returns:
        float: 本次调用费用(美元)
    """
    price = TOKEN_PRICE.get(model, {"in": 0.005, "out": 0.015})
    cost = (tokens_in / 1000) * price["in"] + (tokens_out / 1000) * price["out"]
    return round(cost, 6)

# 在 dispatch_task 的返回结果中追加成本字段
def dispatch_with_cost(task_type: str, prompt: str) -> dict:
    result = dispatch_task(task_type, prompt)
    if result.get("success") and "tokens_in" in result:
        result["cost_usd"] = calculate_cost(
            result["model"],
            result["tokens_in"],
            result["tokens_out"]
        )
        print(f"[成本] 本次调用费用: ${result['cost_usd']}")
    return result

通过这套成本追踪机制,你可以清楚地看到:把简单任务路由给 DeepSeek,复杂任务才交给 Claude 或 GPT,整体 Token 成本可以显著降低。


七、架构总结

整个多模型调度系统的核心逻辑可以归纳为三层:

层级 职责 实现方式
接入层 统一 API 入口,屏蔽厂商差异 修改 base_url,复用 OpenAI SDK
路由层 根据任务类型选择最优模型 MODEL_ROUTER 映射表 + dispatch_task 函数
监控层 记录耗时、Token 消耗、成本 usage 字段解析 + calculate_cost 函数

这套架构的扩展性很强:你可以在路由层加入更复杂的规则,比如根据 prompt 长度自动选择支持长上下文的模型,或者根据当前各模型的响应延迟动态切换。


结语

多模型协作不是什么高深的概念,本质上就是把合适的任务交给合适的工具。工程上的关键在于:用统一的接口抹平差异,用清晰的路由逻辑管理分工,用完善的日志追踪成本。

代码层面,整个调度器的核心不超过 80 行,任何有 Python 基础的开发者都可以直接落地。

88API 驱动你的每一行自动化指令:https://api.88api.shop
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐