上一篇实现过 SPSC,已经理解了 memory ordering 在生产者和消费者之间建立的 happens-before 关系。现在我们把「一个生产者」变成「多个生产者」,难度曲线会突然变陡 —— 不是因为内存序更复杂了,而是多了一个根本性的新问题:多个生产者之间如何协调

本文会带你走完 MPSC(Multi-Producer Single-Consumer)的完整学习路径:

  1. 从朴素实现的 race 切入,把问题的本质摊开
  2. 用基于 CAS 的链表给出第一个正确答案
  3. 深入 Vyukov 经典侵入式 MPSC —— 工程界的事实标准
  4. 给出基于数组的有界 MPSC 实现
  5. 内存序、瞬态不一致、ABA、内存回收等所有关键细节
  6. 如何验证你的实现是对的(大部分教程不讲,但最重要)

上一篇SPSC:https://blog.csdn.net/cookies_s_/article/details/160229302?fromshare=blogdetail&sharetype=blogdetail&sharerId=160229302&sharerefer=PC&sharesource=cookies_s_&sharefrom=from_link


一、从 SPSC 到 MPSC:问题的本质变化

回忆一下 SPSC 为什么能做到几乎零开销:

  • 生产者只写 tail(对应上一节的write),消费者只读;
  • 消费者只写 head(对应上一节的read),生产者只读;
  • 跨线程可见性用一次 release/acquire 内存序就能覆盖。

根本原因是 —— 两个"写者"在写不同的变量,互不干扰。

一旦变成 MPSC,事情就不一样了:

在这里插入图片描述

现在多个生产者都要写同一个尾指针。队列本质上还是个 FIFO,但"谁是当前队尾"成了一个需要所有生产者达成共识的全局状态。

这就是 MPSC 的核心问题:生产者之间的同步。消费者那一侧反而依旧简单(因为只有一个),这也是 MPSC 比 MPMC 简单不少的原因。

朴素实现错在哪

新手很容易写出这样的"入队":

// ❌ 错误示范:非原子版的 enqueue
void enqueue(Node* n) {
    n->next = nullptr;
    tail->next = n;    // 步骤 1:连上新节点
    tail = n;          // 步骤 2:推进尾指针
}

只要两个生产者同时执行它,就必然出事:

在这里插入图片描述

  • P1 和 P2 在 t0 都把 tail 读成 A
  • P1 先写 A->next = N1
  • P2 马上写 A->next = N2 —— N1 彻底丢失
  • 最后两个都更新 tail,N1 从此成了幽灵节点

问题根源是「读 tail → 写 tail」之间不是原子的。要修复它只有两条路:用 CAS 把"读改写"合成一步,或者用 exchange 直接替换。下面两节分别展示这两种思路。


二、方案一:基于 CAS 的链表 MPSC(入门版)

这是最直接的思路 —— 既然 race 发生在"读写 tail"之间,那就把它们包成一个 CAS 循环。

2.1 基本想法

loop:
    prev = load(tail)
    if CAS(tail, prev, n) succeeded:
        prev->next = n   // 把链接上
        break
    else:
        retry

关键点:prev->next = n 在 CAS 成功之后才做。CAS 成功意味着「我独占了 prev 这个节点」—— 因为此时 tail 已经被改成 n,再也不会有其他生产者拿到同一个 prev 了。

2.2 代码实现

#include <atomic>
#include <utility>

template <typename T>
class CasMpscQueue {
    struct Node {
        T data;
        std::atomic<Node*> next{nullptr};
    };

    alignas(64) std::atomic<Node*> tail_;  // 生产端:最新入队的节点
    alignas(64) Node* head_;               // 消费端:下一个要取的节点的"前驱"
    // 用 stub 节点简化边界:空队列时 head_ == tail_ == &stub_
    Node stub_;

public:
    CasMpscQueue() {
        stub_.next.store(nullptr, std::memory_order_relaxed);
        tail_.store(&stub_, std::memory_order_relaxed);
        head_ = &stub_;
    }

    ~CasMpscQueue() {
        T tmp;
        while (dequeue(tmp)) {}

        // 读空后,最后一个已消费节点可能会滞留为新的前驱
        if (head_ != &stub_) {
            delete head_;
            head_ = &stub_;
        }
    }

    void enqueue(T value) {
        Node* n = new Node();
        n->data = std::move(value);
        n->next.store(nullptr, std::memory_order_relaxed);

        // 1) CAS 循环抢占尾指针
        Node* prev = tail_.load(std::memory_order_relaxed);
        while (!tail_.compare_exchange_weak(
                   prev, n,
                   std::memory_order_release,   // 成功路径:发布 n 和其前置写
                   std::memory_order_relaxed)) {
            // prev 被 CAS 自动刷新为最新的 tail,直接重试
        }
        // 2) CAS 成功后,prev 是"我"独占的 —— 安全链接
        prev->next.store(n, std::memory_order_release);
    }

    bool dequeue(T& out) {
        Node* head = head_;
        Node* next = head->next.load(std::memory_order_acquire);
        if (!next) return false;

        // 把数据搬出来,然后把旧的"前驱"节点回收
        out = std::move(next->data);
        head_ = next;
        if (head != &stub_) delete head;
        return true;
    }
};

