好的,我们来详细探讨 C++ 中 std::condition_variable 的使用。条件变量是多线程同步中用于线程间通信的强大工具,通常与互斥锁 std::mutex 结合使用。它允许一个或多个线程等待某个条件成立或被通知。

核心概念

  1. std::condition_variable: 提供等待和通知机制。
  2. std::mutex: 用于保护共享数据,确保线程安全访问。
  3. 等待条件:
    • wait(std::unique_lock<std::mutex>& lock): 原子操作:释放锁并阻塞等待。
    • wait(std::unique_lock<std::mutex>& lock, Predicate pred): 更安全的等待方式,检查条件以避免虚假唤醒。
  4. 通知条件:
    • notify_one(): 唤醒一个等待线程。
    • notify_all(): 唤醒所有等待线程。

经典场景:生产者-消费者模型

我们将用一个简单的生产者-消费者队列来演示条件变量的实战用法。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <atomic>

class ThreadSafeQueue {
private:
    std::queue<int> data_queue;        // 共享数据
    std::mutex mtx;                    // 保护队列的互斥锁
    std::condition_variable cond;      // 条件变量
    std::atomic<bool> stop_flag{false}; // 安全停止标志

public:
    // 生产者:向队列添加数据
    void push(int value) {
        {
            std::lock_guard<std::mutex> lock(mtx); // 加锁
            data_queue.push(value);
        } // 解锁发生在lock_guard析构时
        cond.notify_one(); // 通知一个等待的消费者
    }

    // 消费者:尝试从队列取出数据
    bool try_pop(int& value) {
        std::lock_guard<std::mutex> lock(mtx); // 加锁
        if (data_queue.empty()) {
            return false; // 队列为空,立即返回
        }
        value = data_queue.front();
        data_queue.pop();
        return true;
    }

    // 消费者:等待并取出数据
    bool wait_and_pop(int& value) {
        std::unique_lock<std::mutex> lock(mtx); // 必须用unique_lock
        // 等待条件:队列非空 或 收到停止信号
        cond.wait(lock, [this]() {
            return !data_queue.empty() || stop_flag.load();
        });

        if (stop_flag.load() && data_queue.empty()) {
            return false; // 停止且队列空,退出
        }

        value = data_queue.front();
        data_queue.pop();
        return true;
    }

    // 通知所有等待线程停止
    void stop() {
        stop_flag.store(true);
        cond.notify_all(); // 唤醒所有等待线程
    }
};

// 生产者线程函数
void producer(ThreadSafeQueue& queue, int id) {
    for (int i = 0; i < 5; ++i) {
        queue.push(id * 100 + i); // 生产数据
        std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟耗时
    }
}

// 消费者线程函数
void consumer(ThreadSafeQueue& queue, int id) {
    int value;
    while (queue.wait_and_pop(value)) { // 阻塞等待数据
        std::cout << "Consumer " << id << " got: " << value << std::endl;
    }
    std::cout << "Consumer " << id << " exiting." << std::endl;
}

int main() {
    ThreadSafeQueue queue;

    // 创建消费者线程
    std::thread c1(consumer, std::ref(queue), 1);
    std::thread c2(consumer, std::ref(queue), 2);

    // 创建生产者线程
    std::thread p1(producer, std::ref(queue), 1);
    std::thread p2(producer, std::ref(queue), 2);

    p1.join();
    p2.join();

    // 通知消费者停止
    queue.stop();

    c1.join();
    c2.join();

    return 0;
}

关键点解析

  1. unique_lock vs lock_guard: wait 函数需要操作锁(释放和重新获取),因此必须使用更灵活的 std::unique_lock
  2. 带谓词的 wait: cond.wait(lock, predicate) 等价于:
    while (!predicate()) {
        cond.wait(lock);
    }
    

    它能有效防止虚假唤醒(线程在没有收到 notify 的情况下被唤醒)。
  3. 安全停止机制:
    • 使用 std::atomic<bool> stop_flag 确保安全修改。
    • 在等待条件中加入 stop_flag 检查:[this]() { return !data_queue.empty() || stop_flag.load(); }
    • stop() 方法设置标志并调用 notify_all() 唤醒所有等待线程。
  4. 通知时机: 生产者通常在修改共享数据(如 push)并释放锁之后调用 notify_one()notify_all()。这样可以避免唤醒的线程立即因锁未释放而再次阻塞。

运行逻辑

  1. 生产者:生产数据放入队列,释放锁后通知一个消费者。
  2. 消费者
    • 尝试获取锁。
    • 检查条件(队列非空或停止信号)。
    • 如果条件不满足,则释放锁并阻塞等待。
    • 被唤醒后重新获取锁,再次检查条件(防止虚假唤醒)。
    • 条件满足则取出数据。
  3. 主线程:等待生产者结束,发送停止信号唤醒所有消费者,消费者检查到停止信号且队列空则退出。

注意事项

  • 死锁风险:确保在等待条件变量前正确持有锁,并在等待后正确释放。
  • 丢失唤醒:确保在条件可能满足时发送通知(如 push 后)。
  • 虚假唤醒:使用带谓词的 wait 是标准做法。
  • 系统时钟影响:某些系统时钟调整可能导致 wait_forwait_until 行为异常。

这个示例清晰地展示了 std::condition_variable 在多线程同步中的核心作用。理解并掌握其用法对于编写高效、安全的并发程序至关重要。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