MoE(Mixture of Experts)是大模型规模扩展的关键技术——把一个巨大的 FFN 拆成多个小专家,每个 token 只激活其中几个。DeepSeek-V3 用 256 个专家,每个 token 只走 8 个——计算量是同等规模稠密模型的 1/32。但 MoE 的调度逻辑复杂:路由打分 → top-k 选择 → 专家分发 → 结果聚合。ops-transformer 仓库的 MoE 算子把这些步骤融合成一个 kernel。

MoE 的计算流程

MoE 单层流程(标准实现)
├─ 1. 路由打分:gate_proj(x) → [batch, seq, num_experts]
│      x 与每个专家的「门控向量」做内积
│
├─ 2. Top-K 选择:对每个 token,选得分最高的 K 个专家
│      输出:expert_indices [batch, seq, K]
│            expert_weights [batch, seq, K](softmax 归一化)
│
├─ 3. Token 分发:把 token 发给对应的专家
│      每个专家收到一批 token(数量不固定)
│
├─ 4. 专家计算:每个专家独立做 FFN(up_proj + gate_proj + down_proj)
│      up_proj(x) × sigmoid(gate_proj(x)) → hidden
│      down_proj(hidden) → output
│
└─ 5. 结果聚合:按 token 把专家输出加权求和
       y = Σ_k weight_k × expert_k(x)

步骤 3-5 是瓶颈:Token 分发涉及大量数据搬运,专家计算涉及小矩阵乘(batch size 不固定),结果聚合又涉及不规则内存访问。

MoE 融合 Kernel 的设计

ops-transformer 把步骤 2-5 融合成一个 kernel:

// ops-transformer/kernels/moe_fused.cpp

__aicore__ void MoEFusedKernel(
    GlobalTensor<float>& input,          // [batch, seq, hidden]
    GlobalTensor<float>& gate_weights,   // [num_experts, hidden]
    GlobalTensor<float>& expert_up,      // [num_experts, hidden, ffn_hidden]
    GlobalTensor<float>& expert_gate,    // [num_experts, hidden, ffn_hidden]
    GlobalTensor<float>& expert_down,   // [num_experts, ffn_hidden, hidden]
    GlobalTensor<float>& output,         // [batch, seq, hidden]
    GlobalTensor<int>& expert_indices,   // [batch, seq, top_k] 输出
    GlobalTensor<float>& expert_weights, // [batch, seq, top_k] 输出
    int batch, int seq_len, int hidden,
    int num_experts, int top_k, int ffn_hidden
) {
    // ===== 阶段 1:路由打分 + Top-K(并行处理所有 token)=====
    // 每个 block 处理一个 token
    for (int b = 0; b < batch; b++) {
        for (int s = 0; s < seq_len; s++) {
            int token_id = b * seq_len + s;

            // 路由打分:x @ gate_weights^T
            // gate_weights: [num_experts, hidden]
            // 输出: scores [num_experts]
            LocalTensor<float> scores(num_experts);
            MatMul(scores, input[token_id], gate_weights);

            // Top-K 选择(部分排序,不需要完全排序)
            // 输出:indices [top_k], weights [top_k]
            LocalTensor<int> topk_indices(top_k);
            LocalTensor<float> topk_scores(top_k);
            TopKPartial(scores, topk_indices, topk_scores, num_experts, top_k);

            // Softmax 归一化 top-k 权重
            Softmax(topk_scores, topk_scores);

            // 存储 top-k 结果
            Store(expert_indices[token_id], topk_indices);
            Store(expert_weights[token_id], topk_scores);
        }
    }

    // ===== 阶段 2:Token 重排(按专家分组)=====
    // 统计每个专家收到的 token 数量
    LocalTensor<int> expert_token_count(num_experts);
    expert_token_count = 0;

    for (int b = 0; b < batch; b++) {
        for (int s = 0; s < seq_len; s++) {
            int token_id = b * seq_len + s;
            for (int k = 0; k < top_k; k++) {
                int expert_id = expert_indices[token_id * top_k + k];
                expert_token_count[expert_id]++;
            }
        }
    }

    // 构建 token 到专家的映射表(CSR 格式)
    // expert_tokens[expert_id] = [token_id_0, token_id_1, ...]
    LocalTensor<int> expert_tokens(batch * seq_len * top_k);
    LocalTensor<int> expert_offset(num_experts + 1);
    ExclusiveScan(expert_token_count, expert_offset);

    // 填充 token 列表
    LocalTensor<int> expert_cursor(num_experts);
    expert_cursor = 0;
    for (int b = 0; b < batch; b++) {
        for (int s = 0; s < seq_len; s++) {
            int token_id = b * seq_len + s;
            for (int k = 0; k < top_k; k++) {
                int expert_id = expert_indices[token_id * top_k + k];
                int pos = expert_offset[expert_id] + expert_cursor[expert_id];
                expert_tokens[pos] = token_id;
                expert_cursor[expert_id]++;
            }
        }
    }

    // ===== 阶段 3:专家计算(分组并行)=====
    // 每个 block 处理一个专家
    for (int e = 0; e < num_experts; e++) {
        int num_tokens_for_expert = expert_token_count[e];
        if (num_tokens_for_expert == 0) continue;

        // 加载该专家收到的所有 token
        LocalTensor<float> expert_input(num_tokens_for_expert * hidden);
        for (int t = 0; t < num_tokens_for_expert; t++) {
            int token_id = expert_tokens[expert_offset[e] + t];
            Copy(expert_input[t * hidden], input[token_id * hidden], hidden);
        }

        // FFN 计算(小矩阵乘,Cube 单元)
        // up = expert_input @ expert_up[e]
        LocalTensor<float> up(num_tokens_for_expert * ffn_hidden);
        MatMul(up, expert_input, expert_up[e]);

        // gate = expert_input @ expert_gate[e]
        LocalTensor<float> gate(num_tokens_for_expert * ffn_hidden);
        MatMul(gate, expert_input, expert_gate[e]);

        // 激活:silu(gate) * up
        LocalTensor<float> hidden(num_tokens_for_expert * ffn_hidden);
        for (int i = 0; i < num_tokens_for_expert * ffn_hidden; i++) {
            hidden[i] = up[i] * Silu(gate[i]);
        }

        // down = hidden @ expert_down[e]
        LocalTensor<float> expert_output(num_tokens_for_expert * hidden);
        MatMul(expert_output, hidden, expert_down[e]);

        // ===== 阶段 4:结果写回(按 token 聚合)=====
        for (int t = 0; t < num_tokens_for_expert; t++) {
            int token_id = expert_tokens[expert_offset[e] + t];
            // 找到这个 token 在 top-k 中的权重
            int k_idx = FindKIndex(expert_indices, token_id, e);
            float weight = expert_weights[token_id * top_k + k_idx];

            // 加权累加到输出
            AtomicAdd(output[token_id * hidden], expert_output[t * hidden], weight, hidden);
        }
    }
}