2.3 这个版本的优缺点

先给这个版本一个准确定位:它是教学版实现。它非常适合拿来理解"多个生产者如何通过 CAS 协调争抢尾指针",但我不建议把它直接当成"生产环境里可原样落地的完整 MPSC 队列"。

优点:

  • 思路直接,容易理解;
  • 生产者侧的关键同步关系很清楚 —— CAS 保证每个生产者独占自己的 prev
  • dequeue 里采用的是old-head reclamation(释放旧前驱):当前取出的节点会留下来充当新的前驱节点,下一次成功出队时再回收旧前驱。

缺点:

  • CAS 循环会自旋,在高争用下多个生产者会不断失败重试;
  • 每次 enqueue 都要 new 一个节点,堆分配本身就是瓶颈;
  • compare_exchange_weak 在失败时会把最新的 tail 值写回 prev,看似省事,但在激烈争用下相当于一直在读 cache line,触发大量 MESI 协议流量;
  • 它有断链窗口:某个生产者已经 CAS 成功把 tail_ 推进了,但还没来得及写 prev->next。这时 consumer 可能暂时观察到"像是空队列"的状态,因此这个教学版不适合作为"严格线性化 queue"的最终实现
  • 这个版本的回收逻辑还不够收尾:当队列被读空时,最后一个已消费节点会滞留为新的前驱;如果对象此时直接析构,析构函数还需要额外释放这个最终前驱节点,否则会漏掉最后一块堆内存。
  • 是否有 use-after-free 风险
    use-after-free:例如在生产者被抢占导致 “CAS 成功但 prev->next 还没写” 的窗口里,consumer 可能已经把 prev 节点 dequeue 并 delete。此时恢复执行的生产者会写到已释放内存。
    但这里没有这个问题,这得归功于"每次dequeue"释放旧的前驱节点(也就是释放当前dequeue的前驱的前驱)的设计;enqueue 在 CAS 成功后,生产者对旧尾节点 prev 的唯一后续访问,就是的prev->next.store(n, …)。 而 dequeue 只有在读到 head->next != nullptr时,才会删除旧的head。
    这就意味着:如果某个生产者还没把 prev->next 链上,那么消费者看到的就是 next == nullptr。既然 next == nullptr,消费者就不会前进,更不会删除这个 prev。所以不会出现“消费者先 delete prev,生产者再写 prev->next”这种 UAF。

三、方案二:Vyukov 经典 MPSC 侵入式队列

这是 Dmitry Vyukov 在 2008 年前后提出的算法,是目前工程界 MPSC 的事实标准。很多知名项目(Facebook Folly、Pony 运行时、Rust 早期标准库等)都采用了它或其变体。

它精妙到什么程度?整个 enqueue 只有两行核心逻辑,且没有循环。

3.1 核心洞察

CAS 方案慢在哪里?—— 它反复"猜测 + 验证"。如果我们能用一个不会失败的原子操作把同步搞定,就能省掉所有重试。

std::atomic::exchange 就是这样的原语:无条件地把原子变量换成新值,并原子地返回旧值。从 C++ 语义上看,它是一个原子 read-modify-write;在硬件上通常会落到原子交换类指令,或者由 LL/SC 风格序列来实现。

Vyukov 的 enqueue 只做两步:

prev = head.exchange(n);   // 原子地把 head 换成 n,旧值给 prev
prev->next = n;            // 把 prev 的 next 链接到 n

为什么这是正确的?关键论点:

exchange 定义了所有生产者的线性化顺序。 每个生产者拿到的 prev 都是独一无二的(因为任何两个 exchange 必然排出先后关系),因此每个生产者对 prev->next 的写也是独占的,不会冲突。

3.2 数据结构

在这里插入图片描述

这里命名容易让人懵 —— 注意 Vyukov 习惯用 head 表示生产端(最新入队的节点),tail 表示消费端

3.3 为什么要做成侵入式?这不只是风格选择,而是工程上的强约束

“侵入式” 指节点内存由调用者管理 —— 队列只存指针,不做 new/delete

struct MyMessage : MpscQueue::Node {  // 继承队列节点
    int pid;
    int seq;
    // ... 业务字段 ...
};

auto* m = new MyMessage{...};
queue.enqueue(m);              // 传入 MyMessage* 而不是值
// ... consumer ...
MyMessage* m = (MyMessage*)queue.dequeue();
// consumer 负责 delete m

这看起来只是个设计偏好,实际是无锁队列绕开内存回收地雷的关键。对比一下非侵入式的隐患:

// 假设我们写一个非侵入式的 Vyukov 风格队列
void enqueue(T value) {
    Node* n = new Node(std::move(value));        // ← 队列分配
    Node* prev = head.exchange(n);
    prev->next.store(n, release);
}

T dequeue() {
    Node* tail = tail_;
    Node* next = tail->next.load(acquire);
    if (!next) return ...;
    T result = std::move(next->data);
    tail_ = next;
    delete tail;                                 // ← 能安全 delete 吗?
    return result;
}

