LLM 多模型编排:从路由策略到智能工作流自动化

cover

一、单一模型的瓶颈与多模型协作的工程需求

在生产环境中依赖单一 LLM 模型存在三个核心问题:成本不可控、延迟不稳定、能力覆盖有限。以一个典型的企业级 AI 应用为例,简单问答使用 GPT-4o 级别模型是资源浪费,而复杂推理任务交给小模型又可能产生幻觉。此外,当某个模型提供商出现服务中断时,整个应用随之瘫痪。

多模型编排的核心思路是:根据任务的复杂度、成本预算和延迟要求,将请求路由到最合适的模型。这不仅是简单的负载均衡,而是基于任务语义的智能分发——让每个模型做它最擅长的事。

二、多模型编排的架构设计与路由机制

多模型编排系统的核心是路由层(Router),它负责接收用户请求、分析任务特征、选择最优模型、转发请求并聚合结果。

graph TB
    A[用户请求] --> B[路由层 Router]
    B --> C{任务分类器}
    C -->|简单问答| D[轻量模型: Qwen2.5-7B]
    C -->|复杂推理| E[旗舰模型: GPT-4o]
    C -->|代码生成| F[代码模型: DeepSeek-Coder]
    C -->|创意写作| G[创作模型: Claude]
    D --> H[结果聚合与后处理]
    E --> H
    F --> H
    G --> H
    H --> I[响应返回]
    B --> J[监控与指标采集]
    J --> K[路由策略动态调整]
    K --> B

路由策略的设计需要考虑以下维度:

任务复杂度:通过 Prompt 长度、是否包含多步推理、是否需要代码执行等特征判断。简单的实现可以使用规则匹配,进阶方案使用轻量分类模型。

成本预算:不同模型的 Token 单价差异可达 10 倍以上。路由层需要根据请求的预估 Token 数和当前预算余额,选择性价比最优的模型。

延迟要求:实时对话场景要求首 Token 延迟低于 500ms,而文档摘要可以容忍数秒延迟。路由层可以根据 SLA 等级选择不同延迟特征的模型。

降级策略:当首选模型不可用时,自动降级到备选模型。降级链的配置需要明确优先级和降级条件。

三、多模型编排框架的实现

以下是一个基于 Python 的多模型编排框架,支持语义路由、成本控制和降级策略:

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import logging
import time

logger = logging.getLogger(__name__)


class TaskComplexity(Enum):
    SIMPLE = "simple"
    MODERATE = "moderate"
    COMPLEX = "complex"


class ModelTier(Enum):
    LIGHTWEIGHT = "lightweight"
    STANDARD = "standard"
    FLAGSHIP = "flagship"


@dataclass
class ModelConfig:
    name: str
    tier: ModelTier
    cost_per_1k_input: float
    cost_per_1k_output: float
    avg_latency_ms: int
    max_context: int
    capabilities: list[str] = field(default_factory=list)


@dataclass
class RouteResult:
    model: ModelConfig
    reason: str
    estimated_cost: float
    fallback_chain: list[ModelConfig] = field(default_factory=list)


class ModelRouter:
    """多模型路由器,基于任务特征选择最优模型"""

    def __init__(self, models: list[ModelConfig], budget_limit: float = 100.0):
        self._models = {m.name: m for m in models}
        self._budget_limit = budget_limit
        self._spent = 0.0

    def classify_complexity(self, prompt: str, has_code: bool = False) -> TaskComplexity:
        """基于启发式规则判断任务复杂度"""
        token_estimate = len(prompt) / 2  # 粗略估算中英文混合 Token 数
        if has_code or token_estimate > 2000:
            return TaskComplexity.COMPLEX
        elif token_estimate > 500 or "分析" in prompt or "推理" in prompt:
            return TaskComplexity.MODERATE
        return TaskComplexity.SIMPLE

    def route(self, prompt: str, has_code: bool = False,
              max_latency_ms: int = 5000) -> RouteResult:
        """根据任务特征路由到最优模型"""
        complexity = self.classify_complexity(prompt, has_code)
        candidates = self._select_candidates(complexity, max_latency_ms)

        if not candidates:
            raise RuntimeError("无可用模型满足当前约束条件")

        # 按性价比排序:优先选择成本最低且满足延迟要求的模型
        selected = min(candidates, key=lambda m: m.cost_per_1k_output)
        estimated_cost = self._estimate_cost(prompt, selected)
        fallback = [m for m in candidates if m.name != selected.name]

        if self._spent + estimated_cost > self._budget_limit:
            # 预算不足时降级到最便宜的模型
            cheapest = min(candidates, key=lambda m: m.cost_per_1k_output)
            return RouteResult(
                model=cheapest,
                reason="预算限制降级",
                estimated_cost=self._estimate_cost(prompt, cheapest),
                fallback_chain=[],
            )

        return RouteResult(
            model=selected,
            reason=f"任务复杂度: {complexity.value}",
            estimated_cost=estimated_cost,
            fallback_chain=fallback,
        )

    def _select_candidates(self, complexity: TaskComplexity,
                           max_latency_ms: int) -> list[ModelConfig]:
        """筛选满足复杂度和延迟要求的候选模型"""
        tier_map = {
            TaskComplexity.SIMPLE: [ModelTier.LIGHTWEIGHT, ModelTier.STANDARD],
            TaskComplexity.MODERATE: [ModelTier.STANDARD, ModelTier.FLAGSHIP],
            TaskComplexity.COMPLEX: [ModelTier.FLAGSHIP],
        }
        allowed_tiers = tier_map[complexity]
        return [
            m for m in self._models.values()
            if m.tier in allowed_tiers and m.avg_latency_ms <= max_latency_ms
        ]

    def _estimate_cost(self, prompt: str, model: ModelConfig) -> float:
        """估算单次请求成本"""
        input_tokens = len(prompt) / 2
        output_tokens = input_tokens * 0.5  # 假设输出为输入的 50%
        return (input_tokens * model.cost_per_1k_input / 1000 +
                output_tokens * model.cost_per_1k_output / 1000)


