一、显存碎片从哪来

1.1 碎片的两种形态

外部碎片——总空闲内存够用,但不连续。比如有 4 块 128MB 空闲,但需要一块 512MB 的连续内存,分配失败。

内部碎片——分配器按固定大小的块分配,实际使用的比分配的小。比如分配 400KB,分配器给 512KB,浪费了 112KB。

1.2 为什么 NPU 显存碎片更严重

  • 批量推理:不同 batch size 需要不同大小的 buffer,反复分配释放
  • 动态 Shape:每次推理的中间 tensor 大小不同,无法复用
  • 多模型:多个模型交替执行,显存需求波动大

以一个典型推理服务为例:

时间线:
0s:   分配模型A权重 200MB → 空闲 600MB (连续)
1s:   分配激活缓冲 100MB → 空闲 500MB (连续)
2s:   释放缓冲 → 空闲 500MB (连续)
3s:   分配模型B权重 300MB → 空闲 200MB (连续)
4s:   分配激活缓冲 150MB → 空闲 50MB
5s:   释放模型B权重 → 空闲 350MB (碎片: 150MB + 200MB)
6s:   分配 250MB → 失败! (碎片无法满足)

二、池化分配器设计

2.1 核心思想

预分配一大块显存,自己管理分配和释放,避免频繁调用系统 API。同时维护不同大小的空闲块列表,减少碎片。

import threading
from collections import defaultdict


class NPUAllocator:
    """NPU 显存池化分配器

    设计要点:
    1. 预分配: 启动时一次性申请整块显存
    2. 大小类: 按 2 的幂分级管理,减少内部碎片
    3. 空闲链表: 每个大小类维护空闲块链表,O(1) 分配
    4. 线程安全: 推理服务多线程调用,需要加锁

    为什么用 2 的幂分级?
    - 分配 400KB → 给 512KB 块,内部碎片率约 22%
    - 如果用 1KB 粒度 → 碎片率 < 0.5%,但管理开销大
    - 2 的幂是折中:碎片率可控,管理高效
    """

    def __init__(self, total_size=8 * 1024 * 1024 * 1024):  # 默认 8GB
        self.total_size = total_size
        self.used_size = 0
        self.lock = threading.Lock()

        # 按大小类管理空闲块
        # key = 块大小 (2 的幂), value = 空闲块列表
        self.free_blocks = defaultdict(list)

        # 地址 → 大小 映射(用于释放时查找)
        self.block_map = {}

        # 预分配的整块内存(模拟)
        self._pool = bytearray(total_size)
        self._base_addr = id(self._pool)

        # 初始化: 把整块内存加入空闲列表
        self._add_free_block(self._base_addr, total_size)

    def _next_power_of_2(self, size):
        """将大小向上取到 2 的幂"""
        power = 1
        while power < size:
            power <<= 1
        return power

    def _add_free_block(self, addr, size):
        self.free_blocks[size].append(addr)

    def allocate(self, size):
        """分配内存

        策略:
        1. 找到 >= size 的最小 2 的幂大小类
        2. 从该类的空闲链表取一块
        3. 如果没有,尝试拆分更大的块
        4. 如果都没有,触发碎片整理
        """
        with self.lock:
            aligned_size = self._next_power_of_2(size)

            # 尝试从对应大小类分配
            block = self._try_allocate(aligned_size)
            if block is not None:
                addr, block_size = block
                self.block_map[addr] = block_size
                self.used_size += block_size
                return addr

            # 尝试从更大的块拆分
            block = self._split_allocate(aligned_size)
            if block is not None:
                addr, block_size = block
                self.block_map[addr] = block_size
                self.used_size += block_size
                return addr

            # 内存不足
            raise MemoryError(
                f"显存分配失败: 需要 {size} bytes, "
                f"已用 {self.used_size}/{self.total_size}"
            )

    def _try_allocate(self, aligned_size):
        """从指定大小类尝试分配"""
        if self.free_blocks[aligned_size]:
            addr = self.free_blocks[aligned_size].pop()
            return addr, aligned_size
        return None

    def _split_allocate(self, aligned_size):
        """从更大的空闲块拆分"""
        for block_size in sorted(self.free_blocks.keys()):
            if block_size > aligned_size and self.free_blocks[block_size]:
                # 拆分: 一大变两小
                addr = self.free_blocks[block_size].pop()
                remaining_size = block_size - aligned_size

                # 剩余部分放回空闲列表
                self._add_free_block(addr + aligned_size, remaining_size)

                return addr, aligned_size
        return None

    def free(self, addr):
        """释放内存

        关键操作: 尝试合并相邻的空闲块,减少外部碎片。
        """
        with self.lock:
            if addr not in self.block_map:
                return

            block_size = self.block_map.pop(addr)
            self.used_size -= block_size

            # 尝试合并相邻空闲块
            merged_addr, merged_size = self._try_merge(addr, block_size)
            self._add_free_block(merged_addr, merged_size)

    def _try_merge(self, addr, size):
        """尝试与相邻空闲块合并

        这是减少外部碎片的核心。
        每次释放时检查左右两侧是否有空闲块,有则合并。
        """
        merged_addr = addr
        merged_size = size

        # 检查右侧相邻块
        right_addr = addr + size
        for block_size, addrs in list(self.free_blocks.items()):
            for a in addrs:
                if a == right_addr:
                    merged_size += block_size
                    addrs.remove(a)
                    break

        # 检查左侧相邻块
        left_addr = addr - merged_size
        for block_size, addrs in list(self.free_blocks.items()):
            for a in addrs:
                if a + block_size == left_addr:
                    merged_addr = a
                    merged_size += block_size
                    addrs.remove(a)
                    break

        return merged_addr, merged_size

    def stats(self):
        """返回显存使用统计"""
        with self.lock:
            free_size = self.total_size - self.used_size
            return {
                'total': self.total_size,
                'used': self.used_size,
                'free': free_size,
                'usage_rate': f"{self.used_size / self.total_size * 100:.1f}%",
                'fragment_count': len(self.block_map),
            }