delete tail 这一步看上去很吓人,但问题要说准确。对这种 exchange/XCHG + prev->next = n 结构来说,consumer 只有在已经观察到 tail->next != nullptr 之后,才可能把旧前驱 tail 回收;因此,这里更本质的风险不是"producer 还没补链,consumer 就先把 prev delete 掉",而是:

  • exchangeprev->next = n 之间存在断链窗口
  • 如果 producer 恰好卡在这个窗口里,consumer 会暂时看不到后续节点;
  • 这会影响可见性和进展性,也是为什么这类非侵入式版本常被描述为"push 对 consumer 有 blocking window"、甚至不是严格线性化的 queue。

那为什么工程上依然常选侵入式?核心原因有三点:

  1. 节点生命周期边界更清楚:队列只负责链接与摘链,不再额外承担一层内部节点管理;
  2. 零额外内部节点:如果消息对象本来就需要动态分配,侵入式不会再多包一层 queue node;
  3. 热路径更轻:无需在队列内部再维护一套更通用的生命周期协议。

如果你真的要做"非侵入式 + 队列自己管理内部节点"的泛化版本,那么可选方案才会落到:

  1. 侵入式,把节点生命周期甩给调用者 —— 约定"谁 enqueue 谁负责让节点活到被 dequeue 出来后再释放";
  2. Hazard Pointer:当结构本身无法证明没有其他线程还握着节点指针时,用 hazard pointer 保护访问中的节点;
  3. Epoch-Based Reclamation:把 delete 推迟到所有可能的并发引用都失效之后。

方案 1 最简单,几乎零开销;方案 2、3 都要额外的数据结构和协议,在热路径上加开销。Vyukov 选侵入式不是偶然 —— 它首先把 ownership 边界外置,然后把队列内部保持在极简。

另外两个收益:

  • 零分配:enqueue 不会 new,对硬实时和 allocator 敏感场景友好
  • 缓存局部性:Node 和业务数据在同一块内存里,consumer 拿到节点时数据就在 cache line 里

3.4 enqueue 详解

在这里插入图片描述

分四步看最清楚:

① 初始状态head == tail == &stub,stub 是 next 为 null 的哨兵。

② XCHG:生产者 P1 执行 prev = head.exchange(X)。现在 head 指向 X,但 A 的 next 还是 null —— 此刻队列"断链"。

③ 回链:P1 执行 prev->next.store(X, release),把 A 连上 X。链条修复。

④ 并发生产者:如果 P2 在 ② 之后紧跟着也做 XCHG,它拿到的 prev 是 X(因为 ② 已经把 head 换成 X 了)。P2 接下来会写 X->next = Y,完全不会和 P1 冲突。

注意这四步中步骤 ② 和 ③ 之间存在"断链窗口"—— 这是 Vyukov 算法最值得深入的地方,下一节专门讨论。

3.5 dequeue 深入解析(全文最绕的部分)

这里要先强调一个容易混淆的点:下面代码里的 tail_,并不总是“下一个可弹出节点的前驱”。只有当 tail_ == &stub_ 时,stub_ 才扮演哨兵/前驱节点;一旦 consumer 跳过 stub_tail_ 通常就表示当前持有的真实节点。于是快速路径的语义是“返回当前 tail,并把 tail_ 推进到 next”,而不是“返回 tail->next”。前文把 tail 近似理解成消费端锚点,是为了帮助理解 stub 和最后一个节点的处理;真正读代码时,应以后面这个更精确的语义为准。
也就是说这里tail更精确的语义是:

  • tail == stub 时,stub 是前驱
  • tail != stub 时,tail 往往就是当前待返回节点

表面上看 dequeue 应该很简单:沿着 tail->next 往前走就行。但瞬态不一致让它变得棘手。先看这个场景:

在这里插入图片描述

想象这个时序:

  1. 队列只有一个元素 A(stub → A,其中 headstub.next 都指向 A)
  2. 生产者 P 执行 prev = head.exchange(n) —— 此时 head = n,但 A->next 还是 null
  3. P 在这时被抢占(OS 切换、缺页、debugger 停住……)
  4. Consumer 来读,它看到 tail = stub, stub->next = A, A->next = null

Consumer 怎么判断此时的状态?如果它以为"A 之后没有元素了",那完全错误 —— 元素 n 已经被 exchange 进来了,只是还没链上。

这也就是为什么 dequeue 的代码比 enqueue 复杂得多 —— 它要处理所有可能的队列状态,不只是"有数据"和"真空"。

dequeue 一共有四个出口,对应四种情况:

出口 语义 消费者应如何应对
tail == stub && next == null 真空,或生产者刚做完 XCHG 还没回链 返回 null,调用者 yield 后重试
next != null 最常走的快速路径,有下一个节点 推进 tail_,返回当前节点
tail != head && next == null 断链窗口:head 已经超过 tail,但中间的 next 还没连上 返回 null,调用者 yield 后重试
tail == head && next == null 队列里只剩最后一个节点,要取走它,但取完后 tail 会悬空 重新 enqueue stub,再取走节点

出口 ④ 是最微妙的一步,也是最多人问"为什么要这么做"的地方。 我们来详细走一遍。

假设当前状态是:

tail → A  (stub 已经被跳过,tail_ 已经指向 A)
A.next = null
head → A  (因为 A 是最新入队的)

