云原生 LLM 推理服务部署:从模型加载到请求调度的全链路优化

cover

一、LLM 推理的"延迟焦虑":首 Token 要等 5 秒,用户早走了

LLM 推理服务部署到生产环境后,最常见的性能问题是首 Token 延迟(TTFT)过高。模型加载需要 10-30 秒,首次推理需要 2-5 秒预热,用户等不了这么久。更棘手的是,LLM 推理是计算密集型+内存密集型——KV Cache 占用大量 GPU 内存,并发请求时内存不够用,被迫排队等待。

全链路优化的核心是"从模型加载到请求调度,每个环节都减少延迟"。模型预热减少首次推理延迟,KV Cache 管理减少内存占用,连续批处理(Continuous Batching)提高吞吐量,请求调度优先级保证高优请求先处理。

二、全链路优化架构

graph TB
    subgraph 模型加载优化
        A[模型分片加载<br/>Tensor Parallel] --> B[预热推理<br/>首次请求无冷启动]
        B --> C[模型缓存<br/>内存映射加载]
    end

    subgraph 推理优化
        C --> D[Continuous Batching<br/>动态组批]
        D --> E[KV Cache 管理<br/>PagedAttention]
        E --> F[流式输出<br/>Token-by-Token]
    end

    subgraph 调度优化
        F --> G[优先级队列<br/>高优请求先处理]
        G --> H[负载均衡<br/>最少连接路由]
        H --> I[弹性扩缩容<br/>GPU 感知 HPA]
    end

优化分三层:模型加载(分片+预热+缓存)、推理执行(连续批处理+KV Cache+流式输出)、请求调度(优先级+负载均衡+弹性扩缩)。每层优化独立,组合效果叠加。

三、实现

3.1 模型预热与缓存

import time
from typing import Optional

class ModelWarmup:
    """模型预热:消除首次推理延迟"""

    def __init__(self, model_loader):
        self.model_loader = model_loader
        self.model = None
        self.warmup_prompt = "Hello, this is a warmup request."

    def warmup(self, max_retries: int = 3) -> dict:
        """执行预热推理"""
        start = time.time()

        for attempt in range(max_retries):
            try:
                if self.model is None:
                    self.model = self.model_loader.load()

                # 执行一次短推理,触发所有懒初始化
                _ = self.model.generate(
                    self.warmup_prompt, max_tokens=10
                )

                elapsed = time.time() - start
                return {
                    'status': 'success',
                    'warmup_time': f'{elapsed:.2f}s',
                    'attempt': attempt + 1,
                }
            except Exception as e:
                if attempt == max_retries - 1:
                    return {
                        'status': 'failed',
                        'error': str(e),
                        'attempts': max_retries,
                    }
                time.sleep(1)

        return {'status': 'failed', 'error': 'max retries exceeded'}


class ModelCache:
    """模型缓存:使用内存映射加速加载"""

    def __init__(self, cache_dir: str = "/tmp/model_cache"):
        self.cache_dir = cache_dir
        self.loaded_models = {}

    def get_or_load(
        self, model_name: str, loader_fn
    ) -> object:
        """获取缓存的模型或加载新模型"""
        if model_name in self.loaded_models:
            return self.loaded_models[model_name]

        model = loader_fn(model_name)
        self.loaded_models[model_name] = model
        return model

    def evict(self, model_name: str) -> bool:
        """淘汰模型释放 GPU 内存"""
        if model_name in self.loaded_models:
            del self.loaded_models[model_name]
            # 触发 GPU 内存回收
            import torch
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            return True
        return False

3.2 连续批处理调度器

from dataclasses import dataclass
from typing import List, Optional
from collections import deque

@dataclass
class InferenceRequest:
    """推理请求"""
    request_id: str
    prompt: str
    max_tokens: int = 512
    priority: int = 0        # 0=最高
    created_at: float = 0.0

class ContinuousBatchScheduler:
    """连续批处理调度器"""

    def __init__(self, max_batch_size: int = 8):
        self.max_batch_size = max_batch_size
        self.pending: List[InferenceRequest] = []
        self.running: List[InferenceRequest] = []

    def add_request(self, request: InferenceRequest) -> None:
        """添加推理请求"""
        import time
        request.created_at = time.time()
        self.pending.append(request)
        # 按优先级排序(优先级数字越小越优先)
        self.pending.sort(key=lambda r: (r.priority, r.created_at))

    def schedule_batch(self) -> List[InferenceRequest]:
        """调度下一批推理请求"""
        available_slots = self.max_batch_size - len(self.running)

        if available_slots <= 0 or not self.pending:
            return []

        batch = self.pending[:available_slots]
        self.pending = self.pending[available_slots:]
        self.running.extend(batch)

        return batch

    def complete_request(
        self, request_id: str
    ) -> Optional[InferenceRequest]:
        """标记请求完成"""
        for i, req in enumerate(self.running):
            if req.request_id == request_id:
                return self.running.pop(i)
        return None

    def get_queue_depth(self) -> int:
        """获取队列深度"""
        return len(self.pending)

    def get_estimated_wait(
        self, priority: int = 0
    ) -> float:
        """估算等待时间"""
        # 同优先级的前面有多少请求
        ahead = sum(
            1 for r in self.pending
            if r.priority <= priority
        )
        # 每批处理 max_batch_size 个,每批约 2 秒
        batches = (ahead + self.max_batch_size - 1) // self.max_batch_size
        return batches * 2.0  # 秒

