承接上一篇原子类型与自旋锁,本篇聚焦线程间协作的核心工具:条件变量。互斥锁解决了 "共享数据竞争" 问题,而条件变量解决了 "线程间等待 - 通知" 问题 —— 让线程可以在某个条件不满足时主动阻塞等待,直到其他线程通知条件成立。

1:为什么需要条件变量

先看一个问题:如何实现 "线程 A 等待线程 B 完成某个任务后再继续执行"?

错误方案:轮训查询

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;

bool task_done = false;
mutex mtx;

void worker() {
    // 模拟耗时任务
    this_thread::sleep_for(chrono::seconds(2));
    lock_guard<mutex> lg(mtx);
    task_done = true;
    cout << "任务完成" << endl;
}

void waiter() {
    while (true) {
        lock_guard<mutex> lg(mtx);
        if (task_done) {
            break;
        }
        // 不加sleep会占满CPU
        // 加sleep会导致响应延迟
    }
    cout << "等待完成,继续执行" << endl;
}

int main() {
    thread t1(worker);
    thread t2(waiter);
    t1.join();
    t2.join();
    return 0;
}

轮询的致命问题

  • CPU 浪费:线程不断循环检查条件,即使条件不满足也会占用 CPU
  • 响应延迟:如果加了 sleep,条件成立后线程不能立即被唤醒
  • 锁竞争:频繁加锁解锁,增加了锁的竞争开销

条件变量就是为了解决这个问题而生:让线程在条件不满足时主动阻塞休眠,释放 CPU;当条件成立时,由其他线程主动唤醒等待的线程。

2:std::conditon_variable

1:基本原理

条件变量必须配合互斥锁一起使用,工作流程如下:

  1. 等待线程先获取互斥锁,检查条件是否满足
  2. 如果条件不满足,调用wait()原子地释放锁并阻塞当前线程
  3. 通知线程获取互斥锁,修改条件,然后调用notify_one()notify_all()唤醒等待线程
  4. 等待线程被唤醒后,原子地重新获取互斥锁,再次检查条件是否满足

关键:wait()的 "释放锁 + 阻塞" 和 "唤醒 + 重新加锁" 都是原子操作,不会产生竞态条件。

2:核心接口

// 等待:阻塞当前线程,直到被通知或虚假唤醒
void wait(unique_lock<mutex>& lck);

// 带谓词的等待:阻塞直到被通知且谓词返回true
template <class Predicate>
void wait(unique_lock<mutex>& lck, Predicate pred);

// 唤醒一个等待的线程
void notify_one() noexcept;

// 唤醒所有等待的线程
void notify_all() noexcept;

3:为什么必须用unique_lock,不能用lock_guard

课件里明确提到wait()只能接收unique_lock<mutex>类型的参数,原因是:

  • wait()需要在阻塞前手动解锁互斥锁,唤醒后手动重新加锁
  • lock_guard不支持手动解锁和加锁,而unique_lock支持
  • unique_lock的灵活性正好满足条件变量的需求

3:条件变量的基本使用

1:最简单的等待-通知示例

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;

mutex mtx;
condition_variable cv;
bool ready = false; // 条件标志

void worker() {
    // 模拟耗时任务
    this_thread::sleep_for(chrono::seconds(2));
    
    // 修改条件必须加锁
    lock_guard<mutex> lg(mtx);
    ready = true;
    cout << "任务完成,通知等待线程" << endl;
    
    // 通知等待线程
    cv.notify_one();
}

void waiter() {
    // 必须用unique_lock
    unique_lock<mutex> lck(mtx);
    
    // 等待条件成立
    // 注意:必须用while循环,不能用if!(后面会讲为什么)
    while (!ready) {
        cv.wait(lck); // 阻塞,释放锁
    }
    
    cout << "收到通知,继续执行" << endl;
}

int main() {
    thread t1(worker);
    thread t2(waiter);
    t1.join();
    t2.join();
    return 0;
}

2:带谓词的wait(推荐写法)

C++11 提供了带谓词的wait()重载,内部已经帮我们实现了 while 循环,代码更简洁:

// 等价于上面的while循环
cv.wait(lck, [](){ return ready; });

这是推荐的标准写法,可以避免忘记写 while 循环导致的错误。

4:虚假唤醒

1:什么是虚假唤醒