Consumer 想取走 A。取走后 tail_ 应该指向谁?队列理论上空了,我们希望回到初始状态 tail == head == stub,但我们不能直接把 tail 设回 stub—— 因为这需要原子地"把 head 也改成 stub",这是两个变量的同步修改,多生产者环境下做不到。

Vyukov 的做法极其巧妙:把 stub 作为普通节点重新入队一次

enqueue(&stub_);   // stub 现在以"普通节点"身份插到 head 后面

enqueue 会做:

prev = head.exchange(&stub);   // 原子地把 head 换成 stub,prev = A
prev->next = &stub;            // 把 A 连上 stub

现在队列状态变成:

tail → A → stub
head → stub

这时 Consumer 再做 next = tail->next.load(),看到 next = stub,可以安全地取走 A —— 把 tail_ 推进到 stub。最终状态:

tail → stub  (空哨兵)
head → stub

完美回到初始状态。代价是:这条路径做了一次额外的 XCHG + store,但它只在"队列正好被读空"这个临界状态触发,平均成本可以忽略。

更精彩的是:如果在 Consumer 刚调用 enqueue(&stub) 之前有生产者也在 enqueue,那么两者会走正常的 XCHG 排序路径,互不干扰 —— stub 被当成普通节点处理,整个算法依然自洽。

3.6 完整代码

// 侵入式 MPSC 队列 —— 节点内存由调用者管理
class VyukovMpscIntrusive
{
public:
    struct Node
    {
        std::atomic<Node *> next;
    };

    // 初始化:tail_和head_都指向stub_,表示队列空
    VyukovMpscIntrusive()
    {
        stub_.next.store(nullptr, std::memory_order_relaxed);
        head_.store(&stub_, std::memory_order_relaxed);
        tail_ = &stub_;
    }

    // ========== Producer 端 ==========
    // 多线程并发调用安全;producer 端是 wait-free,队列整体进展性见 3.8
    void enqueue(Node *n)
    {
        // 初始化新节点。relaxed 就够了 —— 这里还没发布给任何人
        n->next.store(nullptr, std::memory_order_relaxed);

        // [步骤 ①] 原子地把 head 换成 n,拿到旧的 head 作为 prev
        //   acq_rel 的含义:
        //     release 部分 → 把 n 及其初始化写"发布"出去,供后续 acquire 看到
        //     acquire 部分 → 看到之前所有 release 写过的 head(即前驱节点的完整状态)
        //   硬件层面:x86 上是 LOCK XCHG(自带 full barrier)
        Node *prev = head_.exchange(n, std::memory_order_acq_rel);

        // [步骤 ②] 把 n 连到 prev 后面 —— 关闭断链窗口
        //   release 保证:consumer 在用 acquire 读到这个指针后,
        //   能看到 n 的完整内容(包括业务字段)
        //   注意:即便 prev == stub,这里也照样写,因为 stub.next 是 atomic
        prev->next.store(n, std::memory_order_release);
    }

    // ========== Consumer 端 ==========
    // 只能由单一线程调用
    // 返回 nullptr 的情况(对应 3.5 的出口 ①/③ 和 ④ 的二次失败):
    //   a) 队列真空
    //   b) 有生产者在断链窗口内 —— 上层调用者应 yield 后重试
    Node *dequeue()
    {
        Node *tail = tail_;                                      // 本地快照
        Node *next = tail->next.load(std::memory_order_acquire); // 与 enqueue 的 release 配对

        // ---- 情况 A:tail 当前指向 stub ----
        // stub 永远是"位于队头的哨兵",要么 next == null(无数据),
        // 要么 next 指向真正的队头节点
        if (tail == &stub_)
        {
            if (next == nullptr)
            {
                // 出口 ①:可能是真空,也可能是生产者正在断链窗口中
                // 上层无需区分,yield 后重试即可
                return nullptr;
            }
            // stub 后面有真节点,把 stub 跳过去
            tail_ = next;
            tail = next;
            next = tail->next.load(std::memory_order_acquire);
        }

        // ---- 情况 B:快速路径 ----
        // tail 是真节点,且后面还有节点
        if (next != nullptr)
        {
            // 出口 ②:正常消费,推进 tail_,返回当前节点给调用者
            tail_ = next;
            return tail;
        }

        // ---- 情况 C:tail 是真节点,但 next == null ----
        // 两种可能:
        //   (1) tail 确实是队列中最后一个节点
        //   (2) 有生产者已经 XCHG 到 head,但还没写 prev->next
        // 通过比较 tail 和 head 来区分
        Node *head = head_.load(std::memory_order_acquire);
        if (tail != head)
        {
            // 出口 ③:head 已经超过 tail,说明处于断链窗口 —— 等生产者完成
            return nullptr;
        }

        // ---- 情况 D:tail == head,队列里只剩 tail 这一个节点 ----
        // 直接取走 tail 会让 tail_ 悬空(下次 enqueue 时 prev->next 写到哪?)
        // 解决:把 stub 重新作为"普通节点"入队,变成 [tail → stub]
        // 然后就能安全地走情况 B 的快速路径了
        enqueue(&stub_);

        // stub 入队之后,tail->next 应该被设为 &stub(除非又有新生产者插队)
        next = tail->next.load(std::memory_order_acquire);
        if (next != nullptr)
        {
            // 出口 ④ 成功:推进 tail_,返回原 tail
            tail_ = next;
            return tail;
        }
        // 出口 ④ 失败:在 enqueue(stub) 与重新 load 之间又有新生产者插入,
        // 并且那个生产者也卡在断链窗口里。下次调用时重试即可。
        return nullptr;
    }

private:
    alignas(64) std::atomic<Node *> head_; // 生产端热点
    alignas(64) Node *tail_;               // 消费端热点
    alignas(64) Node stub_;                // 独占一行,避免伪共享
};

