【Linux 多线程实战总结】条件变量 + 生产者消费者模型 + 阻塞队列 从 0 到 1 全解析
·
【Linux 多线程实战总结】条件变量 + 生产者消费者模型 + 阻塞队列 从 0 到 1 全解析
本篇博客全程基于 C++ 手动封装实现,涵盖条件变量核心 API、原理、封装细节、生产者消费者模型、阻塞队列、任务类解耦,以及实战中所有踩坑记录与解决方案,专为后期复习打造,保姆级详细教程!
前言
- 在多线程编程中,我们会遇到线程同步问题:多个线程操作共享资源时,需要让线程在条件不满足时等待,条件满足时唤醒。
- 互斥锁只能解决线程安全,无法解决线程协同等待 / 唤醒,因此条件变量(Condition) 应运而生。
- 本文将从核心 API→原理→封装→实战模型→踩坑全流程讲解,最终实现多生产者多消费者。
一、条件变量 核心 API 详解(原生 pthread)
条件变量基于 pthread 库实现,配套 5 个核心接口,是封装的基础
必须配合互斥锁使用(我的另一篇文章写了如何封装一个互斥锁,可以看看)
原子操作:不会出现死锁
唤醒后必须重新检查条件(防止虚假唤醒)
二、条件变量 能做什么?
- 线程同步:让线程在条件不满足时主动等待,不占用 CPU
- 协同工作:生产者 / 消费者、线程池、消息队列的核心基础
- 解决死锁:等待时自动释放锁,避免线程长时间持有锁导致阻塞
- 高效并发:相比轮询检查,条件变量是事件驱动,性能极高
三、经典应用:生产者消费者模型
- 模型核心概念
生产者线程:往共享队列(仓库)中生产数据 / 任务
消费者线程:从共享队列中消费数据 / 任务
共享队列:临界资源,需要互斥锁保护
两个条件变量:
队列满 → 生产者等待
队列空 → 消费者等待 - 模型规则(必背)
队列满,生产者阻塞等待
队列空,消费者阻塞等待
生产者生产完成 → 唤醒消费者
消费者消费完成 → 唤醒生产者
所有共享资源操作必须加锁
条件判断必须用while,禁止用if(防虚假唤醒) - 进阶封装:阻塞队列(BlockQueue)
对生产者消费者模型的封装实现:线程安全的队列,外部无感等待 / 唤醒,直接使用。
四、分步封装实战(从 0 到 1,保姆级实现)
我们基于RAII 机制封装:互斥锁 → 条件变量 → 阻塞队列 → 任务类,全程 C++ 面向对象。
前置说明
所有封装遵循规则:
系统资源(锁 / 条件变量)禁止拷贝、赋值
RAII:构造初始化,析构自动释放
接口极简,安全易用
第一步:封装互斥锁 + RAII 自动锁(另一篇文章由详细教程)
第二步:封装条件变量
#pragma once
#include <pthread.h>
#include "Mutex.hpp" // 因为要和 Mutex 一起用
namespace CondModule
{
class Cond
{
public:
// 这里与锁一样,Cond,条件变量不能拷贝
Cond(const Cond &) = delete;
Cond &operator=(const Cond &) = delete;
Cond()
{
// 初始化条件变量
/*
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t
*restrict attr);
参数:
cond:要初始化的条件变量
attr:NULL
*/
pthread_cond_init(&_cond, NULL);
}
// 核心接口
void Wait(LockModule::Mutex &mutex)
{
/*
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict
mutex);
参数:
cond:要在这个条件变量上等待
mutex:互斥量,后⾯详细解释
*/
//底层调用:自动释放锁+加锁等待
pthread_cond_wait(&_cond, mutex.GetMutexOriginal());
}
//唤醒一个线程
void Signal(){
pthread_cond_signal(&_cond);
}
//唤醒所有线程
void Broadcast(){
pthread_cond_broadcast(&_cond);
}
~Cond()
{
// 销毁条件变量
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;
};
}
第三步:封装阻塞队列
/*
阻塞队列
*/
#ifndef __BLOCK_QUEUE_HPP__
// 2. 如果没定义过 → 立刻定义它!
#define __BLOCK_QUEUE_HPP__
#include <iostream>
#include <string>
#include <queue>
#include <pthread.h>
#include "Cond.hpp"
#include "Mutex.hpp"
using namespace LockModule;
using namespace CondModule;
/*
1.:定义【模板类】(支持任意数据类型)
阻塞队列要存数字、字符串、对象都可以,所以用模板
*/
template <typename T>
class BlockQueue
{
public:
BlockQueue(int cap) : _max_cap(cap)
{
}
/*
生产者接口:入队 Enqueue
对应总结:
共享变量修改 必须加锁
队列满 → 线程等待
生产完成 → 唤醒消费者
RAII 自动锁,避免死锁
*/
void Enqueue(T &data)
{
// 下面的代码全都是临界资源,所以要加锁
LockGuard lock(_mutex);
// 首先生产者生产数据进入队列
// 先判断是否满
while (isFull())
{
// 那么就阻塞等待+解锁
_prod_cond.Wait(_mutex);
}
// 否则进入临界资源
// 入队
_queue.push(data);
std::cout << "[生产] 入队成功,队列大小:" << _queue.size() << std::endl;
// 唤醒消费者
_cons_cond.Signal();
}
/*
写【消费者接口:出队 Pop】
对应总结:
队列空 → 消费者等待
消费完成 → 唤醒生产者
*/
void Pop(T &data)
{
// 与生产者一样的原理,加锁
LockGuard lock(_mutex);
// 判断是否为空,空了解锁+等待
while (isEmpty())
{
_cons_cond.Wait(_mutex);
}
// 否则就消费一个商品
// 出队列
// 这里应该执行
data = _queue.front();
_queue.pop();
std::cout << "[消费] 出队成功,队列大小:" << _queue.size() << std::endl;
// 唤醒生产者
_prod_cond.Signal();
}
// 判空和判满
bool isFull()
{
return _queue.size() == _max_cap;
}
bool isEmpty()
{
return _queue.empty();
}
// 析构函数:自动调用锁和条件变量的析构,不用写代码
~BlockQueue() {}
private:
// 共享这个阻塞队列,所以有锁,条件变量
Mutex _mutex;
std::queue<T> _queue;
// 然后队列属性
int _max_cap; // 最大容量
// 生产条件变量和消费条件变量
Cond _prod_cond; // 队列满则等待
Cond _cons_cond; /// 队列为空则等待
};
#endif
第四步:封装任务类解耦线程与业务
#pragma once
#include <iostream>
#include <functional>
using fun_t = std::function<void()>;
class Task
{
public:
Task() : _task(nullptr)
{
}
// 带参构造(核心!传入要执行的任务 + 数据)
Task(fun_t task)
: _task(task)
{
}
// 执行任务
// 仿函数重载:让对象可以像函数一样调用(核心!)
void operator()()
{
// 执行我们封装的任务
if (_task)
_task();
}
~Task()
{
_task = nullptr;
}
private:
// 定义个任务盒子
fun_t _task;
};
五、完整测试代码(多生产者 + 多消费者)
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include "BlockQueue.hpp"
#include "Task.hpp"
// 队列最大容量
#define CAP 5
#define PRODUCER_NUM 3 // 3个生产者
#define CONSUMER_NUM 5 // 5个消费者
#define CAP 5
// 下载任务(拼写正确)
void Download()
{
std::cout << "这是一个下载任务" << std::endl;
}
// 生产者
void *producer(void *arg)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;
int data = 0;
while (true)
{
Task t(Download); // 绑定任务
bq->Enqueue(t);
std::cout << "生产者生产任务:" << data << std::endl;
data++;
sleep(1);
}
return nullptr;
}
// 消费者
void *consumer(void *arg)
{
BlockQueue<Task> *bq = (BlockQueue<Task> *)arg;
while (true)
{
Task t;
bq->Pop(t);
t();
std::cout << std::endl;
}
return nullptr;
}
int main()
{
BlockQueue<Task> bq(CAP);
// 1. 创建生产者线程数组
pthread_t producers[PRODUCER_NUM];
for (int i = 0; i < PRODUCER_NUM; i++)
{
pthread_create(&producers[i], nullptr, producer, &bq);
}
sleep(1);
// 2. 创建消费者线程数组
pthread_t consumers[CONSUMER_NUM];
for (int i = 0; i < CONSUMER_NUM; i++)
{
pthread_create(&consumers[i], nullptr, consumer, &bq);
}
// 3. 等待所有生产者
for (int i = 0; i < PRODUCER_NUM; i++)
{
pthread_join(producers[i], nullptr);
}
// 4. 等待所有消费者
for (int i = 0; i < CONSUMER_NUM; i++)
{
pthread_join(consumers[i], nullptr);
}
return 0;
}
五、核心总结
- 条件变量核心
配合互斥锁使用,解决线程同步 / 等待唤醒
Wait是原子操作:释放锁 → 等待 → 抢锁
条件判断必须用while,防虚假唤醒
禁止拷贝,系统资源唯一 - 生产者消费者模型核心
1 把锁 + 2 个条件变量 + 1 个共享队列
满→生产者等,空→消费者等
生产唤醒消费,消费唤醒生产 - 阻塞队列核心
模板类,支持任意数据 / 任务
线程安全,外部无感等待 / 唤醒
Pop必须用void+ 引用传参 - 任务类核心
基于std::function封装,支持任意函数
仿函数operator()简化调用
解耦线程与业务逻辑 - 多线程终极规则
共享资源读写 → 必须加锁
线程协同 → 必须用条件变量
RAII 机制 → 杜绝资源泄漏 / 死锁
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)