CANN-NPU 显存回收策略:内存碎片整理与显存池化机制实战
·
一、显存碎片从哪来
1.1 碎片的两种形态
外部碎片——总空闲内存够用,但不连续。比如有 4 块 128MB 空闲,但需要一块 512MB 的连续内存,分配失败。
内部碎片——分配器按固定大小的块分配,实际使用的比分配的小。比如分配 400KB,分配器给 512KB,浪费了 112KB。
1.2 为什么 NPU 显存碎片更严重
- 批量推理:不同 batch size 需要不同大小的 buffer,反复分配释放
- 动态 Shape:每次推理的中间 tensor 大小不同,无法复用
- 多模型:多个模型交替执行,显存需求波动大
以一个典型推理服务为例:
时间线:
0s: 分配模型A权重 200MB → 空闲 600MB (连续)
1s: 分配激活缓冲 100MB → 空闲 500MB (连续)
2s: 释放缓冲 → 空闲 500MB (连续)
3s: 分配模型B权重 300MB → 空闲 200MB (连续)
4s: 分配激活缓冲 150MB → 空闲 50MB
5s: 释放模型B权重 → 空闲 350MB (碎片: 150MB + 200MB)
6s: 分配 250MB → 失败! (碎片无法满足)
二、池化分配器设计
2.1 核心思想
预分配一大块显存,自己管理分配和释放,避免频繁调用系统 API。同时维护不同大小的空闲块列表,减少碎片。
import threading
from collections import defaultdict
class NPUAllocator:
"""NPU 显存池化分配器
设计要点:
1. 预分配: 启动时一次性申请整块显存
2. 大小类: 按 2 的幂分级管理,减少内部碎片
3. 空闲链表: 每个大小类维护空闲块链表,O(1) 分配
4. 线程安全: 推理服务多线程调用,需要加锁
为什么用 2 的幂分级?
- 分配 400KB → 给 512KB 块,内部碎片率约 22%
- 如果用 1KB 粒度 → 碎片率 < 0.5%,但管理开销大
- 2 的幂是折中:碎片率可控,管理高效
"""
def __init__(self, total_size=8 * 1024 * 1024 * 1024): # 默认 8GB
self.total_size = total_size
self.used_size = 0
self.lock = threading.Lock()
# 按大小类管理空闲块
# key = 块大小 (2 的幂), value = 空闲块列表
self.free_blocks = defaultdict(list)
# 地址 → 大小 映射(用于释放时查找)
self.block_map = {}
# 预分配的整块内存(模拟)
self._pool = bytearray(total_size)
self._base_addr = id(self._pool)
# 初始化: 把整块内存加入空闲列表
self._add_free_block(self._base_addr, total_size)
def _next_power_of_2(self, size):
"""将大小向上取到 2 的幂"""
power = 1
while power < size:
power <<= 1
return power
def _add_free_block(self, addr, size):
self.free_blocks[size].append(addr)
def allocate(self, size):
"""分配内存
策略:
1. 找到 >= size 的最小 2 的幂大小类
2. 从该类的空闲链表取一块
3. 如果没有,尝试拆分更大的块
4. 如果都没有,触发碎片整理
"""
with self.lock:
aligned_size = self._next_power_of_2(size)
# 尝试从对应大小类分配
block = self._try_allocate(aligned_size)
if block is not None:
addr, block_size = block
self.block_map[addr] = block_size
self.used_size += block_size
return addr
# 尝试从更大的块拆分
block = self._split_allocate(aligned_size)
if block is not None:
addr, block_size = block
self.block_map[addr] = block_size
self.used_size += block_size
return addr
# 内存不足
raise MemoryError(
f"显存分配失败: 需要 {size} bytes, "
f"已用 {self.used_size}/{self.total_size}"
)
def _try_allocate(self, aligned_size):
"""从指定大小类尝试分配"""
if self.free_blocks[aligned_size]:
addr = self.free_blocks[aligned_size].pop()
return addr, aligned_size
return None
def _split_allocate(self, aligned_size):
"""从更大的空闲块拆分"""
for block_size in sorted(self.free_blocks.keys()):
if block_size > aligned_size and self.free_blocks[block_size]:
# 拆分: 一大变两小
addr = self.free_blocks[block_size].pop()
remaining_size = block_size - aligned_size
# 剩余部分放回空闲列表
self._add_free_block(addr + aligned_size, remaining_size)
return addr, aligned_size
return None
def free(self, addr):
"""释放内存
关键操作: 尝试合并相邻的空闲块,减少外部碎片。
"""
with self.lock:
if addr not in self.block_map:
return
block_size = self.block_map.pop(addr)
self.used_size -= block_size
# 尝试合并相邻空闲块
merged_addr, merged_size = self._try_merge(addr, block_size)
self._add_free_block(merged_addr, merged_size)
def _try_merge(self, addr, size):
"""尝试与相邻空闲块合并
这是减少外部碎片的核心。
每次释放时检查左右两侧是否有空闲块,有则合并。
"""
merged_addr = addr
merged_size = size
# 检查右侧相邻块
right_addr = addr + size
for block_size, addrs in list(self.free_blocks.items()):
for a in addrs:
if a == right_addr:
merged_size += block_size
addrs.remove(a)
break
# 检查左侧相邻块
left_addr = addr - merged_size
for block_size, addrs in list(self.free_blocks.items()):
for a in addrs:
if a + block_size == left_addr:
merged_addr = a
merged_size += block_size
addrs.remove(a)
break
return merged_addr, merged_size
def stats(self):
"""返回显存使用统计"""
with self.lock:
free_size = self.total_size - self.used_size
return {
'total': self.total_size,
'used': self.used_size,
'free': free_size,
'usage_rate': f"{self.used_size / self.total_size * 100:.1f}%",
'fragment_count': len(self.block_map),
}
2.2 使用示例
# 初始化 8GB 显存池
allocator = NPUAllocator(total_size=8 * 1024 * 1024 * 1024)
# 分配模型权重
weight_buf = allocator.allocate(200 * 1024 * 1024) # 200MB
input_buf = allocator.allocate(100 * 1024 * 1024) # 100MB
# 使用后释放
allocator.free(input_buf)
# 查看统计
print(allocator.stats())
# {'total': 8589934592, 'used': 209715200, 'free': 8380219392, ...}
三、碎片整理策略
3.1 紧凑式碎片整理
把所有已分配的块移到一端,空闲空间连成一片。需要暂停所有推理任务。
def compact(self):
"""紧凑式碎片整理
步骤:
1. 暂停推理(阻止新分配)
2. 计算所有已分配块的新地址(连续排列)
3. 搬移数据到新地址
4. 更新地址映射
5. 恢复推理
代价:
- 暂停时间与已分配块数量成正比
- 数据搬移消耗带宽
- 适合在低峰期执行
"""
with self.lock:
# 收集所有已分配块
allocated = sorted(self.block_map.items(), key=lambda x: x[0])
# 计算新地址
new_addr = self._base_addr
addr_mapping = {} # 旧地址 → 新地址
for old_addr, block_size in allocated:
addr_mapping[old_addr] = new_addr
new_addr += block_size
# 搬移数据(模拟)
for old_addr, new_addr in addr_mapping.items():
# 实际中需要用 memcpy 或 DMA 搬移
pass
# 更新映射
self.block_map.clear()
for old_addr, new_addr in addr_mapping.items():
self.block_map[new_addr] = self.block_map.get(old_addr, 0)
# 重置空闲列表
self.free_blocks.clear()
free_start = new_addr
free_size = self._base_addr + self.total_size - free_start
if free_size > 0:
self._add_free_block(free_start, free_size)
3.2 增量式碎片整理
不需要暂停,每次释放时主动合并。这是日常策略。
def incremental_compact(self):
"""增量式碎片整理
在每次 free() 时自动执行,不需要暂停。
通过合并相邻空闲块,逐步减少碎片。
这是日常策略,大部分场景够用。
"""
# 已经在 free() 的 _try_merge 中实现了
pass
3.3 策略对比
| 策略 | 适用场景 | 暂停时间 | 碎片消除效果 |
|---|---|---|---|
| 增量合并 | 日常运行 | 无 | 中等 |
| 紧凑整理 | 低峰期 | 高 | 最佳 |
| 重新分配 | 模型切换 | 中等 | 好 |
四、针对推理场景的优化
4.1 预分配策略
推理服务启动时就预估好需要多少显存,一次性分配到位。
def preallocate_for_inference(model_configs):
"""根据模型配置预分配显存
推理服务的显存需求相对可预测:
- 模型权重: 固定大小,启动时确定
- KV Cache: 最大序列长度 × batch size × 层数
- 激活缓冲: 最大输入 shape × 中间层最大宽度
"""
allocator = NPUAllocator()
for config in model_configs:
# 预分配权重缓冲
weight_buf = allocator.allocate(config['weight_size'])
# 预分配 KV Cache(按最大序列长度)
kv_cache_size = (
config['max_seq_len'] *
config['max_batch'] *
config['num_layers'] *
config['hidden_dim'] *
2 # K 和 V
)
kv_buf = allocator.allocate(kv_cache_size)
return allocator
4.2 多级缓存
把显存分成几个区域,优先用小的、快的区域。
class MultiLevelCache:
"""多级显存缓存
L1: 寄存器文件 — 最快,容量最小(几十 KB)
L2: SRAM 缓存 — 快,容量中等(几百 KB)
L3: HBM 显存 — 慢,容量大(GB 级)
策略: 频繁访问的数据放 L1/L2,不频繁的放 L3。
"""
def __init__(self):
self.l1 = {} # 寄存器级缓存
self.l2 = {} # SRAM 级缓存
self.l3 = {} # HBM 级缓存
def get(self, key):
if key in self.l1:
return self.l1[key]
if key in self.l2:
val = self.l2.pop(key)
self.l1[key] = val
return val
if key in self.l3:
val = self.l3.pop(key)
self.l2[key] = val
return val
return None
def put(self, key, value, level='l3'):
if level == 'l1':
self.l1[key] = value
elif level == 'l2':
self.l2[key] = value
else:
self.l3[key] = value
五、监控与诊断
class MemoryMonitor:
"""显存使用监控
持续跟踪显存使用情况,发现异常及时告警。
"""
def __init__(self, allocator, threshold=0.9):
self.allocator = allocator
self.threshold = threshold
self.history = []
def check(self):
stats = self.allocator.stats()
self.history.append(stats)
usage_rate = stats['used'] / stats['total']
if usage_rate > self.threshold:
print(f"[WARN] 显存使用率 {usage_rate:.1%} 超过阈值 {self.threshold:.0%}")
return stats
def report(self):
"""生成显存使用报告"""
if not self.history:
return "无数据"
latest = self.history[-1]
peak_used = max(h['used'] for h in self.history)
report = f"""
显存使用报告:
总量: {latest['total'] / 1024**3:.1f} GB
当前使用: {latest['used'] / 1024**3:.1f} GB ({latest['usage_rate']})
峰值使用: {peak_used / 1024**3:.1f} GB ({peak_used / latest['total']:.1%})
碎片数: {latest['fragment_count']}
"""
return report
六、常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| OOM 但总空闲够用 | 外部碎片严重 | 执行紧凑式碎片整理 |
| 分配速度变慢 | 空闲链表太长 | 调整大小类粒度,减少链表长度 |
| 显存泄漏 | 忘记释放 buffer | 用 MemoryMonitor 监控,定期检查 |
| 推理延迟抖动 | 碎片整理暂停了推理 | 用增量式整理,避免暂停 |
相关仓库
- CANN - 昇腾计算架构 https://gitee.com/ascend/cann
- jemalloc - 高性能内存分配器 https://github.com/jemalloc/jemalloc
- tcmalloc - Google 线程级内存分配器 https://github.com/google/tcmalloc
- PoolAllocator - 虚拟显存池化分配器 https://github.com/vllm-project/vllm
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)