虚假唤醒是指:即使没有线程调用notify_one()notify_all()wait()也可能会随机返回。

这不是 bug,而是操作系统和硬件的特性 —— 某些平台的条件变量实现允许出现虚假唤醒,以提高性能。

2:为什么必须用while循环

如果用if而不是while来判断条件,虚假唤醒发生时,线程会误以为条件已经成立,继续执行,导致程序出错:

// 错误写法!
if (!ready) {
    cv.wait(lck); // 可能被虚假唤醒
}
// 虚假唤醒后,ready仍然是false,但程序会继续执行

正确写法:用while循环,每次被唤醒后都重新检查条件:

// 正确写法
while (!ready) {
    cv.wait(lck);
}

或者使用带谓词的wait()

cv.wait(lck, [](){ return ready; });

记住:永远不要在 if 语句中使用 wait (),必须用 while 循环或带谓词的重载

5:经典示例1:两个线程交替打印奇数和偶数

1:完整代码

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
using namespace std;

int main() {
    mutex mtx;
    condition_variable cv;
    const int n = 100;
    bool flag = true; // true: 打印偶数线程执行;false: 打印奇数线程执行

    // 线程1:打印偶数 0,2,4,...,98
    thread t1([&]() {
        int i = 0;
        while (i < n) {
            unique_lock<mutex> lck(mtx);
            // 等待flag为true
            cv.wait(lck, [&](){ return flag; });
            
            cout << "偶数线程:" << i << endl;
            i += 2;
            flag = false; // 切换到奇数线程
            
            cv.notify_one(); // 通知奇数线程
        }
    });

    // 线程2:打印奇数 1,3,5,...,99
    thread t2([&]() {
        int j = 1;
        while (j < n) {
            unique_lock<mutex> lck(mtx);
            // 等待flag为false
            cv.wait(lck, [&](){ return !flag; });
            
            cout << "奇数线程:" << j << endl;
            j += 2;
            flag = true; // 切换到偶数线程
            
            cv.notify_one(); // 通知偶数线程
        }
    });

    t1.join();
    t2.join();
    return 0;
}

2:工作原理

  1. t1 先启动:t1 获取锁,flag 为 true,打印偶数,设置 flag 为 false,通知 t2;t1 再次循环,flag 为 false,阻塞等待
  2. t2 被唤醒:t2 获取锁,flag 为 false,打印奇数,设置 flag 为 true,通知 t1;t2 再次循环,flag 为 true,阻塞等待
  3. 循环往复:两个线程交替执行,直到打印完所有数字

无论哪个线程先启动,或者谁先抢到锁,这个逻辑都能保证严格的交替打印。

6:经典示例2:生产者-消费者模型

生产者 - 消费者模型是条件变量最经典的应用场景:

  • 生产者线程:生产数据,放入缓冲区
  • 消费者线程:从缓冲区取出数据,消费
  • 缓冲区满时,生产者阻塞等待;缓冲区空时,消费者阻塞等待

1:完整代码

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
using namespace std;

constexpr int BUFFER_SIZE = 5; /// 缓冲区大小
queue<int> buffer;             // 缓冲区

mutex mtx;
condition_variable not_full;  // 缓冲区非满
condition_variable not_empty; // 缓冲区非空
bool production_finished = false;

void producer(int id)
{
    for (int i = 0; i < 10; i++)
    {
        unique_lock<mutex> lck(mtx);

        // 缓冲区非满
        not_full.wait(lck, []()
                      { return buffer.size() < BUFFER_SIZE; });

        // produce data
        int data = id * 100 + i;
        buffer.push(data);
        cout << "produce:" << data << ",buffersize:" << buffer.size() << endl;

        // 通知消费者线程
        not_empty.notify_one();

        // move to next step
        lck.unlock();
        this_thread::sleep_for(chrono::milliseconds(100));
    }
}

void consumer(int id)
{
    for (;;)
    {
        unique_lock<mutex> lck(mtx);

        // 缓冲区非空
        not_empty.wait(lck, []()
                       { return buffer.size() > 0 || production_finished; });

        if (buffer.empty() && production_finished)
        {
            break;
        }

        // consume data
        int data = buffer.front();
        buffer.pop();
        cout << "consume:" << data << ",buffersize:" << buffer.size() << endl;

        // 通知生产者线程
        not_full.notify_one();

        lck.unlock();
        this_thread::sleep_for(chrono::milliseconds(200));
    }
}