3.7 内存序为什么是这样选的

逐行辨析:

位置 使用的内存序 为什么
n->next.store(nullptr, relaxed) relaxed 新节点还没发布给任何其他线程,单线程可见即可
head_.exchange(n, acq_rel) acq_rel release 部分:发布新节点 n 的初始化;acquire 部分:拿到 prev 时能看到上游生产者对 prev 的所有 release 写
prev->next.store(n, release) release 发布 n 给 consumer —— consumer 用 acquire 加载 tail->next 就能看到 n 的完整内容
tail->next.load(acquire) acquire 与上面的 release 配对,建立跨线程 happens-before
head_.load(acquire) acquire 判断断链窗口时,需要看到最新的 head 值

最关键的一对 happens-before 是 prev->next.store(release)tail->next.load(acquire),它负责让消费者看到节点的完整内容。head 的 acq_rel 只负责生产者之间的线性化。

💡 常见误区:很多人看到 exchange(acq_rel) 会觉得"既然 acq_rel 这么强,后面的 store 用 relaxed 不就行了?"错 —— acq_rel 作用于 head,但 prev->next 是另一个变量。在 ARM/POWER 这样的弱内存模型上,如果 prev->next.store 用 relaxed,consumer 可能看到 next 指针已经设置,但 n 的 data 还没写入。必须 release。

3.8 Vyukov 算法的进展性

  • producer / push 这一侧是 wait-free 的:每次入队固定做一次 exchange 和一次链接写,没有 CAS 重试环;
  • consumer / pop 这一侧存在 blocking window:如果某个 producer 卡在 exchangeprev->next = n 之间,consumer 需要等那条链接补上,期间可能连续返回空或重试;
  • 因此,整个队列不宜笼统地称为"完全 lock-free"。更准确的表述是:它的 producer 非常强,consumer 侧则受一个极小的瞬态不一致窗口影响。

这也是为什么很多资料会把它描述成:

  • wait-free producers
  • push is blocking with respect to consumer
  • an algorithm with a tiny window of inconsistency

在生产环境中这通常可以接受,因为:

  • 断链窗口只有一条普通链接写;
  • 即便某个 producer 卡在窗口里,其他 producer 仍然可以继续推进 head
  • 这个窗口极短,正常运行时命中的概率很低。

如果场景要求严格 wait-free 的整个队列语义(例如某些硬实时场景),那就要看更重型的 wait-free 队列设计,例如 Kogan-Petrank 一类算法,代价是实现复杂度和常数开销都会显著上升。


四、方案三:基于数组的有界 MPSC

链表版的缺点是每次 enqueue 都要分配节点(除非像 Vyukov 那样侵入式)。在高性能日志、网络收包等场景,我们愿意用有界换取零分配更好的缓存局部性

4.1 设计思想:用序号当状态位

普通环形缓冲区在 MPSC 下的问题是:多个生产者都要抢 write_pos,写完还要"宣告这个槽位可读"。我们需要一个机制让消费者知道"第 k 号槽位是否写完"。

Dmitry Vyukov 的 bounded MPMC 算法 给出了一个漂亮的解法 —— 每个 cell 配一个原子序号。虽然它原本是 MPMC,用在 MPSC 上更简单(消费端不需要 CAS)。

在这里插入图片描述

核心规则:

  • 初始化时 cell[i].seq = i
  • 生产者想写 cell[i] 时,要求 cell[i].seq == enqueue_pos;写完后把 seq 置为 enqueue_pos + 1(表示"该 cell 数据可读")
  • 消费者想读 cell[i] 时,要求 cell[i].seq == dequeue_pos + 1;读完后把 seq 置为 dequeue_pos + capacity(表示"下一轮此 cell 可写")

序号既是版本号又是状态位,一次 atomic load 就能判定当前操作是否合法。

4.2 完整实现

// 数组版本
#include <atomic>
#include <cstddef>
#include <cstdint>

template <typename T, size_t Capacity>
class BoundedMpscQueue
{
private:
    // 数组大小要求为2的幂次
    static_assert((Capacity & (Capacity - 1)) == 0, "Capacity must be a power of two");
    static_assert(Capacity >= 2, "Capacity must be >= 2");

    // 单个元素包含:序号 + 数据
    struct Cell
    {
        std::atomic<size_t> sequence;
        T data;
    };

    alignas(64) Cell buffer_[Capacity]; // 环形队列
    alignas(64) std::atomic<size_t> enqueue_pos_; // 递增入队序号 -- 下一个入队的序号
    alignas(64) size_t dequeue_pos_; // 递增出队序号 -- 下一个出队的序号 - MPSC 下只有消费者读写,无需原子

