torchtitan-npu:大模型预训练在昇腾上的分布式执行链路与 Checkpoint 优化
前言
大模型预训练的工程复杂度不仅来自模型本身的参数规模,更来自分布式训练过程中跨设备通信、计算资源调度、以及训练状态持久化(Checkpoint)带来的系统性开销。torchtitan 是 PyTorch 官方提供的大模型预训练参考实现,基于原生 PyTorch 分布式接口构建,支持 FSDP(Fully Sharded Data Parallel)与张量并行(Tensor Parallel)等主流并行策略。torchtitan-npu 在此基础上,面向昇腾 NPU 硬件平台进行了系统性适配:该技术重构了分布式执行链路以匹配 Ascend 910 的通信原语与内存层次,并针对 NPU 的设备内存特性重新设计了 Checkpoint 的写入路径与异步流水线。本文从分布式执行链路的设计、通信拓扑优化、Checkpoint 机制三个维度,深入剖析该技术在昇腾平台上的架构实现。
大模型分布式训练的执行链路抽象
单步训练的计算图分解
在大模型预训练场景中,一次训练迭代(Iteration)的计算链路可以分解为以下阶段:
- 前向传播(Forward Pass):输入数据经过嵌入层(Embedding)、多层 Transformer Block(含自注意力与前馈网络)、最后经过输出投影层产生 logits。
- 损失计算(Loss Computation):logits 与标签计算交叉熵损失。
- 反向传播(Backward Pass):损失函数逐级反向传播梯度至每一层参数。
- 优化器步骤(Optimizer Step):优化器(如 AdamW)根据梯度更新模型参数。
在单设备场景下,上述四个阶段串行执行,计算资源利用率受限于单一设备的算力与内存容量。当模型参数量超过 NPU 设备内存上限(Ascend 910 单卡设备内存为 32GB/64GB,取决于具体 SKU),必须将模型参数、梯度、优化器状态分布到多个 NPU 设备上。
分布式并行策略的层次结构
torchtitan-npu 采用了多维并行策略的叠加方案:
数据并行(Data Parallel, DP):将不同的数据批次分配到不同 NPU 上,每张卡持有完整的模型副本。该策略的通信发生在反向传播之后的梯度同步(All-Reduce)。
全分片数据并行(Fully Sharded Data Parallel, FSDP):将模型参数、梯度、优化器状态按层级分片到不同 NPU 上,每张卡只保留部分参数。前向/反向传播时通过 All-Gather 临时重建完整参数,计算完成后立即释放。该策略显著降低了单卡内存占用,但引入了额外的通信开销。
张量并行(Tensor Parallel, TP):将每一层的参数矩阵按行或列切分到不同 NPU 上,计算过程中通过 All-Reduce 或 All-to-All 通信完成跨卡数据聚合。该策略适用于单层参数无法放入单卡内存的超大模型(如 GPT-3 175B 的单层参数超过 10GB)。
流水线并行(Pipeline Parallel, PP):将模型的不同层分配到不同 NPU 上,形成流水线。微批次(Micro-batch)在不同阶段之间流水执行,提高设备利用率。
torchtitan-npu 通过 PyTorch 的 distributed 接口将上述并行策略组合,并在昇腾平台上替换底层的通信后端(Backend),使其从默认的 NCCL(NVIDIA Collective Communications Library)迁移到昇腾的 HCCL(Huawei Collective Communication Library)。
昇腾 NPU 上的分布式执行链路重构
HCCL 通信后端的集成
PyTorch 的分布式训练依赖 ProcessGroup 抽象来管理多卡之间的集合通信原语。默认情况下,ProcessGroupNCCL 通过 NCCL 库实现高性能 GPU 间通信。在昇腾平台上,该技术使用 ProcessGroupHCCL 替换 NCCL 后端,底层调用 HCCL 提供的集合通信 API。
HCCL 支持与 NCCL 功能对齐的通信原语集合,包括:
HcclAllReduce:全归约,用于数据并行下的梯度同步。HcclAllGather:全收集,用于 FSDP 下的参数重建。HcclReduceScatter:归约散布,用于 FSDP 下的梯度分片。HcclBroadcast:广播,用于初始化参数同步。
该技术的关键工程工作在于:适配 PyTorch distributed 的 C++ 扩展接口,实现 ProcessGroupHCCL 类,使其能够正确初始化 HCCL 通信域(Communicator),并在 NPU 的流(Stream)模型下正确调度集合通信任务。
// ProcessGroupHCCL 的 AllReduce 实现伪代码
// 为什么使用异步执行:NPU 的通信与计算可以并行,
// 通过将通信任务投递到独立流,避免阻塞计算流
c10::intrusive_ptr<Work> ProcessGroupHCCL::allreduce(
std::vector<at::Tensor>& tensors,
const AllreduceOptions& opts) {
auto& tensor = tensors[0];
// 确保张量数据位于 NPU 设备内存
TORCH_CHECK(tensor.is_npu(), "Tensor must be on NPU device");
// 获取当前 NPU 流(计算流与通信流分离)
auto stream = c10_npu::getCurrentNPUStream();
// 将 AllReduce 任务投递到通信流(非阻塞)
// 通信流与计算流通过 event 机制同步,避免数据竞争
auto work = c10::make_intrusive<WorkHCCL>();
hcclAllReduce(tensor.data_ptr(),
tensor.data_ptr(),
tensor.numel(),
getHcclDataType(tensor.scalar_type()),
hcclSum,
comm_,
stream.stream());
return work;
}
计算-通信重叠优化
大模型训练的迭代时间中,通信开销往往占据 30%-50%(仅供参考)。torchtitan-npu 通过以下技术手段实现计算与通信的重叠:
梯度累积与通信流水线化:在反向传播过程中,每一层的梯度计算完成后立即触发对应的 All-Reduce 通信,而非等待整个反向传播完成后统一通信。该技术利用 NPU 的多流机制:反向传播的矩阵乘法在计算流上执行,而梯度的 All-Reduce 在通信流上并行执行。两层之间的通信通过 NPU 的硬件调度器自动流水线化。
激活值重计算(Activation Recomputation)的通信感知调度:激活值重计算通过在反向传播时重新执行前向传播来节省激活值的存储内存,但引入了额外的计算开销。该技术将激活值重计算的通信需求(如 TP 下的 All-Reduce)与底层通信流水线的空闲周期重叠,降低了重计算带来的端到端延迟。
通信压缩:对于 FP32 梯度,All-Reduce 的通信量是参数量的 4 倍(按字节计)。torchtitan-npu 支持梯度压缩策略:将 FP32 梯度量化为 FP16 或 BF16 后再通信,在通信端解量化后参与优化器更新。该技术在通信带宽受限的场景下可显著降低迭代时间,但会引入量化误差,需根据模型特性谨慎选择。
内存层次感知的 FSDP 实现
FSDP 的核心挑战在于:在参数分片的前提下,如何在前向/反向传播时高效地临时重建完整参数,并在计算完成后及时释放,以最小化峰值内存占用。
该技术针对 Ascend 910 的内存层次进行了以下优化:
参数预取(Parameter Prefetching):在多层 Transformer 模型中,FSDP 按层分片参数。当前层在计算时,可提前通过 All-Gather 获取下一层的完整参数,并重叠在当前层的计算流中。该技术通过 NPU 的 DMA 引擎实现参数预取的非阻塞执行,隐藏了 All-Gather 的通信延迟。
CPU Offload 策略:当 NPU 设备内存不足以容纳优化器状态(AdamW 需要存储一阶与二阶动量,内存占用为参数精度的 2 倍)时,该技术支持将优化器状态溢出到主机内存。具体实现为:在优化器步骤完成后,将动量张量通过 aclrtMemcpy 异步拷贝到主机内存;在下一次优化器步骤前,再预先取回。该策略以通信带宽换取设备内存容量,适用于超大模型在有限 NPU 内存上的训练场景。
Checkpoint 优化:分布式训练状态的持久化
Checkpoint 的工程挑战
大模型训练的 Checkpoint 包含以下组件:
- 模型参数(Model Weights)
- 优化器状态(Optimizer States):AdamW 的 momentum 与 variance
- 训练状态(Training State):迭代步数、学习率调度器状态、随机种子等
- 数据集状态(Dataloader State):数据采样的进度
以 Llama-2 70B 模型为例,其 FP16 参数占用约 140GB,AdamW 优化器状态(FP32)占用约 560GB,总计超过 700GB。在分布式训练场景下,这个 Checkpoint 被分片存储在不同 NPU 所在节点的本地文件系统中。传统的 Checkpoint 写入方式存在两个核心问题:
写入阻塞训练:同步 Checkpoint 在主训练进程中进行,所有 NPU 通过 torch.distributed.barrier() 同步后,逐一将分片状态写入磁盘。在此期间,GPU/NPU 计算资源完全闲置,造成显著的训练时间损失。对于 700GB 规模的 Checkpoint,即使在 NVMe SSD 上写入也可能需要数十分钟(仅供参考)。
存储冗余:当使用数据并行且未启用 FSDP 时,每个 NPU 持有完整的模型副本,Checkpoint 文件中存在大量重复数据,浪费存储空间。
异步 Checkpoint 流水线
torchtitan-npu 采用了异步 Checkpoint 机制来消除写入对训练迭代的阻塞:
原理:在主训练循环中,Checkpoint 的序列化与写入操作被委派到独立的后台线程(或独立进程),主进程在触发 Checkpoint 请求后立即继续下一轮训练迭代,无需等待写入完成。
实现细节:
# 异步 Checkpoint 的核心实现逻辑
# 为什么使用多进程而非多线程:Python GIL 会限制多线程的并行写入性能,
# 而多进程可以充分利用多块 NVMe 盘的并行写入带宽
class AsyncCheckpointer:
def __init__(self, output_dir: str):
self.output_dir = output_dir
# 使用 multiprocessing 避免 GIL,最大化磁盘写入并行度
self._process_pool = mp.Pool(processes=4)
self._pending_futures: List[Future] = []
def save_async(self, state_dict: Dict[str, Any], step: int):
"""异步保存 Checkpoint,立即返回不阻塞训练"""
# 序列化状态字典为字节流(CPU 侧操作,较轻量)
serialized = self._serialize(state_dict)
# 提交到进程池异步写入磁盘
# 注意:这里不调用 future.result(),避免阻塞
future = self._process_pool.apply_async(
self._write_to_disk,
args=(serialized, step)
)
self._pending_futures.append(future)
# 清理已完成的异步任务,防止内存泄漏
self._collect_finished_tasks()
def _write_to_disk(self, serialized: bytes, step: int):
"""在独立进程中执行的实际写入函数"""
# 写入临时文件,完成后原子重命名,避免损坏
tmp_path = os.path.join(self.output_dir, f"step_{step}.tmp")
final_path = os.path.join(self.output_dir, f"step_{step}.pt")
with open(tmp_path, "wb") as f:
f.write(serialized)
os.rename(tmp_path, final_path)
在上述实现中,save_async 在训练主进程中调用,但实际的磁盘写入在独立进程中完成。主进程通过 _collect_finished_tasks 定期回收已完成的异步任务,防止 Future 对象无限堆积。
NPU 设备内存到主机内存的异步拷贝
Checkpoint 数据位于 NPU 设备内存中(模型参数与优化器状态),需要先拷贝到主机内存才能序列化写入磁盘。该技术的异步 Checkpoint 进一步将这一拷贝操作流水线化:
Pinned Memory 缓冲区:该技术在主机的 Pinned Memory(页锁定内存)区域预先分配一块固定大小的缓冲区。NPU 到主机的拷贝目标直接指向该缓冲区,避免了页错误带来的延迟抖动。
双缓冲机制(Double Buffering):该技术维护两个 Pinned Memory 缓冲区:当缓冲区 A 正在接收 NPU 拷贝数据时,缓冲区 B 同时被序列化并写入磁盘。两个缓冲区通过轮转方式复用,实现了 NPU→Host 拷贝与磁盘写入的流水线重叠。
# 双缓冲异步 Checkpoint 的核心逻辑
# 为什么使用双缓冲:NPU→Host 的拷贝(aclrtMemcpy)与
# Host 的磁盘写入(f.write)可以同时执行,
# 单缓冲会串行这两个操作,双缓冲可隐藏其中一个的延迟
class DoubleBufferedCheckpointer:
def __init__(self, buffer_size: int):
# 分配两个 Pinned Memory 缓冲区
self.buffers = [
torch.empty(buffer_size, dtype=torch.uint8,
pin_memory=True, device='cpu'),
torch.empty(buffer_size, dtype=torch.uint8,
pin_memory=True, device='cpu')
]
self._write_idx = 0 # 当前用于 NPU→Host 拷贝的缓冲区索引
self._flush_idx = 1 # 当前用于磁盘写入的缓冲区索引
self._events = [torch.npu.Event(), torch.npu.Event()]
def save_step(self, state_tensors: List[torch.Tensor], step: int):
"""每个 Checkpoint 步骤切换缓冲区"""
buf = self.buffers[self._write_idx]
# 异步将 NPU 张量拷贝到 Pinned Memory 缓冲区
# 使用 NPU 的 memcpy 异步流,不阻塞计算流
with torch.npu.stream(self._copy_stream):
offset = 0
for t in state_tensors:
buf[offset:offset+t.nbytes()].copy_(
t.flatten().view(torch.uint8),
non_blocking=True)
offset += t.nbytes()
# 记录拷贝完成的 event
self._events[self._write_idx].record()
# 切换缓冲区,使下一个 Checkpoint 步骤使用另一个缓冲
self._write_idx, self._flush_idx = self._flush_idx, self._write_idx
# 等待上一个缓冲区的拷贝完成,然后序列化写入磁盘
self._events[self._flush_idx].wait()
self._serialize_and_write(self.buffers[self._flush_idx], step)
分布式 Checkpoint 的分片合并与增量保存
分片合并(Sharded Checkpoint Merging):当使用 FSDP 时,每个 NPU 只持有模型的部分参数。传统的做法是每个 NPU 独立保存一个分片文件,模型恢复时需要所有分片共同加载。该技术支持在保存时将分片参数通过 All-Gather 临时聚合为一个完整模型(仅限 Checkpoint 保存时,不影响训练时的内存占用),然后由 Rank 0 进程统一写入一个完整 Checkpoint 文件。这一机制显著简化了模型推理部署时的加载流程。
增量 Checkpoint(Incremental Checkpoint):在大模型训练中,模型参数在相邻两个 Checkpoint 步骤之间的变化量通常很小(尤其是学习率衰减到后期时)。该技术支持增量 Checkpoint 模式:仅将参数变化量(Delta)持久化,而非完整参数。恢复时,从最近的完整 Checkpoint 出发,依次应用增量 Delta 重建最新状态。该策略在 Checkpoint 频率较高(如每 100 步保存一次)的场景下,可显著降低存储开销与写入时间。
性能特征与工程权衡
通信拓扑对训练吞吐的影响
该技术的性能高度依赖 NPU 之间的互连带宽。Ascend 910 服务器通常采用 8×NPU 的异构计算节点,NPU 之间通过 HCCS(Huawei Compute Connectivity System)高速互连,单链路双向带宽可达 56 GB/s(仅供参考)。当训练规模扩展到多个节点时,节点间通过 InfiniBand 或 RoCE 网络互连,带宽通常显著低于节点内互连,成为通信瓶颈。
该技术针对跨节点通信进行了拓扑感知优化:在初始化 ProcessGroup 时,优先在节点内完成 All-Reduce 操作(Intra-node Reduce),再将部分结果通过节点间通信聚合(Inter-node Gather),减少了跨节点的通信量。
Checkpoint 频率与存储成本的权衡
异步 Checkpoint 虽然消除了写入对训练的阻塞,但引入了额外的工程复杂度:当训练进程异常退出时,最近一次异步 Checkpoint 可能尚未完成写入,导致部分训练状态丢失。该技术通过以下方式缓解这一问题:
- 保留最近 N 次(默认 N=3)Checkpoint 的副本,避免单点损坏导致无法恢复。
- 在每次 Checkpoint 完成后,通过
fsync系统调用强制刷盘,确保数据持久化到物理介质。 - 支持将 Checkpoint 同时写入多个挂载点(如本地 NVMe + 远程对象存储),提高容错能力。
结尾
torchtitan-npu 在 PyTorch 原生分布式训练抽象与昇腾 NPU 硬件之间构建了一套完整的执行链路,通过 HCCL 通信后端的集成、计算-通信重叠优化、内存层次感知的 FSDP 实现,将大模型预训练的分布式计算高效映射到 Ascend 910 处理器上。该技术在 Checkpoint 机制上的创新——异步写入流水线、双缓冲 NPU→Host 拷贝、增量持久化——显著降低了训练状态保存对迭代时间的阻塞,为大模型在昇腾平台上的大规模训练提供了工程可行的系统支撑。该技术的架构设计为 PyTorch 生态与昇腾 CANN 生态的深度集成提供了参考实现,也为其他异构 AI 加速芯片适配 PyTorch 分布式训练提供了可复用的技术路径。
仓库地址:https://gitee.com/ascend/cann/tree/master/torchtitan-npu
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)