云原生 AI 推理服务:从 GPU 调度到模型热加载的弹性部署

cover

一、GPU 资源的浪费困境:推理服务的成本黑洞

AI 推理服务在云原生环境下面临一个核心矛盾:GPU 资源昂贵但利用率极低。生产环境中,推理服务的请求量通常存在明显的峰谷波动——白天高峰期 GPU 利用率可能达到 80%,而夜间低谷期利用率可能不足 10%。但 GPU 不能像 CPU 那样通过超卖来提升利用率,因为 GPU 显存是独占式分配的,一个模型加载后即使没有请求也在占用显存。

更深层的问题是,K8s 原生的调度器对 GPU 的支持非常有限:它只能感知 GPU 设备的有无,无法感知显存使用率、算力利用率和模型加载状态。这导致调度决策粗糙——一个已经加载了 7B 模型、显存占用 14GB 的 GPU 节点,K8s 仍然认为它"有 GPU 可用",可能将另一个需要 8GB 显存的模型调度到该节点,导致 OOM。

二、GPU 感知调度的架构设计

解决 GPU 资源浪费的核心思路是引入 GPU 感知调度器,替代 K8s 默认调度器的 GPU 分配逻辑。

flowchart TB
    subgraph 调度层
        A[推理服务 Pod] --> B[K8s 调度器]
        B --> C{GPU 感知扩展}
        C -->|默认调度| D[仅感知 GPU 数量]
        C -->|扩展调度| E[感知显存/算力/模型状态]
    end

    subgraph 资源管理层
        E --> F[GPU Device Plugin]
        F --> G[显存使用率采集]
        F --> H[算力利用率采集]
        G --> I[调度决策引擎]
        H --> I
    end

    subgraph 模型服务层
        I --> J[模型热加载器]
        J --> K[显存池化管理]
        K --> L[模型 A: 7B]
        K --> M[模型 B: 13B]
        K --> N[模型 C: 70B]
    end

    subgraph 弹性伸缩层
        I --> O[HPA: 基于队列深度]
        O --> P[缩容: 卸载低优先级模型]
        O --> Q[扩容: 加载新模型到空闲 GPU]
    end

GPU Device Plugin 是 K8s 与 GPU 硬件之间的桥梁。NVIDIA 官方提供的 device plugin 只报告 GPU 数量,我们需要扩展它以报告显存使用率和算力利用率。实现方式是在 device plugin 的 Allocate 回调中注入显存限制参数,并通过 DeviceHealth 字段报告实时状态。

模型热加载器解决模型冷启动问题。传统方式下,Pod 启动后才加载模型,冷启动时间可能长达数分钟。热加载器在 GPU 节点上维护一个模型缓存池,当新模型需要部署时,直接从缓存池加载到显存,将冷启动时间从分钟级压缩到秒级。

弹性伸缩基于请求队列深度而非 CPU 利用率。推理服务的瓶颈在 GPU 而非 CPU,CPU 利用率无法反映真实的负载情况。队列深度(等待处理的请求数)是更直接的指标。

三、GPU 感知调度与模型热加载的实现

import time
import threading
from dataclasses import dataclass, field
from typing import Optional
from collections import defaultdict


@dataclass
class GPUDevice:
    """GPU 设备状态"""
    node_name: str
    gpu_index: int
    total_memory_mb: int
    used_memory_mb: int = 0
    compute_utilization: float = 0.0  # 0.0 - 1.0
    loaded_models: list[str] = field(default_factory=list)


@dataclass
class ModelProfile:
    """模型资源画像"""
    model_name: str
    memory_required_mb: int
    compute_required: float  # 0.0 - 1.0,估算的算力占比
    priority: int = 0  # 优先级,数值越高越重要
    loading_time_seconds: float = 30.0  # 冷加载时间


class GPUScheduler:
    """GPU 感知调度器"""

    def __init__(self):
        self.devices: dict[str, list[GPUDevice]] = defaultdict(list)
        self.model_profiles: dict[str, ModelProfile] = {}
        self._lock = threading.Lock()

    def register_device(self, device: GPUDevice):
        """注册 GPU 设备"""
        with self._lock:
            self.devices[device.node_name].append(device)

    def register_model(self, profile: ModelProfile):
        """注册模型资源画像"""
        self.model_profiles[profile.model_name] = profile

    def update_device_status(
        self,
        node_name: str,
        gpu_index: int,
        used_memory_mb: int,
        compute_utilization: float,
        loaded_models: list[str],
    ):
        """更新 GPU 设备状态(由 Device Plugin 定期上报)"""
        with self._lock:
            for dev in self.devices.get(node_name, []):
                if dev.gpu_index == gpu_index:
                    dev.used_memory_mb = used_memory_mb
                    dev.compute_utilization = compute_utilization
                    dev.loaded_models = list(loaded_models)
                    break

    def schedule(self, model_name: str) -> Optional[tuple[str, int]]:
        """为模型选择最优 GPU 节点,返回 (node_name, gpu_index)"""
        profile = self.model_profiles.get(model_name)
        if not profile:
            raise ValueError(f"模型 {model_name} 未注册")

        with self._lock:
            best_device = None
            best_score = -1

            for node_name, device_list in self.devices.items():
                for dev in device_list:
                    available_memory = dev.total_memory_mb - dev.used_memory_mb
                    # 硬性约束:显存必须足够
                    if available_memory < profile.memory_required_mb:
                        continue
                    # 软性约束:算力利用率不超过 80%
                    if dev.compute_utilization + profile.compute_required > 0.8:
                        continue

                    # 评分:优先选择已加载同模型的设备(热加载)
                    score = available_memory / dev.total_memory_mb
                    if model_name in dev.loaded_models:
                        score += 10.0  # 热加载大幅加分

                    if score > best_score:
                        best_score = score
                        best_device = (node_name, dev.gpu_index)

            return best_device