    static constexpr size_t kMask = Capacity - 1; // 取余掩码

public:
    BoundedMpscQueue() : enqueue_pos_(0), dequeue_pos_(0)
    {
        for (size_t i = 0; i < Capacity; ++i)
        {
            // 初始化:buffer_[i].sequence = i
            buffer_[i].sequence.store(i, std::memory_order_relaxed);
        }
    }

    // 禁止拷贝和赋值
    BoundedMpscQueue(const BoundedMpscQueue &) = delete;
    BoundedMpscQueue &operator=(const BoundedMpscQueue &) = delete;

    // 多生产者调用,返回 false 表示队列满
    bool enqueue(T value)
    {
        Cell *cell; // ptr
        size_t pos = enqueue_pos_.load(std::memory_order_relaxed);

        for (;;)
        {
            cell = &buffer_[pos & kMask];
            size_t seq = cell->sequence.load(std::memory_order_acquire);
            intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos);

            if (diff == 0)
            {
                // 这个 cell 空闲,尝试抢占 enqueue_pos
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                {
                    break; // 抢到了,跳出去写数据
                }
                // 失败:pos 被 CAS 更新为最新值,进入下一次循环
            }
            else if (diff < 0)
            {
                // cell.seq < pos:上一轮写入的数据还没被消费 → 队列满
                return false;
            }
            else
            {
                // diff > 0:其他生产者抢先了,重新加载 pos
                pos = enqueue_pos_.load(std::memory_order_relaxed);
            }
        }

        // 独占了 cell,放入数据
        cell->data = std::move(value);
        // 把 seq 推进到 pos + 1,告诉消费者"可读了"
        cell->sequence.store(pos + 1, std::memory_order_release);
        return true;
    }

    // 单消费者调用,返回 false 表示队列空
    bool dequeue(T &out)
    {
        Cell *cell = &buffer_[dequeue_pos_ & kMask];
        size_t seq = cell->sequence.load(std::memory_order_acquire);
        intptr_t diff = static_cast<intptr_t>(seq) - static_cast<intptr_t>(dequeue_pos_ + 1);

        if (diff < 0)
            return false; // 还没被写入

        // MPSC 下消费者独占 dequeue,不用 CAS
        out = std::move(cell->data);
        // 把 seq 推进到 dequeue_pos + Capacity,告诉"下一轮的生产者"可写
        cell->sequence.store(dequeue_pos_ + Capacity, std::memory_order_release);
        dequeue_pos_++;
        return true;
    }
};

4.3 数组版与链表版的对比

维度 Vyukov 链表侵入式 有界数组
容量 无界(只受内存限制) 编译期固定
内存分配 调用者负责节点 完全静态
缓存局部性 取决于节点布局 极好(连续数组)
满队列行为 不会满 需要上层处理 false
实现复杂度
进展性 见 3.8 enqueue 需要 CAS 循环;工程上可视为非阻塞实现,严格进展性不在本文里单独证明

何时选哪个?

  • 消息速率不可预测、容量难估 → 链表版
  • 硬实时、无法接受 new / GC pause → 数组版
  • 超高吞吐(几千万 msg/s)、数据小 → 数组版(cache 友好)
  • 数据大(例如每条消息几 KB)→ 链表版避免数组内的拷贝

五、关键细节深挖

5.1 关于 ABA 问题

Vyukov MPSC 有没有 ABA?答案:没有。 原因:

  • Vyukov enqueue 用的是 exchange 而不是 compare_exchange —— exchange 不比较,无条件交换,根本不存在"看到旧值以为没变"的问题
  • CAS 链表版虽然用了 CAS,但作用在 tail_ 上。只要节点地址不被复用,这里就没有典型的 ABA;真正需要小心的是:一旦你把链表版改造成节点池 / freelist 复用,指针值就可能循环出现,这时 ABA 才会变成实打实的问题
  • 数组版的 CAS 作用在 enqueue_pos(单调递增的整数),不可能 ABA