2.2 使用示例

# 初始化 8GB 显存池
allocator = NPUAllocator(total_size=8 * 1024 * 1024 * 1024)

# 分配模型权重
weight_buf = allocator.allocate(200 * 1024 * 1024)  # 200MB
input_buf = allocator.allocate(100 * 1024 * 1024)   # 100MB

# 使用后释放
allocator.free(input_buf)

# 查看统计
print(allocator.stats())
# {'total': 8589934592, 'used': 209715200, 'free': 8380219392, ...}

三、碎片整理策略

3.1 紧凑式碎片整理

把所有已分配的块移到一端,空闲空间连成一片。需要暂停所有推理任务。

def compact(self):
    """紧凑式碎片整理

    步骤:
    1. 暂停推理(阻止新分配)
    2. 计算所有已分配块的新地址(连续排列)
    3. 搬移数据到新地址
    4. 更新地址映射
    5. 恢复推理

    代价:
    - 暂停时间与已分配块数量成正比
    - 数据搬移消耗带宽
    - 适合在低峰期执行
    """
    with self.lock:
        # 收集所有已分配块
        allocated = sorted(self.block_map.items(), key=lambda x: x[0])

        # 计算新地址
        new_addr = self._base_addr
        addr_mapping = {}  # 旧地址 → 新地址

        for old_addr, block_size in allocated:
            addr_mapping[old_addr] = new_addr
            new_addr += block_size

        # 搬移数据(模拟)
        for old_addr, new_addr in addr_mapping.items():
            # 实际中需要用 memcpy 或 DMA 搬移
            pass

        # 更新映射
        self.block_map.clear()
        for old_addr, new_addr in addr_mapping.items():
            self.block_map[new_addr] = self.block_map.get(old_addr, 0)

        # 重置空闲列表
        self.free_blocks.clear()
        free_start = new_addr
        free_size = self._base_addr + self.total_size - free_start
        if free_size > 0:
            self._add_free_block(free_start, free_size)

3.2 增量式碎片整理

不需要暂停,每次释放时主动合并。这是日常策略。

def incremental_compact(self):
    """增量式碎片整理

    在每次 free() 时自动执行,不需要暂停。
    通过合并相邻空闲块,逐步减少碎片。
    这是日常策略,大部分场景够用。
    """
    # 已经在 free() 的 _try_merge 中实现了
    pass

