多线程并发协作核心:生产者消费者模型深度解析(阻塞队列 + 环形队列实战)----《Hello Linux!》(19)
前言
在多线程并发编程中,如何协调 “数据生成” 与 “数据处理” 的节奏,避免因供需失衡导致的效率浪费或资源竞争?生产者消费者模型给出了经典答案。它通过 “解耦生产者与消费者”“统一交易场所” 的设计,让数据生成和处理可以独立并发执行,既解决了忙闲不均的问题,又最大化利用了系统资源,成为分布式系统、消息队列、服务器后台等场景的核心底层模型。
本文将从模型本质出发,层层拆解生产者消费者模型的核心逻辑:首先详解 “321 原则”(3 种关系、2 种角色、1 个交易场所),厘清生产者与生产者、消费者与消费者、生产者与消费者之间的互斥与同步关系;随后聚焦实战,分别实现两种经典的交易场所 —— 基于条件变量的阻塞队列,以及基于 POSIX 信号量的环形队列,完整呈现锁与条件变量、信号量的协同使用技巧;最后补充 POSIX 信号量的核心接口与线程间同步原理,帮你吃透模型的底层支撑技术。
无论你是想理解并发协作的设计思想,还是需要落地实际项目中的数据供需协调场景,本文都能为你提供清晰的思路与可直接复用的代码:从理论原则到代码实现,从细节注意事项(如伪唤醒处理、信号量与锁的调用顺序)到场景适配选择,让你彻底掌握生产者消费者模型的设计精髓与实战技巧,为构建高效、稳定的并发系统打下坚实基础。
生产者消费者模型
概念:作用是协调 “数据生成方” 和 “数据处理方” 的工作节奏,实现高效、解耦的并发协作
这个模型的优点:
1.解耦 2.支持并发 3.支持忙闲不均