如果你把链表版改造成"节点池"以避免频繁 new/delete,就会引入真正的 ABA(CAS 前后读到相同指针但指向的节点已经换过几轮)。这种情况下需要:

  1. 带版本号的指针(tagged pointer),借助 128-bit CAS(x86 的 cmpxchg16b,ARM 的 casp
  2. 用 hazard pointer 保护节点
  3. 用 epoch-based reclamation

所有这些方案都复杂且有额外开销。这也是为什么 Vyukov 侵入式版宁可让 ownership 外置,也不在队列内部引入更通用的节点复用与回收协议。

5.2 内存回收问题

已在 3.3 节讨论,这里把边界再讲清楚:单消费者场景对内存回收的要求,确实比多消费者松一个量级,但不等于"可以无脑 delete"。

  • 多消费者 MPMC 下,一个消费者刚读完一个节点,另一个消费者可能仍在访问它 —— 这类场景通常离不开 hazard pointer、epoch、RCU 一类安全回收协议;
  • 单消费者 MPSC 下,如果队列结构本身能证明"旧前驱被释放之前,不会再有其他线程访问它",那么像第 2 节这种 old-head reclamation(释放旧前驱) 是可以成立的;
  • 但如果你继续泛化:加入节点池复用、freelist、helping、更多读者,或者让"还有没有线程握着这个节点"不再能从结构不变式直接推出,就该认真考虑 hazard pointer / epoch 了;
  • Vyukov 侵入式把"节点什么时候释放"完全交给调用者,队列内部因此根本不需要实现一套自己的 SMR(safe memory reclamation)协议。

5.3 伪共享(False Sharing)

每次 MPSC 代码都要特别强调 alignas(64)。原因:现代 CPU 按缓存行(通常 64 字节)传输数据,如果 head_tail_ 恰好在同一行:

  • 生产者频繁写 head_ → 该行在 P 的 cache 里 Modified
  • 消费者读 tail_ → 该行被迫从 P 传到 C,即使 tail_ 根本没变化

这会导致大量 cache line 弹跳(ping-pong),吞吐量可能掉一个数量级。

正确做法:

alignas(64) std::atomic<Node*> head_;   // 生产者热
alignas(64) Node* tail_;                // 消费者热
alignas(64) Node stub_;                 // 静态,隔离作用

C++17 提供了更优雅的方式:

#include <new>
alignas(std::hardware_destructive_interference_size) std::atomic<Node*> head_;

但目前 hardware_destructive_interference_size 的编译器支持参差不齐,手动 64 最保险。

5.4 CAS 循环的指数退避

方案一和方案三里都有 CAS 循环。在高争用下,简单的 while(!CAS) 会让多个线程疯狂争抢同一个 cache line。优化手法:

for (int spin = 0; !tail_.compare_exchange_weak(prev, n, ...); ) {
    if (++spin > 16) {
        #if defined(__x86_64__)
            __builtin_ia32_pause();   // x86 的 PAUSE
        #elif defined(__aarch64__)
            __asm__ __volatile__("yield");
        #endif
        spin = 0;
    }
}

PAUSE 指令在 x86 上的作用:告诉 CPU “我在自旋”,减少推测执行、降低 pipeline 冲突,释放超线程兄弟核的资源。经验值:在 16~32 次失败后 pause 一下。


六、如何验证你的 MPSC 是对的

无锁代码最常见的故障模式是:“跑起来看似对,压测几小时偶发崩溃,但 gdb 附上去又复现不了”。这一节给出一个能捕捉绝大部分错误的最小验证套件。

6.1 per-producer 单调序号法

核心思想:每个生产者发一个单调递增的 seq,消费者收到后按生产者分桶验证

在这里插入图片描述

为什么这个方法好?—— 它能同时抓住四种错误:

  • 消息丢失:消费者看到某个生产者的 seq 从 3 跳到 5,中间的 4 被吃掉了
  • 消息重复:同一个 (pid, seq) 被收到两次
  • FIFO 破坏:同一生产者的 seq 非单调(如从 5 倒退到 3)
  • 数据污染:pid 越界或 seq 异常 —— 说明读到了半写入的节点

相比之下,只检查总条数(sent==received)的测试是不够的 —— 如果一条消息被丢了、另一条被重复接收了,总数反而是对的。

6.2 验证代码

#include <vector>
#include <atomic>
#include <thread>
#include <cstdio>
#include <cstdlib>

// 验证消息:pid + seq
struct Msg : public VyukovMpscIntrusive::Node {
    int pid;
    int seq;
};

void verify_mpsc(int n_producers, int per_producer) {
    VyukovMpscIntrusive q;

    std::vector<std::thread> producers;
    for (int pid = 0; pid < n_producers; ++pid) {
        producers.emplace_back([&, pid] {
            for (int s = 0; s < per_producer; ++s) {
                auto* m = new Msg();
                m->pid = pid;
                m->seq = s;
                q.enqueue(m);
            }
        });
    }

    std::thread consumer([&] {
        // last[pid] = 上次收到的 seq;初始为 -1
        std::vector<int> last(n_producers, -1);
        int total = n_producers * per_producer;
        int received = 0;

        while (received < total) {
            auto* node = q.dequeue();
            if (!node) {
                std::this_thread::yield();
                continue;
            }
            auto* m = static_cast<Msg*>(node);

            // -------- 四重检查 --------
            if (m->pid < 0 || m->pid >= n_producers) {
                fprintf(stderr, "污染:pid 越界 %d\n", m->pid);
                std::abort();
            }
            if (m->seq < 0 || m->seq >= per_producer) {
                fprintf(stderr, "污染:pid=%d seq 越界 %d\n", m->pid, m->seq);
                std::abort();
            }
            int expected = last[m->pid] + 1;
            if (m->seq != expected) {
                fprintf(stderr,
                    "FIFO 违反:pid=%d expected seq=%d, got seq=%d "
                    "(丢失/重复/乱序)\n",
                    m->pid, expected, m->seq);
                std::abort();
            }
            last[m->pid] = m->seq;
            received++;
            delete m;
        }

        // -------- 收尾检查:每个生产者都应该收到完整的序列 --------
        for (int i = 0; i < n_producers; ++i) {
            if (last[i] != per_producer - 1) {
                fprintf(stderr,
                    "生产者 %d 未收齐: last=%d, expected=%d\n",
                    i, last[i], per_producer - 1);
                std::abort();
            }
        }
        printf("验证通过:%d 条消息全部按序收到\n", total);
    });

    for (auto& t : producers) t.join();
    consumer.join();
}

6.3 配合 ThreadSanitizer

即便验证测试跑通了,也可能存在数据竞争(两个线程无序地读写同一个非原子变量)。这种 bug 在宽松的 x86 上经常隐而不发,但会在 ARM 上炸掉。Clang/GCC 的 TSan 是排查这类问题的核武器:

g++ -std=c++17 -O1 -g -fsanitize=thread -pthread mpsc_demo.cpp -o mpsc_tsan
./mpsc_tsan --verify

TSan 会在任何数据竞争点打印详细的栈。无锁代码里 TSan 偶尔会误报(比如它不理解某些高级回收协议),但对基础并发错误仍然非常有用。

不过这里要提醒一件很重要的事:看到 TSan 报告,并不等于可以立刻把它翻译成"某个经典并发 bug 的完整因果链"。 像第 2 节这种 old-head reclamation 结构,真正需要你核对的是:

  1. 报警的地址是不是同一个逻辑节点,还是 allocator 复用了同一块地址;
  2. consumer 回收旧前驱的前提是否已经满足;
  3. 这个问题究竟是真实的 use-after-free地址复用导致的可疑并发写,还是你对队列不变式理解错了

因此,TSan 在本文里更适合承担两个角色:

  • 第一层筛查:抓明显的数据竞争、非原子访问、遗漏的同步;
  • 辅助定位:告诉你"这里有并发写/并发读写值得看",但最终定性仍要回到算法不变式本身。

对第 2 节 CAS 教学版,更值得重点验证的是两类问题:

  • 断链窗口下的行为:producer 已经推进了 tail_,但 prev->next 还没补上时,consumer 会不会把"暂时不可见"误判成"真实为空";
  • 回收收尾是否完整:最后一个已消费节点会滞留为新的前驱,析构时是否显式释放了这个最终前驱。

所以,TSan 很重要,但不要把它当成自动判案机;它给你的是线索,最终结论仍然要靠时序推演和不变式验证来下。

6.4 更严格的:Relacy Race Detector

TSan 只能检测实际发生的 race,而 Relacy 能穷举所有可能的内存序列来发现潜在的 race。它是一个单头 C++ 库,把你的测试代码放进它的框架里:

#include <relacy/relacy_std.hpp>
// 把原来的 std::atomic 换成 rl::atomic
// 把 std::thread 换成 rl::thread
// 用 $() 包住每个原子操作

Relacy 会模拟各种可能的执行顺序,特别擅长抓 Vyukov 这种"实际 CPU 上几乎不会触发但理论上有风险"的竞争。缺点是写起来很繁琐。工业级 MPSC 实现一般都会过一遍 Relacy。

6.5 压力测试要持续多久?

经验值:

  • 几秒钟的跑通只能证明基础逻辑对,说明不了什么
  • 10 分钟:能抓到大部分明显的竞争
  • 几小时:能抓到极小概率的内存序问题
  • 长期 fuzz(几天):能抓到硬件异常引发的边界情况

建议做法:写好验证测试后,在 CI 上跑 30 分钟,并在 ARM/ARM64 机器上至少跑一次(x86 的强内存模型会掩盖很多问题)。


七、常见陷阱自检清单

实现 MPSC 时建议逐项检查:

  • stub 节点初始化:空队列时 head == tail == &stub,且 stub.next == nullptr
  • headtail 分离缓存行:至少相差 64 字节
  • enqueue 中 exchangeacq_relprev->next.storerelease
  • dequeue 中所有 loadacquire
  • dequeue 处理断链窗口next == nullptrtail != head 时返回空等重试,不要把暂时不可见误判成真正空队列
  • dequeue 处理"只剩最后一个节点":re-enqueue stub 逻辑不能缺
  • 节点内存归属清晰:侵入式场景下让调用者负责 delete;非侵入式场景要明确 old-head reclamation 的边界
  • 析构能收尾:如果最后一个已消费节点会滞留为新前驱,析构函数要显式释放它
  • 容量是 2 的幂(数组版),可以用 & mask 代替昂贵的 % capacity
  • CAS 循环有退避:至少加个 pause 指令
  • 有正确性验证测试:per-producer 单调序号 + TSan
  • 弱内存模型测试:如果能在 ARM/ARM64 机器上跑一下最好

八、延伸阅读

按推荐顺序:

  1. Dmitry Vyukov 的 1024cores.net —— 无锁算法圣经级资源,尤其推荐他对 bounded MPMCintrusive MPSC 的分析
  2. Maged Michael & Scott, “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms” (1996) —— 奠基性论文,读懂它能把链表 MPMC 也拿下
  3. Herlihy & Shavit, The Art of Multiprocessor Programming —— 第 10 章专门讲并发队列
  4. Paul E. McKenney, Is Parallel Programming Hard? —— 对内存模型和 RCU 讲得极透彻,免费 PDF
  5. moodycamel::ConcurrentQueue 源码 —— C++ 工业级 MPMC 实现,代码注释详尽
  6. Kogan & Petrank, “Wait-Free Queues With Multiple Enqueuers and Dequeuers” (2011) —— 想深入 wait-free 队列再看
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