3.3 策略对比

策略 适用场景 暂停时间 碎片消除效果
增量合并 日常运行 中等
紧凑整理 低峰期 最佳
重新分配 模型切换 中等

四、针对推理场景的优化

4.1 预分配策略

推理服务启动时就预估好需要多少显存,一次性分配到位。

def preallocate_for_inference(model_configs):
    """根据模型配置预分配显存

    推理服务的显存需求相对可预测:
    - 模型权重: 固定大小,启动时确定
    - KV Cache: 最大序列长度 × batch size × 层数
    - 激活缓冲: 最大输入 shape × 中间层最大宽度
    """
    allocator = NPUAllocator()

    for config in model_configs:
        # 预分配权重缓冲
        weight_buf = allocator.allocate(config['weight_size'])

        # 预分配 KV Cache(按最大序列长度)
        kv_cache_size = (
            config['max_seq_len'] *
            config['max_batch'] *
            config['num_layers'] *
            config['hidden_dim'] *
            2  # K 和 V
        )
        kv_buf = allocator.allocate(kv_cache_size)

    return allocator

4.2 多级缓存

把显存分成几个区域,优先用小的、快的区域。

class MultiLevelCache:
    """多级显存缓存

    L1: 寄存器文件 — 最快,容量最小(几十 KB)
    L2: SRAM 缓存 — 快,容量中等(几百 KB)
    L3: HBM 显存 — 慢,容量大(GB 级)

    策略: 频繁访问的数据放 L1/L2,不频繁的放 L3。
    """
    def __init__(self):
        self.l1 = {}  # 寄存器级缓存
        self.l2 = {}  # SRAM 级缓存
        self.l3 = {}  # HBM 级缓存

    def get(self, key):
        if key in self.l1:
            return self.l1[key]
        if key in self.l2:
            val = self.l2.pop(key)
            self.l1[key] = val
            return val
        if key in self.l3:
            val = self.l3.pop(key)
            self.l2[key] = val
            return val
        return None

    def put(self, key, value, level='l3'):
        if level == 'l1':
            self.l1[key] = value
        elif level == 'l2':
            self.l2[key] = value
        else:
            self.l3[key] = value

五、监控与诊断

class MemoryMonitor:
    """显存使用监控

    持续跟踪显存使用情况,发现异常及时告警。
    """
    def __init__(self, allocator, threshold=0.9):
        self.allocator = allocator
        self.threshold = threshold
        self.history = []

    def check(self):
        stats = self.allocator.stats()
        self.history.append(stats)

        usage_rate = stats['used'] / stats['total']

        if usage_rate > self.threshold:
            print(f"[WARN] 显存使用率 {usage_rate:.1%} 超过阈值 {self.threshold:.0%}")

        return stats

    def report(self):
        """生成显存使用报告"""
        if not self.history:
            return "无数据"

        latest = self.history[-1]
        peak_used = max(h['used'] for h in self.history)

        report = f"""
显存使用报告:
  总量: {latest['total'] / 1024**3:.1f} GB
  当前使用: {latest['used'] / 1024**3:.1f} GB ({latest['usage_rate']})
  峰值使用: {peak_used / 1024**3:.1f} GB ({peak_used / latest['total']:.1%})
  碎片数: {latest['fragment_count']}
"""
        return report

六、常见问题

问题 原因 解决方案
OOM 但总空闲够用 外部碎片严重 执行紧凑式碎片整理
分配速度变慢 空闲链表太长 调整大小类粒度,减少链表长度
显存泄漏 忘记释放 buffer 用 MemoryMonitor 监控,定期检查
推理延迟抖动 碎片整理暂停了推理 用增量式整理,避免暂停

相关仓库

  • CANN - 昇腾计算架构 https://gitee.com/ascend/cann
  • jemalloc - 高性能内存分配器 https://github.com/jemalloc/jemalloc
  • tcmalloc - Google 线程级内存分配器 https://github.com/google/tcmalloc
  • PoolAllocator - 虚拟显存池化分配器 https://github.com/vllm-project/vllm
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