关键融合点:

  • 路由打分 + Top-K 融合:避免 scores 中间结果写回 HBM
  • Token 重排 + 专家计算融合:expert_input 在 L1 中直接复用
  • 原子累加聚合:多专家输出并行写回,避免串行累加

Top-K 部分排序的优化

MoE 不需要完全排序所有专家得分——只要选出最大的 K 个。ops-transformer 用快速选择(QuickSelect)算法:

// ops-transformer/kernels/topk_partial.cpp

__aicore__ void TopKPartial(
    LocalTensor<float>& scores,      // [num_experts]
    LocalTensor<int>& indices,       // [top_k] 输出
    LocalTensor<float>& values,      // [top_k] 输出
    int num_experts, int k
) {
    // 快速选择:找到第 K 大的元素作为 pivot
    float pivot = QuickSelect(scores, num_experts, k);

    // 单次扫描:选出所有 > pivot 的元素
    int count = 0;
    for (int i = 0; i < num_experts && count < k; i++) {
        if (scores[i] >= pivot) {
            indices[count] = i;
            values[count] = scores[i];
            count++;
        }
    }

    // 如果选出的元素不够 K 个(有多个等于 pivot 的),补齐
    while (count < k) {
        // 找到第一个等于 pivot 且未被选中的
        for (int i = 0; i < num_experts; i++) {
            if (scores[i] == pivot && !IsSelected(indices, i, count)) {
                indices[count] = i;
                values[count] = scores[i];
                count++;
                break;
            }
        }
    }
}