class ModelHotLoader:
    """模型热加载器,管理 GPU 显存中的模型缓存"""

    def __init__(self, scheduler: GPUScheduler):
        self.scheduler = scheduler
        # 模型缓存:model_name → (node_name, gpu_index, load_time)
        self._cache: dict[str, tuple[str, int, float]] = {}
        self._lock = threading.Lock()

    def load_model(self, model_name: str) -> tuple[str, int, float]:
        """加载模型到 GPU,返回 (节点, GPU索引, 加载耗时)"""
        # 检查缓存中是否已有该模型
        with self._lock:
            if model_name in self._cache:
                node, gpu, load_time = self._cache[model_name]
                return node, gpu, 0.0  # 热加载,耗时接近 0

        # 调度到最优 GPU
        target = self.scheduler.schedule(model_name)
        if target is None:
            raise RuntimeError(f"无可用的 GPU 节点加载模型 {model_name}")

        node_name, gpu_index = target
        profile = self.scheduler.model_profiles[model_name]

        # 模拟模型加载过程
        start = time.time()
        # 实际实现中,这里调用模型服务 API 触发加载
        time.sleep(0.1)  # 热加载场景下,从缓存池加载只需毫秒级
        elapsed = time.time() - start

        # 更新缓存和设备状态
        with self._lock:
            self._cache[model_name] = (node_name, gpu_index, elapsed)
            self.scheduler.update_device_status(
                node_name, gpu_index,
                used_memory_mb=self._get_device(node_name, gpu_index).used_memory_mb + profile.memory_required_mb,
                compute_utilization=self._get_device(node_name, gpu_index).compute_utilization,
                loaded_models=self._get_device(node_name, gpu_index).loaded_models + [model_name],
            )

        return node_name, gpu_index, elapsed

    def unload_model(self, model_name: str):
        """卸载模型,释放显存"""
        with self._lock:
            if model_name not in self._cache:
                return
            node_name, gpu_index, _ = self._cache.pop(model_name)

        profile = self.scheduler.model_profiles[model_name]
        dev = self._get_device(node_name, gpu_index)
        self.scheduler.update_device_status(
            node_name, gpu_index,
            used_memory_mb=max(0, dev.used_memory_mb - profile.memory_required_mb),
            compute_utilization=dev.compute_utilization,
            loaded_models=[m for m in dev.loaded_models if m != model_name],
        )

    def _get_device(self, node_name: str, gpu_index: int) -> GPUDevice:
        for dev in self.devices.get(node_name, []):
            if dev.gpu_index == gpu_index:
                return dev
        raise ValueError(f"设备不存在: {node_name}/{gpu_index}")

    @property
    def devices(self):
        return self.scheduler.devices

关键设计决策:

  1. 显存硬约束 + 算力软约束:显存不足直接拒绝调度(硬约束),算力利用率超过 80% 只是降低评分(软约束),因为算力可以分时复用。

  2. 热加载优先:如果目标 GPU 已经加载了同模型,评分大幅加分。这避免了重复加载的显存浪费和冷启动延迟。

  3. 模型卸载释放显存:缩容时优先卸载低优先级模型,释放显存给高优先级模型使用。

四、GPU 感知调度的代价与适用边界

Device Plugin 的维护成本。自定义 Device Plugin 需要与 K8s 调度器深度集成,K8s 版本升级时可能需要适配。NVIDIA 官方的 GPU 时间分片(Time-Slicing)和 MPS(Multi-Process Service)方案可以作为替代,但粒度更粗。

模型热加载的显存碎片。频繁加载和卸载模型会导致显存碎片化,即使总剩余显存足够,也可能无法找到连续的显存块加载大模型。解决方案是在模型服务层实现显存池化管理,预分配固定大小的显存块。

弹性伸缩的冷启动问题。即使有热加载器,首次部署到新节点时仍需从磁盘加载模型,冷启动时间可能长达数分钟。对于延迟敏感型服务,建议保持最低 1 个副本常驻,避免缩容到零。

适用边界:GPU 感知调度适合"多模型共享 GPU、请求量有明显峰谷"的场景。对于"单模型独占 GPU、请求量稳定"的场景,K8s 默认调度器已经足够。

五、总结

云原生 AI 推理服务的核心挑战是 GPU 资源的低效利用。GPU 感知调度器通过感知显存使用率和算力利用率,替代 K8s 默认调度器的粗粒度 GPU 分配。模型热加载器通过显存池化和缓存机制,将冷启动时间从分钟级压缩到秒级。弹性伸缩基于请求队列深度而非 CPU 利用率,更准确地反映推理服务的真实负载。落地时需注意三个边界:第一,自定义 Device Plugin 的维护成本需要与 K8s 版本升级节奏对齐;第二,显存碎片化需要通过池化管理解决;第三,延迟敏感型服务应保持最低副本数,避免缩容到零的冷启动风险。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