C++条件变量实战:生产者消费者模型详解
·
好的,我们来详细探讨 C++ 中 std::condition_variable 的使用。条件变量是多线程同步中用于线程间通信的强大工具,通常与互斥锁 std::mutex 结合使用。它允许一个或多个线程等待某个条件成立或被通知。
核心概念
std::condition_variable: 提供等待和通知机制。std::mutex: 用于保护共享数据,确保线程安全访问。- 等待条件:
wait(std::unique_lock<std::mutex>& lock): 原子操作:释放锁并阻塞等待。wait(std::unique_lock<std::mutex>& lock, Predicate pred): 更安全的等待方式,检查条件以避免虚假唤醒。
- 通知条件:
notify_one(): 唤醒一个等待线程。notify_all(): 唤醒所有等待线程。
经典场景:生产者-消费者模型
我们将用一个简单的生产者-消费者队列来演示条件变量的实战用法。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>
class ThreadSafeQueue {
private:
std::queue<int> data_queue; // 共享数据
std::mutex mtx; // 保护队列的互斥锁
std::condition_variable cond; // 条件变量
std::atomic<bool> stop_flag{false}; // 安全停止标志
public:
// 生产者:向队列添加数据
void push(int value) {
{
std::lock_guard<std::mutex> lock(mtx); // 加锁
data_queue.push(value);
} // 解锁发生在lock_guard析构时
cond.notify_one(); // 通知一个等待的消费者
}
// 消费者:尝试从队列取出数据
bool try_pop(int& value) {
std::lock_guard<std::mutex> lock(mtx); // 加锁
if (data_queue.empty()) {
return false; // 队列为空,立即返回
}
value = data_queue.front();
data_queue.pop();
return true;
}
// 消费者:等待并取出数据
bool wait_and_pop(int& value) {
std::unique_lock<std::mutex> lock(mtx); // 必须用unique_lock
// 等待条件:队列非空 或 收到停止信号
cond.wait(lock, [this]() {
return !data_queue.empty() || stop_flag.load();
});
if (stop_flag.load() && data_queue.empty()) {
return false; // 停止且队列空,退出
}
value = data_queue.front();
data_queue.pop();
return true;
}
// 通知所有等待线程停止
void stop() {
stop_flag.store(true);
cond.notify_all(); // 唤醒所有等待线程
}
};
// 生产者线程函数
void producer(ThreadSafeQueue& queue, int id) {
for (int i = 0; i < 5; ++i) {
queue.push(id * 100 + i); // 生产数据
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟耗时
}
}
// 消费者线程函数
void consumer(ThreadSafeQueue& queue, int id) {
int value;
while (queue.wait_and_pop(value)) { // 阻塞等待数据
std::cout << "Consumer " << id << " got: " << value << std::endl;
}
std::cout << "Consumer " << id << " exiting." << std::endl;
}
int main() {
ThreadSafeQueue queue;
// 创建消费者线程
std::thread c1(consumer, std::ref(queue), 1);
std::thread c2(consumer, std::ref(queue), 2);
// 创建生产者线程
std::thread p1(producer, std::ref(queue), 1);
std::thread p2(producer, std::ref(queue), 2);
p1.join();
p2.join();
// 通知消费者停止
queue.stop();
c1.join();
c2.join();
return 0;
}
关键点解析
unique_lockvslock_guard:wait函数需要操作锁(释放和重新获取),因此必须使用更灵活的std::unique_lock。- 带谓词的
wait:cond.wait(lock, predicate)等价于:
它能有效防止虚假唤醒(线程在没有收到while (!predicate()) { cond.wait(lock); }notify的情况下被唤醒)。 - 安全停止机制:
- 使用
std::atomic<bool> stop_flag确保安全修改。 - 在等待条件中加入
stop_flag检查:[this]() { return !data_queue.empty() || stop_flag.load(); }。 stop()方法设置标志并调用notify_all()唤醒所有等待线程。
- 使用
- 通知时机: 生产者通常在修改共享数据(如
push)并释放锁之后调用notify_one()或notify_all()。这样可以避免唤醒的线程立即因锁未释放而再次阻塞。
运行逻辑
- 生产者:生产数据放入队列,释放锁后通知一个消费者。
- 消费者:
- 尝试获取锁。
- 检查条件(队列非空或停止信号)。
- 如果条件不满足,则释放锁并阻塞等待。
- 被唤醒后重新获取锁,再次检查条件(防止虚假唤醒)。
- 条件满足则取出数据。
- 主线程:等待生产者结束,发送停止信号唤醒所有消费者,消费者检查到停止信号且队列空则退出。
注意事项
- 死锁风险:确保在等待条件变量前正确持有锁,并在等待后正确释放。
- 丢失唤醒:确保在条件可能满足时发送通知(如
push后)。 - 虚假唤醒:使用带谓词的
wait是标准做法。 - 系统时钟影响:某些系统时钟调整可能导致
wait_for或wait_until行为异常。
这个示例清晰地展示了 std::condition_variable 在多线程同步中的核心作用。理解并掌握其用法对于编写高效、安全的并发程序至关重要。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)