昇腾CANN ops-transformer MoE:专家混合路由的 NPU 融合优化实战
MoE(Mixture of Experts)是大模型规模扩展的关键技术——把一个巨大的 FFN 拆成多个小专家,每个 token 只激活其中几个。DeepSeek-V3 用 256 个专家,每个 token 只走 8 个——计算量是同等规模稠密模型的 1/32。但 MoE 的调度逻辑复杂:路由打分 → top-k 选择 → 专家分发 → 结果聚合。ops-transformer 仓库的 MoE 算子把这些步骤融合成一个 kernel。
MoE 的计算流程
MoE 单层流程(标准实现)
├─ 1. 路由打分:gate_proj(x) → [batch, seq, num_experts]
│ x 与每个专家的「门控向量」做内积
│
├─ 2. Top-K 选择:对每个 token,选得分最高的 K 个专家
│ 输出:expert_indices [batch, seq, K]
│ expert_weights [batch, seq, K](softmax 归一化)
│
├─ 3. Token 分发:把 token 发给对应的专家
│ 每个专家收到一批 token(数量不固定)
│
├─ 4. 专家计算:每个专家独立做 FFN(up_proj + gate_proj + down_proj)
│ up_proj(x) × sigmoid(gate_proj(x)) → hidden
│ down_proj(hidden) → output
│
└─ 5. 结果聚合:按 token 把专家输出加权求和
y = Σ_k weight_k × expert_k(x)
步骤 3-5 是瓶颈:Token 分发涉及大量数据搬运,专家计算涉及小矩阵乘(batch size 不固定),结果聚合又涉及不规则内存访问。
MoE 融合 Kernel 的设计
ops-transformer 把步骤 2-5 融合成一个 kernel:
// ops-transformer/kernels/moe_fused.cpp
__aicore__ void MoEFusedKernel(
GlobalTensor<float>& input, // [batch, seq, hidden]
GlobalTensor<float>& gate_weights, // [num_experts, hidden]
GlobalTensor<float>& expert_up, // [num_experts, hidden, ffn_hidden]
GlobalTensor<float>& expert_gate, // [num_experts, hidden, ffn_hidden]
GlobalTensor<float>& expert_down, // [num_experts, ffn_hidden, hidden]
GlobalTensor<float>& output, // [batch, seq, hidden]
GlobalTensor<int>& expert_indices, // [batch, seq, top_k] 输出
GlobalTensor<float>& expert_weights, // [batch, seq, top_k] 输出
int batch, int seq_len, int hidden,
int num_experts, int top_k, int ffn_hidden
) {
// ===== 阶段 1:路由打分 + Top-K(并行处理所有 token)=====
// 每个 block 处理一个 token
for (int b = 0; b < batch; b++) {
for (int s = 0; s < seq_len; s++) {
int token_id = b * seq_len + s;
// 路由打分:x @ gate_weights^T
// gate_weights: [num_experts, hidden]
// 输出: scores [num_experts]
LocalTensor<float> scores(num_experts);
MatMul(scores, input[token_id], gate_weights);
// Top-K 选择(部分排序,不需要完全排序)
// 输出:indices [top_k], weights [top_k]
LocalTensor<int> topk_indices(top_k);
LocalTensor<float> topk_scores(top_k);
TopKPartial(scores, topk_indices, topk_scores, num_experts, top_k);
// Softmax 归一化 top-k 权重
Softmax(topk_scores, topk_scores);
// 存储 top-k 结果
Store(expert_indices[token_id], topk_indices);
Store(expert_weights[token_id], topk_scores);
}
}
// ===== 阶段 2:Token 重排(按专家分组)=====
// 统计每个专家收到的 token 数量
LocalTensor<int> expert_token_count(num_experts);
expert_token_count = 0;
for (int b = 0; b < batch; b++) {
for (int s = 0; s < seq_len; s++) {
int token_id = b * seq_len + s;
for (int k = 0; k < top_k; k++) {
int expert_id = expert_indices[token_id * top_k + k];
expert_token_count[expert_id]++;
}
}
}
// 构建 token 到专家的映射表(CSR 格式)
// expert_tokens[expert_id] = [token_id_0, token_id_1, ...]
LocalTensor<int> expert_tokens(batch * seq_len * top_k);
LocalTensor<int> expert_offset(num_experts + 1);
ExclusiveScan(expert_token_count, expert_offset);
// 填充 token 列表
LocalTensor<int> expert_cursor(num_experts);
expert_cursor = 0;
for (int b = 0; b < batch; b++) {
for (int s = 0; s < seq_len; s++) {
int token_id = b * seq_len + s;
for (int k = 0; k < top_k; k++) {
int expert_id = expert_indices[token_id * top_k + k];
int pos = expert_offset[expert_id] + expert_cursor[expert_id];
expert_tokens[pos] = token_id;
expert_cursor[expert_id]++;
}
}
}
// ===== 阶段 3:专家计算(分组并行)=====
// 每个 block 处理一个专家
for (int e = 0; e < num_experts; e++) {
int num_tokens_for_expert = expert_token_count[e];
if (num_tokens_for_expert == 0) continue;
// 加载该专家收到的所有 token
LocalTensor<float> expert_input(num_tokens_for_expert * hidden);
for (int t = 0; t < num_tokens_for_expert; t++) {
int token_id = expert_tokens[expert_offset[e] + t];
Copy(expert_input[t * hidden], input[token_id * hidden], hidden);
}
// FFN 计算(小矩阵乘,Cube 单元)
// up = expert_input @ expert_up[e]
LocalTensor<float> up(num_tokens_for_expert * ffn_hidden);
MatMul(up, expert_input, expert_up[e]);
// gate = expert_input @ expert_gate[e]
LocalTensor<float> gate(num_tokens_for_expert * ffn_hidden);
MatMul(gate, expert_input, expert_gate[e]);
// 激活:silu(gate) * up
LocalTensor<float> hidden(num_tokens_for_expert * ffn_hidden);
for (int i = 0; i < num_tokens_for_expert * ffn_hidden; i++) {
hidden[i] = up[i] * Silu(gate[i]);
}
// down = hidden @ expert_down[e]
LocalTensor<float> expert_output(num_tokens_for_expert * hidden);
MatMul(expert_output, hidden, expert_down[e]);
// ===== 阶段 4:结果写回(按 token 聚合)=====
for (int t = 0; t < num_tokens_for_expert; t++) {
int token_id = expert_tokens[expert_offset[e] + t];
// 找到这个 token 在 top-k 中的权重
int k_idx = FindKIndex(expert_indices, token_id, e);
float weight = expert_weights[token_id * top_k + k_idx];
// 加权累加到输出
AtomicAdd(output[token_id * hidden], expert_output[t * hidden], weight, hidden);
}
}
}
关键融合点:
- 路由打分 + Top-K 融合:避免 scores 中间结果写回 HBM
- Token 重排 + 专家计算融合:expert_input 在 L1 中直接复用
- 原子累加聚合:多专家输出并行写回,避免串行累加
Top-K 部分排序的优化
MoE 不需要完全排序所有专家得分——只要选出最大的 K 个。ops-transformer 用快速选择(QuickSelect)算法:
// ops-transformer/kernels/topk_partial.cpp
__aicore__ void TopKPartial(
LocalTensor<float>& scores, // [num_experts]
LocalTensor<int>& indices, // [top_k] 输出
LocalTensor<float>& values, // [top_k] 输出
int num_experts, int k
) {
// 快速选择:找到第 K 大的元素作为 pivot
float pivot = QuickSelect(scores, num_experts, k);
// 单次扫描:选出所有 > pivot 的元素
int count = 0;
for (int i = 0; i < num_experts && count < k; i++) {
if (scores[i] >= pivot) {
indices[count] = i;
values[count] = scores[i];
count++;
}
}
// 如果选出的元素不够 K 个(有多个等于 pivot 的),补齐
while (count < k) {
// 找到第一个等于 pivot 且未被选中的
for (int i = 0; i < num_experts; i++) {
if (scores[i] == pivot && !IsSelected(indices, i, count)) {
indices[count] = i;
values[count] = scores[i];
count++;
break;
}
}
}
}
// QuickSelect:找到第 K 大的元素(O(N) 平均复杂度)
float QuickSelect(LocalTensor<float>& arr, int n, int k) {
int left = 0, right = n - 1;
while (left < right) {
// 随机选 pivot(避免最坏情况)
int pivot_idx = left + (hash(left + right) % (right - left + 1));
float pivot = arr[pivot_idx];
// 三路划分
int i = left;
for (int j = left; j <= right; j++) {
if (arr[j] > pivot) { // 大的在左边
Swap(arr[i], arr[j]);
i++;
}
}
// 判断 pivot 位置
if (k < i) {
right = i - 1;
} else if (k >= i && k <= (n - 1 - (right - i + 1))) {
return pivot; // pivot 正好在第 k 位
} else {
left = i;
}
}
return arr[left];
}
完全排序需要 O(N log N),快速选择只需要 O(N)。当 num_experts=256、top_k=8 时,快速选择比排序快 10×。
专家并行与 Token 并行的平衡
MoE 的并行有两个维度:
- Token 并行:不同 token 的路由和计算互不依赖
- 专家并行:不同专家的计算互不依赖
ops-transformer 的策略:
- 小 batch(batch×seq < num_experts):专家并行为主
- 大 batch(batch×seq > num_experts):Token 并行为主
// 自动选择并行策略
void MoESelectParallelStrategy(int batch, int seq, int num_experts) {
int total_tokens = batch * seq;
if (total_tokens < num_experts) {
// 专家并行:每个 block 处理一个专家
// 适合推理场景(batch=1, seq=512)
LaunchMoEExpertParallel(batch, seq, num_experts);
} else {
// Token 并行:每个 block 处理一批 token
// 适合训练场景(batch=8, seq=2048)
LaunchMoETokenParallel(batch, seq, num_experts);
}
}
踩坑一:Token 分布不均导致负载不平衡
某些热门专家可能收到大量 token,冷门专家几乎没 token。最坏情况:95% 的 token 走同一个专家,其他专家空转。
缓解策略:容量因子(Capacity Factor)
// 限制每个专家最多处理的 token 数
int max_tokens_per_expert = (batch * seq * top_k) / num_experts * capacity_factor;
// capacity_factor = 1.0:完美均衡(每个专家处理相同数量的 token)
// capacity_factor = 1.5:允许 50% 的负载波动
// capacity_factor = 2.0:允许热点专家处理双倍 token
// 超出容量的 token:走 drop 或辅助专家
if (expert_token_count[e] > max_tokens_per_expert) {
// 策略 1:丢弃超出的 token(损失信息)
// 策略 2:路由到次优专家(保留信息,但效果略差)
int second_best = FindSecondBestExpert(token_id, scores);
expert_indices[token_id * top_k + k] = second_best;
}
踩坑二:原子累加的精度损失
多专家输出用 AtomicAdd 累加。FP16 精度下,连续 8 次累加可能损失 5% 的精度。
修复:累加时用 FP32 中间结果
// 错误:直接 FP16 累加
AtomicAdd(output_fp16, expert_output_fp16, weight); // 精度损失
// 正确:先转 FP32,累加完再转回
float output_fp32 = float(output_fp16) + float(expert_output_fp16) * weight;
output_fp16 = half(output_fp32);
ops-transformer 的 MoE kernel 内部自动处理精度转换——用户感知不到 FP32 中间结果。
踩坑三:专家权重加载的内存布局
每个专家的 FFN 权重形状相同 [hidden, ffn_hidden],但实际加载时是交错的:
权重文件布局(不适合 MoE)
expert_0_up [hidden, ffn]
expert_0_gate [hidden, ffn]
expert_0_down [ffn, hidden]
expert_1_up [hidden, ffn]
...
优化布局(适合 MoE 融合)
all_up [num_experts, hidden, ffn] ← 连续存储
all_gate [num_experts, hidden, ffn]
all_down [num_experts, ffn, hidden]
ops-transformer 提供权重转换脚本:
python tools/convert_moe_weights.py \
--input standard_format.pt \
--output moe_optimized.pt \
--layout interleaved
转换后的权重可以直接送入 MoE 融合 kernel——不需要运行时重排。
MoE 是大模型规模扩展的核心技术,但 MoE 的调度逻辑是性能杀手——路由打分、Token 分发、结果聚合这些步骤在 CPU 上是毫秒级,在 NPU 上必须压到微秒级。ops-transformer 的 MoE 融合算子把路由、分发、计算、聚合四个阶段合并成一个 kernel,中间结果不回 HBM。top-k 快速选择、专家并行与 Token 并行的自适应切换、容量因子负载平衡——这些是 MoE 在 NPU 上跑得快的关键。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)