这个的话有点像供货商–超市–购买者那种关系
这个模型可以简记为
321原则3是3种关系
生产者跟生产者 --互斥 消费者跟消费者 --互斥 生产者跟消费者 --同步和互斥
2是2种角色
1是1个交易场所 --特定结构的内存空间(一般用的是阻塞队列)
生产者的作用:1.获取数据 2.生产数据到特定的内存空间
消费者的作用:1.消费数据 2.加工处理数据
虽然只有生产者和消费者不能同时进入临界区
–但是可以一个在临界区(获取或者消费数据),一个在非临界区(生产数据或者加工处理数据)
生产消费是高效的
模拟实现生产者消费者模型
这里模拟实现的是其中特定的内存空间是阻塞队列的情况
普通的阻塞队列的模拟实现:
template <class T>
class BlockQueue
{
static const int defalutnum = 20;//这是阻塞队列的容量
public:
BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
{
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&c_cond_, nullptr);
pthread_cond_init(&p_cond_, nullptr);
// low_water_ = maxcap_/3;
// high_water_ = (maxcap_*2)/3;
}
T pop()
{
pthread_mutex_lock(&mutex_);
while(q_.size() == 0)
{
pthread_cond_wait(&c_cond_, &mutex_);
}
T out = q_.front();
q_.pop();
// if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
pthread_cond_signal(&p_cond_);
pthread_mutex_unlock(&mutex_);
return out;
}
void push(const T &in)
{
pthread_mutex_lock(&mutex_);
while(q_.size() == maxcap_){ // 这里的while防止了线程被伪唤醒
pthread_cond_wait(&p_cond_, &mutex_);
}
// 1. 队列没满 2.被唤醒
q_.push(in);
// if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
pthread_cond_signal(&c_cond_);
pthread_mutex_unlock(&mutex_);
}
~BlockQueue()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&c_cond_);
pthread_cond_destroy(&p_cond_);
}
private:
std::queue<T> q_; // 共享资源, q被当做整体使用的,q只有一份,加锁。但是共享资源也可以被看做多份!
int maxcap_; // 极值
pthread_mutex_t mutex_;
pthread_cond_t c_cond_;
pthread_cond_t p_cond_;
// int low_water_;
// int high_water_;
};
注意点:
1.要先加锁再判断临界资源是否满足条件
原因:因为判断临界资源是否就绪,也是在访问临界资源
2.这里的
low_water_和high_water_是模拟实现的低于某个值就生产,高于某个值就消费–没有这俩的时候是生产一个就消费一个
3.
pthread_cond_wait要在加锁之后去搞4.这个模板参数
T也可以是对象那种5.
pthread_cond_signal放在unlock里面或者外面都是可以的
为什么会有伪唤醒情况:
由于系统调度的意外产生的,没有收到
signal也进行下去有点像客户等外卖–还没收到外卖电话也去门口看一看
生产者和消费者的模拟实现:
分线程下来执行生产者的函数(函数里面就是获取数据和生产数据)
分线程下来执行消费者的函数(函数里面就是消费数据和处理数据)
–想要实现多生产者多消费者的话,就eg:
for循环去产生eg: void *Consumer(void *args) { ThreadData *td = static_cast<ThreadData*>(args); RingQueue<Task> *rq = td->rq; std::string name = td->threadname; while (true) { // 1. 消费数据 Task t; rq->Pop(&t); // 2. 处理数据 ......... } return nullptr; } for (int i = 0; i < 1; i++) { ThreadData *td = new ThreadData(); td->rq = rq; pthread_create(i, nullptr, Consumer, td); }
基于环形队列的生产消费模型

理念:
head表示生产者将要存的位置tail表示消费者将要消费的位置向后移时需要
(a+1)%...单生产者单消费者时需要满足3个条件(多生产多消费者时要用
POSIX信号量)1.指向同一个位置时只能有一个在访问 2.消费者不能超过生产者 3.生产者不能套圈
模拟实现:
const static int defaultcap = 20;
template<class T>
class RingQueue{
private:
void P(sem_t &sem)
{
sem_wait(&sem);
}
void V(sem_t &sem)
{
sem_post(&sem);
}
void Lock(pthread_mutex_t &mutex)
{
pthread_mutex_lock(&mutex);
}
void Unlock(pthread_mutex_t &mutex)
{
pthread_mutex_unlock(&mutex);
}
public:
RingQueue(int cap = defaultcap)
:ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
{
sem_init(&cdata_sem_, 0, 0);
sem_init(&pspace_sem_, 0, cap);
pthread_mutex_init(&c_mutex_, nullptr);
pthread_mutex_init(&p_mutex_, nullptr);
}
void Push(const T &in) // 生产
{
P(pspace_sem_);
Lock(p_mutex_);
ringqueue_[p_step_] = in;
// 位置后移,维持环形特性
p_step_++;
p_step_ %= cap_;
Unlock(p_mutex_);
V(cdata_sem_);
}
void Pop(T *out) // 消费
{
P(cdata_sem_);
Lock(c_mutex_);
*out = ringqueue_[c_step_];
// 位置后移,维持环形特性
c_step_++;
c_step_ %= cap_;
Unlock(c_mutex_);
V(pspace_sem_);
}
~RingQueue()
{
sem_destroy(&cdata_sem_);
sem_destroy(&pspace_sem_);
pthread_mutex_destroy(&c_mutex_);
pthread_mutex_destroy(&p_mutex_);
}
private:
std::vector<T> ringqueue_;
int cap_;
int c_step_; // 消费者下标
int p_step_; // 生产者下标
sem_t cdata_sem_; // 消费者关注的数据资源
sem_t pspace_sem_; // 生产者关注的空间资源
pthread_mutex_t c_mutex_;
pthread_mutex_t p_mutex_;
};
如果想要多生产者多消费者的话,跟上面的那种产生方法一样
注意:要先
P再Lock原因:相当于先排队再买票进入,而不是买完票再排队进入
POSIX信号量
相较于
SystemV信号量,POSIX信号量还可以用于线程间同步

作用:初始化信号量
返回值:成功返回
0,失败返回-1
sem:需要初始化的那个的指针
pshared:0表示线程间共享,非零表示进程间共享
value:信号量初始值注意:在初始化之前,要先定义信号量 eg:
sem_t cdata_sem_

作用:销毁信号量
返回值:成功返回
0,失败返回-1

作用:把
sem减1,减后的值>=0就继续执行;<0就阻塞,直到>0成功返回
0,失败返回-1

作用:对信号量
sem的值加 1,若有线程因sem_wait阻塞等待该信号量,则唤醒其中一个线程。成功返回
0,失败返回-1
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)