concurrentqueue:一个高并发高性能的C++无锁队列
1.并发队列介绍
在多线程 编程中,concurrentqueue(并发队列)是一种支持多线程安全访问的队列数据结构,主要用于解决多个线程同时进行 “入队(enqueue)” 和 “出队(dequeue)” 操作时的数据竞争问题,确保线程安全和数据一致性。
并发队列是多线程环境中线程间通信的核心组件,典型场景包括:
- 生产者 - 消费者模型:生产者线程往队列中写入数据,消费者线程从队列中读取数据,通过队列协调线程间的数据传递。
- 任务调度:线程池中的任务队列,多个工作线程从队列中获取任务执行。
实现方式
并发队列的实现需保证 “线程安全”,常见实现方式有两种:
1)基于锁的实现(Lock-based)
通过互斥锁(mutex)保证同一时间只有一个线程操作队列,配合条件变量(condition_variable)处理 “队列空时消费者等待” 或 “队列满时生产者等待” 的场景。
优点:实现简单,逻辑清晰,适合大多数场景。
缺点:高并发下锁竞争可能导致性能瓶颈。
C++ 示例(简单的有界并发队列):
#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdexcept>
template <typename T>
class BoundedConcurrentQueue {
private:
std::queue<T> queue_;
mutable std::mutex mutex_; // 保护队列的互斥锁
std::condition_variable not_full_; // 队列不满时通知生产者
std::condition_variable not_empty_; // 队列非空时通知消费者
size_t max_size_; // 队列最大容量(有界)
public:
explicit BoundedConcurrentQueue(size_t max_size) : max_size_(max_size) {}
// 入队(阻塞,直到队列有空间)
void enqueue(const T& item) {
std::unique_lock<std::mutex> lock(mutex_);
// 等待队列不满
not_full_.wait(lock, [this] { return queue_.size() < max_size_; });
queue_.push(item);
not_empty_.notify_one(); // 通知消费者队列非空
}
// 出队(阻塞,直到队列有元素)
T dequeue() {
std::unique_lock<std::mutex> lock(mutex_);
// 等待队列非空
not_empty_.wait(lock, [this] { return !queue_.empty(); });
T item = queue_.front();
queue_.pop();
not_full_.notify_one(); // 通知生产者队列有空间
return item;
}
// 尝试入队(非阻塞,失败返回false)
bool try_enqueue(const T& item) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.size() >= max_size_) {
return false;
}
queue_.push(item);
not_empty_.notify_one();
return true;
}
// 尝试出队(非阻塞,失败返回false)
bool try_dequeue(T& item) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
item = queue_.front();
queue_.pop();
not_full_.notify_one();
return true;
}
size_t size() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.size();
}
};
2)无锁实现(Lock-free)
不依赖锁,而是通过原子操作(atomic operations)(如std::atomic)和CAS(Compare-And-Swap) 机制实现线程安全。核心是通过原子指令保证操作的原子性,避免锁竞争。
优点:高并发下性能更好(无锁开销),适合对性能要求极高的场景(如高频交易、实时数据处理)。
缺点:实现复杂,需处理 “ABA 问题”“内存顺序” 等细节,调试难度大。
实现关键点:
- 用原子指针管理队列节点(如链表节点)。
- 通过 CAS 操作保证节点插入 / 删除的原子性。
- 处理多生产者 / 多消费者场景下的并发冲突。
2.concurrentqueue简介
ConcurrentQueue 是一个线程安全的先进先出(FIFO)集合,允许多个生产者线程和多个消费者线程同时操作。它基于 C++11 的特性实现,完全无锁(lock-free),这意味着它通过原子操作来保证线程安全,而不是依赖传统的互斥锁。这种设计不仅提高了性能,还减少了锁带来的开销。
主要特性有:
- 高性能:ConcurrentQueue 是一个高性能的并发队列,特别是在多线程环境下表现优异。它支持快速的批量操作,例如批量入队和批量出队,这些操作的速度甚至可以接近非并发队列。
- 单头文件实现:整个队列的实现仅包含在一个头文件中,这使得它非常易于集成到任何项目中。
- 完全线程安全:ConcurrentQueue 是一个完全无锁的队列,可以在任意数量的线程中安全地使用。
- 支持 C++11 的移动语义:在可能的情况下,队列会移动元素而不是复制它们,从而提高效率。
- 灵活的内存管理:内存可以一次性分配,也可以根据需要动态分配。
- 无限制的元素类型和数量:队列对元素类型或最大数量没有人为限制。
- 支持数据类型:支持任意类型元素(包括非可复制、非可移动类型)。
- 可移植性:跨平台(Windows、Linux、macOS),仅依赖 C++11 标准库。
- 支持模式:提供阻塞和非阻塞两种操作模式。
- 异常安全:即使在并发操作中,队列也能保证异常安全。
- 批量操作:队列支持批量入队和出队操作,这可以显著减少操作的开销。
- 自定义特性:可以通过自定义特性模板参数来调整队列的行为,例如内存分配和块大小。
- 无构造函数的类型出队:对于没有默认构造函数的类型,可以通过包装类来避免构造函数的调用。
仓库地址:https://github.com/cameron314/concurrentqueue
无锁队列测试效果统计:A Fast General Purpose Lock-Free Queue for C++
3.concurrentqueue核心类与接口
3.1.基础并发队列:moodycamel::ConcurrentQueue<T>
模板参数 T 为队列中元素的类型(需支持移动语义)
1.构造与析构
// 默认构造
moodycamel::ConcurrentQueue<int> q;
// 带初始容量提示(非强制,仅优化内存预分配)
moodycamel::ConcurrentQueue<std::string> q(1024); // 提示初始可能存储1024个元素
2.入队操作(生产者)
-
enqueue:入队一个元素(成功返回true,内存不足时返回false)。
template <typename U>
bool enqueue(U&& item); // 接收右值(推荐,避免拷贝)
//示例:
q.enqueue(42); // 入队int
q.enqueue(std::string("hello")); // 入队字符串(右值)
std::string s = "world";
q.enqueue(std::move(s)); // 移动入队(s变为空)
-
try_enqueue:与enqueue功能相同,是enqueue的别名(更符合 C++ 命名习惯)。 -
批量入队:
enqueue_bulk一次性入队多个元素(效率高于多次单元素入队)
// 入队[first, last)范围内的元素
template <typename It>
bool enqueue_bulk(It first, size_t count);
//示例:
std::vector<int> nums = {1, 2, 3, 4};
q.enqueue_bulk(nums.begin(), nums.size()); // 批量入队4个元素
3.出队操作(消费者)
-
try_dequeue:尝试出队一个元素(成功返回true,队列空时返回false)。
bool try_dequeue(T& item); // 出队元素存入item
//示例:
int val;
if (q.try_dequeue(val)) {
std::cout << "出队元素: " << val << std::endl;
} else {
std::cout << "队列空" << std::endl;
}
-
批量出队:
try_dequeue_bulk一次性出队多个元素(最多max_count个):
// 出队元素存入[out, out + max_count),返回实际出队数量
template <typename It>
size_t try_dequeue_bulk(It out, size_t max_count);
//示例:
int buffer[10];
size_t dequeued = q.try_dequeue_bulk(buffer, 10); // 最多出队10个
std::cout << "实际出队: " << dequeued << "个" << std::endl;
4.其他常用方法
-
size_approx():返回队列中元素的近似数量(无锁环境下无法精确计数,仅供参考)。
std::cout << "队列近似大小: " << q.size_approx() << std::endl;
-
empty():判断队列是否为空(同样是近似判断,可能存在瞬时误差)。
if (q.empty()) {
std::cout << "队列当前为空" << std::endl;
}
-
clear():清空队列(非线程安全,调用时需确保无其他线程操作队列)。
q.clear(); // 清空所有元素
3.2.阻塞式并发队列:moodycamel::BlockingConcurrentQueue<T>
继承自 ConcurrentQueue<T>,增加了 阻塞等待 功能:当队列空时,wait_dequeue 会阻塞消费者线程,直到有元素入队;当队列满时(若设置了容量上限),enqueue 会阻塞生产者线程。
核心阻塞接口
-
wait_dequeue:阻塞等待,直到成功出队一个元素。
void wait_dequeue(T& item); // 无限期阻塞,直到出队成功
wait_dequeue_for:限时阻塞,超时返回 false
template <typename Rep, typename Period>
bool wait_dequeue_for(T& item, std::chrono::duration<Rep, Period> const& timeout);
-
wait_dequeue_until:阻塞到指定时间点,超时返回false。
template <typename Clock, typename Duration>
bool wait_dequeue_until(T& item, std::chrono::time_point<Clock, Duration> const& deadline);
示例:
moodycamel::BlockingConcurrentQueue<int> bq;
// 消费者线程:阻塞等待元素
std::thread consumer([&]() {
int val;
bq.wait_dequeue(val); // 阻塞,直到有元素入队
std::cout << "消费者收到: " << val << std::endl;
});
// 生产者线程:1秒后入队
std::thread producer([&]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
bq.enqueue(100);
});
consumer.join();
producer.join();
4.concurrentqueue示例
#include "concurrentqueue.h"
#include <thread>
#include <iostream>
#include <vector>
// 定义并发队列(存储int类型)
moodycamel::ConcurrentQueue<int> q;
// 生产者线程:往队列中入队数据
void producer(int id, int count) {
for (int i = 0; i < count; ++i) {
q.enqueue(i); // 非阻塞入队(成功返回true,失败返回false)
// 也可使用enqueue_with_allocator(自定义内存分配)
}
std::cout << "Producer " << id << " finished\n";
}
// 消费者线程:从队列中出队数据
void consumer(int id) {
int item;
size_t count = 0;
// 非阻塞尝试出队,直到队列空且所有生产者完成(实际需配合结束标志)
while (q.try_dequeue(item)) {
++count;
// 处理数据(示例中仅计数)
}
std::cout << "Consumer " << id << " processed " << count << " items\n";
}
int main() {
// 启动2个生产者,每个生产1000个数据
std::thread p1(producer, 1, 1000);
std::thread p2(producer, 2, 1000);
// 启动2个消费者
std::thread c1(consumer, 1);
std::thread c2(consumer, 2);
// 等待生产者完成
p1.join();
p2.join();
// 等待消费者处理完剩余数据(实际场景需确保所有数据被消费)
c1.join();
c2.join();
return 0;
}
编译方式:只需包含头文件(concurrentqueue.h和blockingconcurrentqueue.h),无需链接额外库,直接编译即可。
5.4.库的选择建议
| 场景需求 | 推荐库 | 理由 |
|---|---|---|
| 高性能、多生产者多消费者 | moodycamel::ConcurrentQueue | 无锁设计,性能顶尖,无额外依赖 |
| 已使用 TBB 并行框架 | tbb::concurrent_queue | 与 TBB 组件无缝集成,开发效率高 |
| 依赖 Boost 生态、追求稳定性 | boost::lockfree::queue | 经过长期验证,兼容性好 |
| 大型项目、需要丰富工具链支持 | folly::ConcurrentQueue | 功能全面,适合 Facebook 生态项目 |
6.总结
在 C++ 中,虽然有一些标准库和第三方库提供了并发队列的实现,但它们往往存在一些限制。例如,Boost 的并发队列对对象的赋值运算符和析构函数有严格要求,而 Intel 的 TBB 队列则不是无锁的。ConcurrentQueue 不仅限制更少,而且经过了良好的测试,提供了更高级的特性。
C++ 并发队列库各有侧重,其中moodycamel::ConcurrentQueue以其无锁高性能、无依赖、易用性成为多数场景下的首选。实际开发中,应根据项目的并发规模、依赖限制、性能需求选择合适的库,避免重复实现线程安全逻辑。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)