C++ 无锁队列实现:从 SPSC 到 MPSC
C++ 无锁队列实现:从 SPSC 到 MPSC
本文围绕一个有界环形队列展开,先用 SPSC 版本说明最基本的生产者消费者模型,再重点分析 MPSC 版本的迭代过程。MPSC 的核心难点不在“多个线程怎么拿到不同位置”,而在“多个生产者完成构造的顺序可能和认领位置的顺序不同”,因此不能把 tail_ 当作可读边界,也不能把 commit_ 简单理解成完成数量,最终必须使用 commit_ + slot_ticket_ 表示连续可读前缀和每个槽位对应的精确票号
一、先把无锁队列的边界讲清楚
1. lock-free、wait-free、blocking 的区别
在并发队列里,“无锁”不是“不需要等待”,也不是“一定比加锁更快”。blocking 队列依赖互斥锁或条件变量,如果持锁线程被挂起,其他线程可能全部卡住;lock-free 强调系统整体能继续向前推进,通常依赖 CAS 这样的原子原语;wait-free 更强,要求每个线程的每次操作都能在有限步骤内完成。工程里经常把“不使用 mutex 的队列”也称为无锁队列,但严格进度保证要单独分析,不能只看代码里有没有 std::mutex
本文的 MPSC 环形队列没有使用互斥锁,生产者之间靠 CAS 抢票,消费者是单线程 Pop。从严格进度角度看,如果某个生产者在认领 tail_ 之后、构造对象之前永久停止,那么后面的 commit_ 会被这个洞卡住,消费者也无法越过它读取后续元素,所以它更准确地说是一个 lock-free 风格的无互斥锁 bounded MPSC ring,而不是 wait-free 队列
2. 为什么先区分 SPSC、MPSC、MPMC
生产者消费者队列没有通用最优解,最重要的分类维度就是生产者和消费者数量:SPSC 是单生产者单消费者,MPSC 是多生产者单消费者,SPMC 是单生产者多消费者,MPMC 是多生产者多消费者。线程数量越多,共享状态就越多,原子竞争和顺序约束也越复杂,所以只有一个生产者和一个消费者时,应该优先使用 SPSC,而不是直接上 MPMC
本文的 SPSC 队列只需要 read_ 和 write_ 两个下标,生产者只写 write_,消费者只写 read_;MPSC 队列则多了生产者之间的抢票问题,以及“票号已经被认领,但对象还没构造完成”的发布问题
二、SPSC RingBuffer:最简单的环形无锁队列
1. SPSC 的核心状态
SPSC 版本的状态非常少:read_ 表示下一个要读的位置,write_ 表示下一个要写的位置,底层数组不是 T buffer_[Capacity],而是 std::aligned_storage_t<sizeof(T), alignof(T)> buffer_[Capacity]。这样做的目的,是让队列自己控制对象的构造和析构:Push 时用 placement new 构造对象,Pop 时 move 出对象并手动调用析构
因为代码使用 next_w == read_ 判断队列满,所以这个 SPSC 队列会浪费一个槽位,Capacity 为 64 时实际最多存 63 个元素。这是很多环形队列常见的设计,用一个空槽区分“满”和“空”
2. Push 和 Pop 的发布关系
SPSC 的 Push 逻辑可以概括为三步:读取当前 write_,判断下一个写位置是否撞上 read_,如果没有满,就在 buffer_[w] 上构造对象,最后用 write_.store(next_w, std::memory_order_release) 发布写入完成。消费者 Pop 时先读取 read_,再用 write_.load(std::memory_order_acquire) 判断是否为空,只要 acquire 看到了生产者 release 发布的 write_,就能安全读取对应槽位中的对象
bool Push(U&& value) {
const std::size_t w = write_.load(std::memory_order_relaxed);
const std::size_t next_w = (w + 1) & (Capacity - 1);
if (next_w == read_.load(std::memory_order_acquire)) {
return false;
}
new (&buffer_[w]) T(std::forward<U>(value));
write_.store(next_w, std::memory_order_release);
return true;
}
bool Pop(T& value) {
const std::size_t r = read_.load(std::memory_order_relaxed);
if (r == write_.load(std::memory_order_acquire)) {
return false;
}
value = std::move(*reinterpret_cast<T*>(&buffer_[r]));
reinterpret_cast<T*>(&buffer_[r])->~T();
read_.store((r + 1) & (Capacity - 1), std::memory_order_release);
return true;
}
这里容易误解的是 buffer_[r] 的类型不是 T,而是一个满足大小和对齐要求的原始存储块,所以 value = buffer_[r]、static_cast<T>(buffer_[r]) 都是错的。正确方式是先把该地址解释为 T*,再解引用读取对象,也就是 *reinterpret_cast<T*>(&buffer_[r]),读取之后还要手动析构,因为这个对象不是普通数组自动管理生命周期
3. SPSC 的应用例子
一个典型 SPSC 场景是“采集线程突发生产,处理线程匀速消费”,例如网卡收包线程把 Packet 放入队列,业务线程从队列取出 Packet 做解码、写库或进一步处理。测试程序里构造了 RingBuffer<Packet, 64>,生产者每 2ms 产生一批最多 48 个包,消费者每处理一个包 sleep 50us 模拟较重业务,最后用 seq 校验顺序和无丢失
struct Packet {
std::uint64_t seq;
int payload;
};
RingBuffer<Packet, 64> queue;
constexpr int kTotalPackets = 5000;
constexpr int kBurstSize = 48;
// 生产者突发写入,消费者按顺序读取并校验 seq
SPSC 的关键前提是“只有一个线程写 write_,只有一个线程写 read_”。一旦生产者变成多个,这个结构马上不够用,因为多个生产者会同时争抢同一个写位置,必须进入 MPSC 的设计
三、MPSC RingBuffer 的问题本质
1. MPSC 不能只把 SPSC 的 write_ 改成 atomic
MPSC 是多个生产者、一个消费者。看起来只要把 SPSC 的 write_ 换成 tail_.fetch_add(1) 就能让每个生产者拿到不同下标,但这只解决“抢位置”的问题,没有解决“什么时候可以读”的问题。生产者拿到位置之后还要执行 placement new,而不同生产者构造对象的耗时不一样,可能出现后拿票的线程先构造完成、先拿票的线程还没构造完成
所以 MPSC 中至少要区分三个概念:tail_ 表示下一个可认领票号,生产者 CAS 成功后就说明这个票号被占用;buffer_[ticket & mask] 表示真实存储槽位;commit_ 表示从 head_ 开始连续可读的上界,消费者只能读取 [head_, commit_) 这个连续区间
2. 正确版本的状态设计
最终版本使用了四类核心状态:head_ 由消费者推进,表示下一个要读的票号;tail_ 由多个生产者 CAS 推进,表示下一个待认领票号;commit_ 表示连续可读上界,不是已经完成的元素数量;slot_ticket_[idx] 记录某个槽位当前构造完成的精确票号,kEmpty 表示槽位为空
std::atomic<std::size_t> head_{0};
std::atomic<std::size_t> tail_{0};
std::atomic<std::size_t> commit_{0};
std::aligned_storage_t<sizeof(T), alignof(T)> buffer_[Capacity];
std::atomic<std::size_t> slot_ticket_[Capacity];
static constexpr std::size_t kEmpty = static_cast<std::size_t>(-1);
这个设计有一个非常重要的思想:环形数组的下标会复用,但票号单调递增。idx = ticket & (Capacity - 1) 只表示落在哪个槽位,不能表示“这是第几轮复用”。如果只用 bool 型 ready 标记槽位是否有数据,在环形复用时就会出现歧义;而 slot_ticket_[idx] == ticket 可以同时表达“这个槽位有对象”以及“这个对象正好属于当前要发布的票号”
3. Push 的正确流程
MPSC Push 分为三段:第一段 CAS 认领票号,第二段在对应槽位构造对象,第三段写入 slot_ticket_ 并尝试推进 commit_。CAS 只负责抢票,不负责发布数据,所以 tail_.compare_exchange_weak 可以使用 relaxed;真正的数据发布发生在 slot_ticket_[idx].store(w, release) 和 commit_ 的 release CAS 上
bool Push(const T& value) {
std::size_t w = 0;
do {
w = tail_.load(std::memory_order_relaxed);
if (w - head_.load(std::memory_order_acquire) >= Capacity) {
return false;
}
} while (!tail_.compare_exchange_weak(
w, w + 1,
std::memory_order_relaxed, std::memory_order_relaxed));
const std::size_t idx = w & (Capacity - 1);
new (&buffer_[idx]) T(value);
slot_ticket_[idx].store(w, std::memory_order_release);
publish_commit();
return true;
}
这里不能简单写成“先判断是否满,再 fetch_add”,因为多个生产者可能同时看到队列还有空间,然后都执行 fetch_add,结果认领数量超过剩余容量。CAS 循环把“读取候选 tail、判断容量、认领 tail”绑定成一个重试过程,失败后必须重新读取最新 tail_ 和 head_ 再判断
四、MPSC 的迭代过程:为什么前面的版本不对
1. 第一版:没有 commit_,直接用 tail_ 当可读上界
最直觉的错误版本是生产者 tail_++ 后构造对象,消费者只要发现 head_ < tail_ 就读取。这在 SPSC 中通常成立,因为唯一生产者会按顺序构造;但在 MPSC 中,tail_ 只能说明“票号被认领了”,不能说明“对象已经构造好了”
// 错误思路:tail_ 被推进后,消费者就认为对应位置可读
std::size_t w = tail_.fetch_add(1, std::memory_order_relaxed);
new (&buffer_[w & (Capacity - 1)]) T(value);
举一个具体并发时序:生产者 A 拿到票号 0,然后被调度器切走,还没有执行 placement new;生产者 B 拿到票号 1,并且很快构造完成;此时 tail_ 可能已经是 2,消费者看到 head_ < tail_,就会读取票号 0 对应槽位,但票号 0 的对象根本还没有构造,这就是未构造读取
这一版的根本错误是把“认领顺序”当成了“完成顺序”。MPSC 的生产者之间没有构造顺序保证,所以 tail_ 不能作为读边界,只能作为认领边界
2. 第二版:只使用 commit_,每个生产者完成后直接 commit_++
发现 tail_ 不能表示可读后,很容易想到加一个 commit_,生产者构造完成后让 commit_++,消费者只读 head_ < commit_ 的元素。这个思路仍然是错的,因为 commit_ 不能表示“完成数量”,而必须表示“从 0 开始连续完成到哪里”
// 错误思路:构造完成后直接推进 commit_
new (&buffer_[idx]) T(value);
commit_.fetch_add(1, std::memory_order_release);
继续看时序:A 拿到票号 0 后还没构造,B 拿到票号 1 后先构造完成,如果 B 直接 commit_++,commit_ 从 0 变成 1。消费者看到 head_=0, commit_=1,会认为票号 0 可读,但票号 0 仍然没有构造完成。注意,B 完成的是票号 1,但 commit_=1 表示的是 [0, 1) 可读,也就是票号 0 可读,这两个含义完全不是一回事
所以 commit_ 的含义必须固定为“连续可读前缀的右边界”。只有当票号 0 完成时,commit_ 才能从 0 推到 1;只有票号 1 也完成时,commit_ 才能从 1 推到 2;票号 2 先完成也不能越过票号 1
3. 第三版:commit_ 扫描 tail_,但没有记录每个槽位是否完成
另一个错误方向是写一个 publish_commit,从当前 commit_ 往后推进到 tail_。这个版本看上去比直接 commit_++ 更像“连续发布”,但如果只看 tail_,仍然不知道某个票号对应的对象是否已经构造完成
// 错误思路:tail_ 只是认领上界,不能作为构造完成上界
void publish_commit(std::size_t w) {
std::size_t c = commit_.load();
const std::size_t mxw = tail_.load();
while (c == w && w < mxw) {
if (commit_.compare_exchange_weak(c, c + 1)) {
++c;
++w;
} else {
c = commit_.load();
}
}
}
这个版本的关键问题是:tail_ 表示“已经有人拿了这些票”,不是“这些票都已经完成构造”。如果 A 拿到票号 0 后停住,B、C 分别拿到票号 1、2 并完成,tail_ 可以是 3,但 commit_ 仍然只能停在 0。推进 commit_ 时必须逐个判断“票号 c 本身是否完成”,不能只看 tail_
4. 第四版:commit_ + ready_,为什么 bool ready 还不够
在第三版之后,自然会想到给每个槽位加一个 ready_[idx],生产者构造完成后把 ready_[idx] = true,publish_commit 从 commit_ 开始扫描,只要当前槽位 ready 就向前推进。这一版解决了“是否完成构造”的问题,但还没有解决“环形数组槽位复用”的问题
// 仍然有隐患:ready_ 只能说明这个槽位曾经 ready,不能说明它属于当前票号 c
ready_[idx].store(true, std::memory_order_release);
while (ready_[c & (Capacity - 1)].load(std::memory_order_acquire)) {
commit_.compare_exchange_weak(c, c + 1, std::memory_order_release);
}
问题出在 idx = ticket & (Capacity - 1)。假设 Capacity = 4,票号 0 和票号 4 都会映射到槽位 0。ready_[0] == true 时,它到底表示“票号 0 已经构造完成”,还是“票号 4 已经构造完成”,单靠一个 bool 无法回答。只要清理时机或内存序稍有不慎,publish_commit 就可能把上一轮复用留下的 ready 当成当前票号的 ready,从而提前发布一个未构造或不属于当前票号的元素
这就是从 ready_ 走向 slot_ticket_ 的原因:ready 只能表达状态,ticket 才能表达身份。环形队列的槽位会复用,所以每个槽位的完成标记必须携带“这一轮属于哪个单调票号”
5. 最终版:commit_ + slot_ticket_,发布连续完成的票号前缀
最终版本把 ready_[idx] 升级成 slot_ticket_[idx]。生产者构造完成后不只是写 true,而是写入自己的票号 w;publish_commit 从当前 commit_ = c 开始检查,只有当 slot_ticket_[c & mask] == c 时,才说明“当前需要发布的票号 c 已经在正确槽位构造完成”,然后才可以把 commit_ 推进到 c + 1
void publish_commit() {
std::size_t c = commit_.load(std::memory_order_relaxed);
while (slot_ticket_[c & (Capacity - 1)].load(std::memory_order_acquire) == c) {
if (commit_.compare_exchange_weak(
c, c + 1,
std::memory_order_release, std::memory_order_relaxed)) {
++c;
} else {
c = commit_.load(std::memory_order_relaxed);
}
}
}
这段逻辑的精髓是“只发布连续前缀”。如果票号 1、2、3 都已经完成,但票号 0 还没完成,那么 commit_ 必须停在 0;等票号 0 完成并写入 slot_ticket_[0] = 0 后,任何一个生产者调用 publish_commit 都可以从 0 开始连续扫描,把 commit_ 一口气推进到第一个未完成票号之前
五、消费者 Pop 为什么只看 commit_,不看 tail_
1. commit_ 是消费者唯一可信的可读边界
Pop 的逻辑很短:先读取 head_,再读取 commit_,如果二者相等说明没有连续可读元素;如果 head_ < commit_,就可以读取 head_ 对应槽位。这里不需要看 tail_,因为 tail_ 包含了已认领未完成的洞,而 commit_ 已经由 slot_ticket_ 校验过,表示连续完成的可读上界
bool Pop(T& value) {
const std::size_t r = head_.load(std::memory_order_acquire);
if (r == commit_.load(std::memory_order_acquire)) {
return false;
}
std::size_t idx = r & (Capacity - 1);
value = std::move(*reinterpret_cast<T*>(&buffer_[idx]));
reinterpret_cast<T*>(&buffer_[idx])->~T();
slot_ticket_[idx].store(kEmpty, std::memory_order_release);
head_.fetch_add(1, std::memory_order_release);
return true;
}
commit_.load(acquire) 和 publish_commit 中的 release CAS 配对,保证消费者看到 commit_ 推进后,也能看到生产者在槽位里构造好的对象。读取完成后,消费者先析构对象,再把 slot_ticket_[idx] 清成 kEmpty,最后 release 推进 head_,让生产者 acquire 读取 head_ 时知道这个槽位已经可以复用
2. 为什么 Pop 里必须手动析构
底层 buffer 是原始存储,不是 T 数组。Push 使用 new (&buffer_[idx]) T(value) 在原地构造对象,所以 Pop 读取后必须调用 reinterpret_cast<T*>(&buffer_[idx])->~T() 销毁对象。对于 int 这类平凡类型,看起来不析构也许没问题,但模板队列要支持 std::string、智能指针、复杂对象,就必须严格管理生命周期
这里还要注意 reinterpret_cast<T>(buffer_[idx]) 是错误的,因为它试图把一个 storage 对象转换成 T;reinterpret_cast<T*>(&buffer_[idx]) 才是在同一块地址上获得 T 指针。换句话说,placement new 负责“在这块内存里生出一个 T”,reinterpret_cast 指针负责“按 T 的方式访问这块已经构造好的对象”
六、用具体并发时序理解 commit_ + slot_ticket_
1. 乱序完成但顺序发布
假设 Capacity = 8,当前 head_ = 0, tail_ = 0, commit_ = 0,三个生产者同时 Push。P0 CAS 成功拿到票号 0,但构造很慢;P1 拿到票号 1 并完成,写入 slot_ticket_[1] = 1;P2 拿到票号 2 并完成,写入 slot_ticket_[2] = 2。此时 publish_commit 从 c = 0 开始检查 slot_ticket_[0] == 0,发现不成立,所以 commit_ 仍然是 0,消费者不会读取任何元素
等 P0 完成构造并写入 slot_ticket_[0] = 0,再次调用 publish_commit 时,扫描过程会看到 slot_ticket_[0] == 0、slot_ticket_[1] == 1、slot_ticket_[2] == 2,于是可以把 commit_ 连续推进到 3。消费者随后按票号 0、1、2 读取,既不会读到未构造对象,也不会破坏 FIFO 顺序
2. 环形复用时为什么 ticket 比 ready 更安全
假设 Capacity = 4,票号 0、1、2、3 使用下标 0、1、2、3,票号 4 又回到下标 0。如果只用 ready_[0],发布线程看到 true 时无法知道它属于票号 0 还是票号 4;如果使用 slot_ticket_[0],那么发布票号 4 时必须看到 slot_ticket_[0] == 4,旧的 slot_ticket_[0] == 0 不可能误判为票号 4 已完成
这就是整个迭代中最关键的一步:commit_ 解决“只能按连续前缀发布”的问题,slot_ticket_ 解决“环形槽位复用时如何确认当前票号身份”的问题,二者缺一不可
七、内存序该如何理解
1. tail_ 的 CAS 为什么可以 relaxed
tail_ 的作用只是给生产者分配唯一票号,它不向消费者发布数据,也不表示对象完成。因此 CAS 成功与否只影响哪个生产者拿到哪个 w,不承担跨线程可见性发布任务。真正需要同步的是构造完成后的 slot_ticket_,以及最终发布给消费者的 commit_
2. slot_ticket_ 的 release/acquire 发布对象构造完成
生产者先执行 placement new,再执行 slot_ticket_[idx].store(w, release),这表示“对象构造完成,并且这个槽位属于票号 w”。publish_commit 用 acquire 读取 slot_ticket_,如果读到精确票号,就可以确认构造对当前线程可见,再用 release CAS 推进 commit_
3. commit_ 的 release/acquire 发布可读边界
commit_ 是消费者判断可读的最终依据。生产者推进 commit_ 时使用 release,消费者读取 commit_ 时使用 acquire,只要消费者看到 r < commit_,就说明这个票号对应的对象已经经过 slot_ticket_ 校验,并且对象内容对消费者可见
4. head_ 的 release/acquire 发布槽位释放
消费者 Pop 完成后析构对象,清空 slot_ticket_,然后 head_.fetch_add(1, release) 宣布槽位已释放。生产者 Push 中用 acquire 读取 head_,用于判断容量和槽位复用安全性。这里的同步方向和 commit_ 相反:commit_ 是生产者发布给消费者,head_ 是消费者发布给生产者
八、测试案例:如何证明没有丢失和重复
1. 基础 SPSC 顺序测试
基础测试创建一个 RingBuffer<int, 32>,生产者连续 Push 0 到 9999,消费者连续 Pop 并断言 val == i。这个测试覆盖了 SPSC 的顺序性、环形回绕、队列满时自旋等待和队列空时自旋等待
RingBuffer<int, 32> queue;
constexpr int N = 10000;
void producer() {
for (int i = 0; i < N; ++i) {
while (!queue.Push(i)) {}
}
}
void consumer() {
int val = 0;
for (int i = 0; i < N; ++i) {
while (!queue.Pop(val)) {}
assert(val == i);
}
}
2. SPSC 突发生产测试
第二个测试用 Packet 模拟真实数据流,生产者突发写入,消费者较慢处理,并通过 seq 确保包的顺序完全一致。这个例子说明 SPSC 队列适合“一个采集线程到一个处理线程”的流水线结构,队列的作用是吸收短时间速率差,而不是无限缓存
3. MPSC 多生产者测试
MPSC 测试创建 MPSCRingBuffer<int, 64>,4 个生产者各自写入 2500 个不同整数,总数 10000。消费者用 std::vector<bool> seen 记录是否读过某个值,断言每个值都在合法范围内,并且没有重复出现。这个测试不要求 Pop 顺序等于业务值大小,因为业务值由不同生产者生成;它真正验证的是无丢失、无重复、无未构造读取导致的非法值
MPSCRingBuffer<int, 64> queue;
constexpr int kProducers = 4;
constexpr int kPerProducer = 2500;
constexpr int kTotal = kProducers * kPerProducer;
void producer(int id) {
for (int i = 0; i < kPerProducer; ++i) {
const int ticket = id * kPerProducer + i;
while (!queue.Push(ticket)) {}
}
}
void consumer() {
std::vector<bool> seen(kTotal, false);
int got = 0;
while (got < kTotal) {
int v = 0;
while (!queue.Pop(v)) {}
assert(v >= 0 && v < kTotal);
assert(!seen[v]);
seen[v] = true;
++got;
}
}
实际运行结果如下,说明 SPSC 顺序正确,MPSC 在 4 个生产者并发写入时没有丢失和重复
[basic] 10000 items, order OK
[spsc_pipeline] processed 5000 packets in order, no loss
[mpsc] 4 producers, 10000 items, no loss, no duplicate
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)