// QuickSelect:找到第 K 大的元素(O(N) 平均复杂度)
float QuickSelect(LocalTensor<float>& arr, int n, int k) {
    int left = 0, right = n - 1;
    while (left < right) {
        // 随机选 pivot(避免最坏情况)
        int pivot_idx = left + (hash(left + right) % (right - left + 1));
        float pivot = arr[pivot_idx];

        // 三路划分
        int i = left;
        for (int j = left; j <= right; j++) {
            if (arr[j] > pivot) {  // 大的在左边
                Swap(arr[i], arr[j]);
                i++;
            }
        }

        // 判断 pivot 位置
        if (k < i) {
            right = i - 1;
        } else if (k >= i && k <= (n - 1 - (right - i + 1))) {
            return pivot;  // pivot 正好在第 k 位
        } else {
            left = i;
        }
    }
    return arr[left];
}

完全排序需要 O(N log N),快速选择只需要 O(N)。当 num_experts=256、top_k=8 时,快速选择比排序快 10×。

专家并行与 Token 并行的平衡

MoE 的并行有两个维度:

  • Token 并行:不同 token 的路由和计算互不依赖
  • 专家并行:不同专家的计算互不依赖

ops-transformer 的策略:

  • 小 batch(batch×seq < num_experts):专家并行为主
  • 大 batch(batch×seq > num_experts):Token 并行为主
// 自动选择并行策略
void MoESelectParallelStrategy(int batch, int seq, int num_experts) {
    int total_tokens = batch * seq;

    if (total_tokens < num_experts) {
        // 专家并行:每个 block 处理一个专家
        // 适合推理场景(batch=1, seq=512)
        LaunchMoEExpertParallel(batch, seq, num_experts);
    } else {
        // Token 并行:每个 block 处理一批 token
        // 适合训练场景(batch=8, seq=2048)
        LaunchMoETokenParallel(batch, seq, num_experts);
    }
}

踩坑一:Token 分布不均导致负载不平衡

某些热门专家可能收到大量 token,冷门专家几乎没 token。最坏情况:95% 的 token 走同一个专家,其他专家空转。

缓解策略:容量因子(Capacity Factor)

// 限制每个专家最多处理的 token 数
int max_tokens_per_expert = (batch * seq * top_k) / num_experts * capacity_factor;

// capacity_factor = 1.0:完美均衡(每个专家处理相同数量的 token)
// capacity_factor = 1.5:允许 50% 的负载波动
// capacity_factor = 2.0:允许热点专家处理双倍 token

// 超出容量的 token:走 drop 或辅助专家
if (expert_token_count[e] > max_tokens_per_expert) {
    // 策略 1:丢弃超出的 token(损失信息)
    // 策略 2:路由到次优专家(保留信息,但效果略差)
    int second_best = FindSecondBestExpert(token_id, scores);
    expert_indices[token_id * top_k + k] = second_best;
}

踩坑二:原子累加的精度损失

多专家输出用 AtomicAdd 累加。FP16 精度下,连续 8 次累加可能损失 5% 的精度。

修复:累加时用 FP32 中间结果

// 错误:直接 FP16 累加
AtomicAdd(output_fp16, expert_output_fp16, weight);  // 精度损失

// 正确:先转 FP32,累加完再转回
float output_fp32 = float(output_fp16) + float(expert_output_fp16) * weight;
output_fp16 = half(output_fp32);

ops-transformer 的 MoE kernel 内部自动处理精度转换——用户感知不到 FP32 中间结果。

踩坑三:专家权重加载的内存布局

每个专家的 FFN 权重形状相同 [hidden, ffn_hidden],但实际加载时是交错的:

权重文件布局(不适合 MoE)
expert_0_up [hidden, ffn]
expert_0_gate [hidden, ffn]
expert_0_down [ffn, hidden]
expert_1_up [hidden, ffn]
...

优化布局(适合 MoE 融合)
all_up [num_experts, hidden, ffn]    ← 连续存储
all_gate [num_experts, hidden, ffn]
all_down [num_experts, ffn, hidden]

ops-transformer 提供权重转换脚本:

python tools/convert_moe_weights.py \
    --input standard_format.pt \
    --output moe_optimized.pt \
    --layout interleaved

转换后的权重可以直接送入 MoE 融合 kernel——不需要运行时重排。


MoE 是大模型规模扩展的核心技术,但 MoE 的调度逻辑是性能杀手——路由打分、Token 分发、结果聚合这些步骤在 CPU 上是毫秒级,在 NPU 上必须压到微秒级。ops-transformer 的 MoE 融合算子把路由、分发、计算、聚合四个阶段合并成一个 kernel,中间结果不回 HBM。top-k 快速选择、专家并行与 Token 并行的自适应切换、容量因子负载平衡——这些是 MoE 在 NPU 上跑得快的关键。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