class Orchestrator:
    """工作流编排器,串联路由、调用与后处理"""

    def __init__(self, router: ModelRouter):
        self._router = router
        self._callers: dict[str, Any] = {}  # 模型调用器

    async def execute(self, prompt: str, has_code: bool = False,
                      max_latency_ms: int = 5000) -> dict:
        """执行完整的编排流程"""
        route_result = self._router.route(prompt, has_code, max_latency_ms)
        logger.info(
            f"路由决策: {route_result.model.name}, "
            f"原因: {route_result.reason}, "
            f"预估成本: ${route_result.estimated_cost:.4f}"
        )

        # 尝试首选模型,失败则沿降级链重试
        models_to_try = [route_result.model] + route_result.fallback_chain
        last_error = None

        for model in models_to_try:
            try:
                start = time.monotonic()
                result = await self._call_model(model, prompt)
                latency = (time.monotonic() - start) * 1000
                return {
                    "response": result,
                    "model": model.name,
                    "latency_ms": latency,
                    "cost": route_result.estimated_cost,
                }
            except Exception as e:
                last_error = e
                logger.warning(f"模型 {model.name} 调用失败: {e}, 尝试降级")

        raise RuntimeError(f"所有模型均调用失败: {last_error}")

    async def _call_model(self, model: ModelConfig, prompt: str) -> str:
        """调用指定模型(实际实现对接各模型 API)"""
        caller = self._callers.get(model.name)
        if not caller:
            raise ValueError(f"未注册模型调用器: {model.name}")
        return await caller.generate(prompt)

四、多模型编排的工程权衡

路由精度与延迟的矛盾:使用分类模型做语义路由可以提高分发精度,但分类模型本身的推理延迟(通常 50~200ms)会叠加到总延迟上。对于实时对话场景,这个额外开销可能不可接受。折中方案是使用基于规则的快速路由处理 80% 的常见请求,仅对规则无法判断的请求调用分类模型。

状态一致性:当工作流涉及多个模型依次处理时(如先用轻量模型提取摘要,再用旗舰模型做深度分析),中间状态的格式转换可能引入信息损失。建议在模型间传递结构化的 JSON 而非纯文本,以保留元数据。

成本追踪的精度:Token 计数在请求前只能估算,实际消耗取决于模型分词器的实现。不同模型的分词结果差异可达 20%。生产环境中需要在响应后根据实际的 usage 字段做精确记账,而非依赖预估值。

可观测性:多模型编排引入了更多的故障点。每个模型的调用延迟、错误率、Token 消耗都需要独立监控。推荐使用 OpenTelemetry 的 Span 机制,为每次路由决策和模型调用创建独立的 Span,便于链路追踪和性能分析。

五、总结

多模型编排通过语义路由、成本控制和降级策略,解决了单一模型在生产环境中的成本、延迟和可靠性问题。核心架构包括路由层(负责任务分类与模型选择)、调用层(负责请求转发与重试)和监控层(负责指标采集与策略调整)。在落地时,路由策略应从简单的规则匹配起步,逐步引入分类模型提升精度;成本控制需要结合预估与实际记账;降级链的配置要确保优先级明确且降级条件可观测。最终目标是让每个请求都路由到"够用且最便宜"的模型,而非一味追求最强模型。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