3.3 KV Cache 管理

class KVCacheManager:
    """KV Cache 管理器:PagedAttention 简化实现"""

    def __init__(
        self,
        total_memory_gb: float = 40,
        page_size: int = 16,    # 每个 page 的 token 数
        page_memory_mb: float = 0.5,  # 每个 page 的内存
    ):
        self.page_size = page_size
        self.page_memory_mb = page_memory_mb
        total_pages = int(
            total_memory_gb * 1024 / page_memory_mb
        )
        # 空闲页面池
        self.free_pages = list(range(total_pages))
        # 每个请求占用的页面
        self.request_pages = {}

    def allocate(
        self, request_id: str, num_tokens: int
    ) -> List[int]:
        """为请求分配 KV Cache 页面"""
        num_pages = (num_tokens + self.page_size - 1) // self.page_size

        if len(self.free_pages) < num_pages:
            # 内存不足,尝试抢占低优先级请求的页面
            freed = self._preempt_low_priority(num_pages - len(self.free_pages))
            if len(self.free_pages) < num_pages:
                raise MemoryError(
                    f"KV Cache 不足: 需要 {num_pages} 页, "
                    f"可用 {len(self.free_pages)} 页"
                )

        pages = self.free_pages[:num_pages]
        self.free_pages = self.free_pages[num_pages:]
        self.request_pages[request_id] = pages
        return pages

    def release(self, request_id: str) -> int:
        """释放请求的 KV Cache 页面"""
        if request_id not in self.request_pages:
            return 0

        pages = self.request_pages.pop(request_id)
        self.free_pages.extend(pages)
        return len(pages)

    def _preempt_low_priority(
        self, num_needed: int
    ) -> int:
        """抢占低优先级请求的页面"""
        freed = 0
        # 按 token 数降序抢占(大请求优先释放)
        sorted_requests = sorted(
            self.request_pages.items(),
            key=lambda x: len(x[1]),
            reverse=True,
        )

        for req_id, pages in sorted_requests:
            if freed >= num_needed:
                break
            self.free_pages.extend(pages)
            del self.request_pages[req_id]
            freed += len(pages)

        return freed

    def get_utilization(self) -> float:
        """获取 KV Cache 利用率"""
        total = len(self.free_pages) + sum(
            len(p) for p in self.request_pages.values()
        )
        used = sum(len(p) for p in self.request_pages.values())
        return used / total if total > 0 else 0.0

四、LLM 推理优化的 Trade-offs 分析

批处理大小 vs. 延迟:大 batch 提高吞吐量但增加延迟(等待凑批),小 batch 延迟低但吞吐量差。建议 max_batch_size=8-16,配合 continuous batching 动态组批——有请求就处理,不等凑满。

KV Cache 容量 vs. 并发数:KV Cache 越大,支持的并发请求越多,但 GPU 内存有限。40GB 显存的 A100,KV Cache 大约占 20-30GB(模型权重占 10-15GB),可支持约 50-100 个并发请求(取决于序列长度)。超出容量的请求需要排队或抢占。

流式输出 vs. 完整输出:流式输出(Server-Sent Events)让用户逐 Token 看到结果,体感延迟低,但实现复杂(需要管理部分生成的 KV Cache)。完整输出实现简单,但用户要等全部生成完才能看到。建议所有对话场景使用流式输出。

量化精度 vs. 推理速度:INT4 量化将推理速度提升 2-3 倍,但精度损失 2-5%。对话场景对精度容忍度较高(用户不会逐字对比),建议使用 GPTQ/AWQ 量化。摘要和翻译场景对精度更敏感,建议 INT8 或 FP16。

五、总结

LLM 推理服务的全链路优化从模型加载、推理执行到请求调度三个层面入手。模型预热消除冷启动延迟,Continuous Batching 提高吞吐量,PagedAttention 管理 KV Cache 内存,优先级调度保证高优请求先处理。

落地建议:部署时先执行模型预热(至少 1 次推理),消除首次请求延迟。使用 vLLM 或 TGI 等推理框架(内置 Continuous Batching 和 PagedAttention),不要自己实现。GPU 感知 HPA 根据队列深度和 GPU 利用率自动扩缩容。流式输出作为默认模式。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