int main()
{
    thread producers[2];
    thread consumers[3];

    for (int i = 0; i < 2; i++)
    {
        producers[i] = thread(producer, i);
    }

    for (int i = 0; i < 3; i++)
    {
        consumers[i] = thread(consumer, i);
    }

    for (int i = 0; i < 2; i++)
    {
        producers[i].join();
    }
    // 设置生产完成标志
    {
        unique_lock<mutex> lck(mtx);
        production_finished = true;
    }
    not_empty.notify_all();
    // 等待所有消费者线程完成
    for (int i = 0; i < 3; i++)
    {
        consumers[i].join();
    }

    cout << "task done" << endl;
    return 0;
}

2:关键设计点

  1. 两个条件变量not_full用于生产者等待,not_empty用于消费者等待
  2. 提前解锁:在模拟耗时操作前提前解锁,减少锁的持有时间,提高并发度
  3. 谓词等待:使用带谓词的wait(),避免虚假唤醒
  4. 优雅退出:等待所有生产者完成,再等待缓冲区为空,然后退出程序

7:std::condition_variable_any

std::condition_variable_anystd::condition_variable的泛化版本,区别在于:

  • std::condition_variable只能配合std::unique_lock<std::mutex>使用
  • std::condition_variable_any可以配合任何满足 BasicLockable 要求的锁使用,包括:
    • std::recursive_mutex
    • std::timed_mutex
    • std::recursive_timed_mutex
    • 自定义锁类型(自旋锁)

配合recursive_mutex使用

#include <condition_variable>
#include <mutex>

recursive_mutex rmtx;
condition_variable_any cv_any;
bool ready = false;

void wait_func() {
    unique_lock<recursive_mutex> lck(rmtx);
    cv_any.wait(lck, [](){ return ready; });
}

condition_variable_any的接口和condition_variable完全相同,只是灵活性更高,但性能略低。如果只需要配合std::mutex使用,优先选择std::condition_variable

8:超时等待

条件变量还支持超时等待,避免线程无限阻塞:

// 等待指定时长,超时返回cv_status::timeout
template <class Rep, class Period>
cv_status wait_for(unique_lock<mutex>& lck,
                   const chrono::duration<Rep, Period>& rel_time);

// 等待到指定时间点,超时返回cv_status::timeout
template <class Clock, class Duration>
cv_status wait_until(unique_lock<mutex>& lck,
                     const chrono::time_point<Clock, Duration>& abs_time);

// 带谓词的超时等待,超时返回false
template <class Rep, class Period, class Predicate>
bool wait_for(unique_lock<mutex>& lck,
              const chrono::duration<Rep, Period>& rel_time,
              Predicate pred);

示例:

unique_lock<mutex> lck(mtx);
// 最多等待1秒
if (cv.wait_for(lck, chrono::seconds(1), [](){ return ready; })) {
    cout << "条件成立,继续执行" << endl;
} else {
    cout << "等待超时" << endl;
}

9:notify_one和notify_all

函数 作用 适用场景
notify_one() 唤醒一个等待的线程 只有一个线程能处理条件成立的情况
notify_all() 唤醒所有等待的线程

多个线程都能处理条件成立的情况

惊群效应

当调用notify_all()时,所有等待的线程都会被唤醒,但只有一个线程能获取到锁,其他线程会再次阻塞。这种现象称为惊群效应,会导致不必要的上下文切换和锁竞争,影响性能。

避免惊群效应的原则

  • 只有当多个线程都能处理条件成立的情况时,才使用notify_all()
  • 否则,优先使用notify_one()

10:总结

  • 条件变量用于实现线程间的等待 - 通知机制,解决了轮询的 CPU 浪费和响应延迟问题
  • 条件变量必须配合互斥锁使用,且只能接收unique_lock类型的参数
  • 虚假唤醒是条件变量的固有特性,必须用 while 循环或带谓词的 wait () 来处理
  • 交替打印和生产者 - 消费者是条件变量最经典的两个应用场景
  • condition_variable_any可以配合任意锁类型使用,灵活性更高但性能略低
  • 超时等待可以避免线程无限阻塞,notify_one()优先于notify_all()以避免惊群效应
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