C++11并发编程:条件变量
承接上一篇原子类型与自旋锁,本篇聚焦线程间协作的核心工具:条件变量。互斥锁解决了 "共享数据竞争" 问题,而条件变量解决了 "线程间等待 - 通知" 问题 —— 让线程可以在某个条件不满足时主动阻塞等待,直到其他线程通知条件成立。
1:为什么需要条件变量
先看一个问题:如何实现 "线程 A 等待线程 B 完成某个任务后再继续执行"?
错误方案:轮训查询
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
bool task_done = false;
mutex mtx;
void worker() {
// 模拟耗时任务
this_thread::sleep_for(chrono::seconds(2));
lock_guard<mutex> lg(mtx);
task_done = true;
cout << "任务完成" << endl;
}
void waiter() {
while (true) {
lock_guard<mutex> lg(mtx);
if (task_done) {
break;
}
// 不加sleep会占满CPU
// 加sleep会导致响应延迟
}
cout << "等待完成,继续执行" << endl;
}
int main() {
thread t1(worker);
thread t2(waiter);
t1.join();
t2.join();
return 0;
}
轮询的致命问题:
- CPU 浪费:线程不断循环检查条件,即使条件不满足也会占用 CPU
- 响应延迟:如果加了 sleep,条件成立后线程不能立即被唤醒
- 锁竞争:频繁加锁解锁,增加了锁的竞争开销
条件变量就是为了解决这个问题而生:让线程在条件不满足时主动阻塞休眠,释放 CPU;当条件成立时,由其他线程主动唤醒等待的线程。
2:std::conditon_variable
1:基本原理
条件变量必须配合互斥锁一起使用,工作流程如下:
- 等待线程先获取互斥锁,检查条件是否满足
- 如果条件不满足,调用
wait(),原子地释放锁并阻塞当前线程 - 通知线程获取互斥锁,修改条件,然后调用
notify_one()或notify_all()唤醒等待线程 - 等待线程被唤醒后,原子地重新获取互斥锁,再次检查条件是否满足
关键:
wait()的 "释放锁 + 阻塞" 和 "唤醒 + 重新加锁" 都是原子操作,不会产生竞态条件。
2:核心接口
// 等待:阻塞当前线程,直到被通知或虚假唤醒
void wait(unique_lock<mutex>& lck);
// 带谓词的等待:阻塞直到被通知且谓词返回true
template <class Predicate>
void wait(unique_lock<mutex>& lck, Predicate pred);
// 唤醒一个等待的线程
void notify_one() noexcept;
// 唤醒所有等待的线程
void notify_all() noexcept;
3:为什么必须用unique_lock,不能用lock_guard
课件里明确提到wait()只能接收unique_lock<mutex>类型的参数,原因是:
wait()需要在阻塞前手动解锁互斥锁,唤醒后手动重新加锁lock_guard不支持手动解锁和加锁,而unique_lock支持unique_lock的灵活性正好满足条件变量的需求
3:条件变量的基本使用
1:最简单的等待-通知示例
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
mutex mtx;
condition_variable cv;
bool ready = false; // 条件标志
void worker() {
// 模拟耗时任务
this_thread::sleep_for(chrono::seconds(2));
// 修改条件必须加锁
lock_guard<mutex> lg(mtx);
ready = true;
cout << "任务完成,通知等待线程" << endl;
// 通知等待线程
cv.notify_one();
}
void waiter() {
// 必须用unique_lock
unique_lock<mutex> lck(mtx);
// 等待条件成立
// 注意:必须用while循环,不能用if!(后面会讲为什么)
while (!ready) {
cv.wait(lck); // 阻塞,释放锁
}
cout << "收到通知,继续执行" << endl;
}
int main() {
thread t1(worker);
thread t2(waiter);
t1.join();
t2.join();
return 0;
}
2:带谓词的wait(推荐写法)
C++11 提供了带谓词的wait()重载,内部已经帮我们实现了 while 循环,代码更简洁:
// 等价于上面的while循环
cv.wait(lck, [](){ return ready; });
这是推荐的标准写法,可以避免忘记写 while 循环导致的错误。
4:虚假唤醒
1:什么是虚假唤醒
虚假唤醒是指:即使没有线程调用notify_one()或notify_all(),wait()也可能会随机返回。
这不是 bug,而是操作系统和硬件的特性 —— 某些平台的条件变量实现允许出现虚假唤醒,以提高性能。
2:为什么必须用while循环
如果用if而不是while来判断条件,虚假唤醒发生时,线程会误以为条件已经成立,继续执行,导致程序出错:
// 错误写法!
if (!ready) {
cv.wait(lck); // 可能被虚假唤醒
}
// 虚假唤醒后,ready仍然是false,但程序会继续执行
正确写法:用while循环,每次被唤醒后都重新检查条件:
// 正确写法
while (!ready) {
cv.wait(lck);
}
或者使用带谓词的wait():
cv.wait(lck, [](){ return ready; });
记住:永远不要在 if 语句中使用 wait (),必须用 while 循环或带谓词的重载。
5:经典示例1:两个线程交替打印奇数和偶数
1:完整代码
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;
int main() {
mutex mtx;
condition_variable cv;
const int n = 100;
bool flag = true; // true: 打印偶数线程执行;false: 打印奇数线程执行
// 线程1:打印偶数 0,2,4,...,98
thread t1([&]() {
int i = 0;
while (i < n) {
unique_lock<mutex> lck(mtx);
// 等待flag为true
cv.wait(lck, [&](){ return flag; });
cout << "偶数线程:" << i << endl;
i += 2;
flag = false; // 切换到奇数线程
cv.notify_one(); // 通知奇数线程
}
});
// 线程2:打印奇数 1,3,5,...,99
thread t2([&]() {
int j = 1;
while (j < n) {
unique_lock<mutex> lck(mtx);
// 等待flag为false
cv.wait(lck, [&](){ return !flag; });
cout << "奇数线程:" << j << endl;
j += 2;
flag = true; // 切换到偶数线程
cv.notify_one(); // 通知偶数线程
}
});
t1.join();
t2.join();
return 0;
}
2:工作原理
- t1 先启动:t1 获取锁,flag 为 true,打印偶数,设置 flag 为 false,通知 t2;t1 再次循环,flag 为 false,阻塞等待
- t2 被唤醒:t2 获取锁,flag 为 false,打印奇数,设置 flag 为 true,通知 t1;t2 再次循环,flag 为 true,阻塞等待
- 循环往复:两个线程交替执行,直到打印完所有数字
无论哪个线程先启动,或者谁先抢到锁,这个逻辑都能保证严格的交替打印。
6:经典示例2:生产者-消费者模型
生产者 - 消费者模型是条件变量最经典的应用场景:
- 生产者线程:生产数据,放入缓冲区
- 消费者线程:从缓冲区取出数据,消费
- 缓冲区满时,生产者阻塞等待;缓冲区空时,消费者阻塞等待
1:完整代码
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;
constexpr int BUFFER_SIZE = 5; /// 缓冲区大小
queue<int> buffer; // 缓冲区
mutex mtx;
condition_variable not_full; // 缓冲区非满
condition_variable not_empty; // 缓冲区非空
bool production_finished = false;
void producer(int id)
{
for (int i = 0; i < 10; i++)
{
unique_lock<mutex> lck(mtx);
// 缓冲区非满
not_full.wait(lck, []()
{ return buffer.size() < BUFFER_SIZE; });
// produce data
int data = id * 100 + i;
buffer.push(data);
cout << "produce:" << data << ",buffersize:" << buffer.size() << endl;
// 通知消费者线程
not_empty.notify_one();
// move to next step
lck.unlock();
this_thread::sleep_for(chrono::milliseconds(100));
}
}
void consumer(int id)
{
for (;;)
{
unique_lock<mutex> lck(mtx);
// 缓冲区非空
not_empty.wait(lck, []()
{ return buffer.size() > 0 || production_finished; });
if (buffer.empty() && production_finished)
{
break;
}
// consume data
int data = buffer.front();
buffer.pop();
cout << "consume:" << data << ",buffersize:" << buffer.size() << endl;
// 通知生产者线程
not_full.notify_one();
lck.unlock();
this_thread::sleep_for(chrono::milliseconds(200));
}
}
int main()
{
thread producers[2];
thread consumers[3];
for (int i = 0; i < 2; i++)
{
producers[i] = thread(producer, i);
}
for (int i = 0; i < 3; i++)
{
consumers[i] = thread(consumer, i);
}
for (int i = 0; i < 2; i++)
{
producers[i].join();
}
// 设置生产完成标志
{
unique_lock<mutex> lck(mtx);
production_finished = true;
}
not_empty.notify_all();
// 等待所有消费者线程完成
for (int i = 0; i < 3; i++)
{
consumers[i].join();
}
cout << "task done" << endl;
return 0;
}
2:关键设计点
- 两个条件变量:
not_full用于生产者等待,not_empty用于消费者等待 - 提前解锁:在模拟耗时操作前提前解锁,减少锁的持有时间,提高并发度
- 谓词等待:使用带谓词的
wait(),避免虚假唤醒 - 优雅退出:等待所有生产者完成,再等待缓冲区为空,然后退出程序
7:std::condition_variable_any
std::condition_variable_any是std::condition_variable的泛化版本,区别在于:
std::condition_variable只能配合std::unique_lock<std::mutex>使用std::condition_variable_any可以配合任何满足 BasicLockable 要求的锁使用,包括:std::recursive_mutexstd::timed_mutexstd::recursive_timed_mutex- 自定义锁类型(自旋锁)
配合recursive_mutex使用
#include <condition_variable>
#include <mutex>
recursive_mutex rmtx;
condition_variable_any cv_any;
bool ready = false;
void wait_func() {
unique_lock<recursive_mutex> lck(rmtx);
cv_any.wait(lck, [](){ return ready; });
}
condition_variable_any的接口和condition_variable完全相同,只是灵活性更高,但性能略低。如果只需要配合std::mutex使用,优先选择std::condition_variable。
8:超时等待
条件变量还支持超时等待,避免线程无限阻塞:
// 等待指定时长,超时返回cv_status::timeout
template <class Rep, class Period>
cv_status wait_for(unique_lock<mutex>& lck,
const chrono::duration<Rep, Period>& rel_time);
// 等待到指定时间点,超时返回cv_status::timeout
template <class Clock, class Duration>
cv_status wait_until(unique_lock<mutex>& lck,
const chrono::time_point<Clock, Duration>& abs_time);
// 带谓词的超时等待,超时返回false
template <class Rep, class Period, class Predicate>
bool wait_for(unique_lock<mutex>& lck,
const chrono::duration<Rep, Period>& rel_time,
Predicate pred);
示例:
unique_lock<mutex> lck(mtx);
// 最多等待1秒
if (cv.wait_for(lck, chrono::seconds(1), [](){ return ready; })) {
cout << "条件成立,继续执行" << endl;
} else {
cout << "等待超时" << endl;
}
9:notify_one和notify_all
| 函数 | 作用 | 适用场景 |
|---|---|---|
notify_one() |
唤醒一个等待的线程 | 只有一个线程能处理条件成立的情况 |
notify_all() |
唤醒所有等待的线程 |
多个线程都能处理条件成立的情况 |
惊群效应
当调用notify_all()时,所有等待的线程都会被唤醒,但只有一个线程能获取到锁,其他线程会再次阻塞。这种现象称为惊群效应,会导致不必要的上下文切换和锁竞争,影响性能。
避免惊群效应的原则:
- 只有当多个线程都能处理条件成立的情况时,才使用
notify_all() - 否则,优先使用
notify_one()
10:总结
- 条件变量用于实现线程间的等待 - 通知机制,解决了轮询的 CPU 浪费和响应延迟问题
- 条件变量必须配合互斥锁使用,且只能接收
unique_lock类型的参数 - 虚假唤醒是条件变量的固有特性,必须用 while 循环或带谓词的 wait () 来处理
- 交替打印和生产者 - 消费者是条件变量最经典的两个应用场景
condition_variable_any可以配合任意锁类型使用,灵活性更高但性能略低- 超时等待可以避免线程无限阻塞,
notify_one()优先于notify_all()以避免惊群效应
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)