OS78.【Linux】线程互斥(7) 基于阻塞队列的多生产者-多消费者模型
目录
如果生产者线程或消费者线程在条件变量下的等待队列中排队了被误唤醒了怎么办?
1.知识回顾
参见OS78.【Linux】线程互斥(4) 基于阻塞队列的生产者-消费者模型(单生产者、单消费者的初步版本)文章复习生产者-消费者模型和基于阻塞队列的单生产者-单消费者代码
2.可以定制唤醒的策略
使用OS78.【Linux】线程互斥(4) 基于阻塞队列的生产者-消费者模型(单生产者、单消费者的初步版本)文章的代码,做进一步修改,比如生产一批再消费一批
定制唤醒的策略: 可以定义两个阈值:生产阈值和消费阈值
size_t _produce_threshold;//生产阈值
size_t _consume_threshold;//消费阈值
如果队列中元素的个数<生产阈值,那么生产者就来生产;
如果队列中元素的个数>消费阈值,那么消费者就来消费
如果队列中元素的个数[生产阈值,消费阈值],那么生产者和消费者自由运行
画图表示:

代码为:
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <queue>
template <class T>
class blocking_queue
{
public:
blocking_queue(size_t max_capacity=20)
:_max_capacity(max_capacity)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
_produce_threshold=_max_capacity/3;
_consume_threshold=(_produce_threshold*2)/3;
}
T& pop()
{
sleep(1);
pthread_mutex_lock(&_lock);
if (_q.empty())
pthread_cond_wait(&_c_cond,&_lock);
auto& obj=_q.front();
_q.pop();
if (_q.size()<_produce_threshold)
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_lock);
return obj;
}
void push(const T& obj)
{
sleep(1);
pthread_mutex_lock(&_lock);
if (_q.size()==_max_capacity)
pthread_cond_wait(&_p_cond,&_lock);
_q.push(obj);
if (_q.size()>_consume_threshold)
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_lock);
}
~blocking_queue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
while(!_q.empty())
_q.pop();
}
private:
std::queue<T> _q;
size_t _max_capacity;
pthread_mutex_t _lock;
pthread_cond_t _p_cond;//生产者的条件变量
pthread_cond_t _c_cond;//消费者的条件变量
size_t _produce_threshold;//生产阈值
size_t _consume_threshold;//消费阈值
};
void* produce(void* args)
{
blocking_queue<int>* bq=static_cast<blocking_queue<int>*>(args);
for (;;)
{
int data=rand()%10;
bq->push(data);
printf("生产者生产了数据%d\n",data);
}
return nullptr;
}
void* consume(void* args)
{
blocking_queue<int>* bq=static_cast<blocking_queue<int>*>(args);
for (;;)
{
int data=bq->pop();
printf("消费者消费了数据%d\n",data);
}
return nullptr;
}
int main()
{
srand((unsigned int)time(NULL));
blocking_queue<int>* bq=new blocking_queue<int>();
pthread_t producer,consumer;
pthread_create(&producer,nullptr,produce,bq);
pthread_create(&consumer,nullptr,consume,bq);
pthread_join(producer,nullptr);
pthread_join(consumer,nullptr);
delete bq;
return 0;
}
运行结果:

3.阻塞队列也可以放对象
由于class blocking_queue有template <class T>,那么也可以放任务对象:
blocking_queue<Task>* bq=new blocking_queue<Task>();
这里仅做提醒,就不详细写了
★4.全局看待生产者-消费者模型
面试题
2023美团基础研发部实习后端一面
https://www.nowcoder.com/discuss/469259452760276992
感谢VandNe牛友提供题目!
生产者消费者模型有互斥锁,那会不会效率很低啊,怎么提高生产者消费者模型的效率?
答: 这里需要全局看待生产者-消费者模型
回答
问题1: 生产者-消费者模型有前提的,生产者将数据放到特定的内存区域中,前提生产者有数据可写,那生产者提供的数据从哪里来?
答: 从用户、网络中获取,生产者生产的数据也是要花时间获取的
问题2: 消费行为单指消费者将数据读取到自己的线程中吗?
答: 不单指将数据读取到自己的线程中,消费者线程可能需要对数据做加工处理
完整的过程
完整的生产者-消费者模型流程如下:
生产者:
1. 获取数据
2. 生产数据到队列消费者:
1. 消费数据
2. 加工处理数据
★提高生产者消费者模型的效率
有了上述的完整的过程,也就可以提高生产者消费者模型的效率了
因为特定内存区域加了锁,所以该内存区域称为临界区,那么生产者和消费者访问该临界区是串行的,
如果只看临界区,是找不到提高生产者消费者模型的效率的思路的
必须全局看待生产者-消费者模型!
这里从上面的完整的过程分析:
假设生产者正在向临界区中写入数据,消费者虽然不能访问该临界区,但可以将之前得到的数据进行加工处理
假设消费者正在向临界区中读取数据, 生产者虽然不能访问该临界区, 但可以获取数据以备之后写入
从上述假设可以得出: 很大概率上,生产者和消费者,其中一个访问临界区代码,另外一个正在访问非临界区代码,相当于生产动作(指的是获取数据+写入数据)和消费动作(指的是读取数据+加工数据)是同时运行的,这样在一定程度上可以实现并发
(并发的概念: OS21.【Linux】进程状态(2) O(1)调度算法、进程切换和其他概念)
画图表示:
重复1~4

5.伪唤醒问题
以下代码是单生产者、单消费者版本:
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <queue>
template <class T>
class blocking_queue
{
public:
blocking_queue(size_t max_capacity=20)
:_max_capacity(max_capacity)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
T& pop()
{
sleep(1);
pthread_mutex_lock(&_lock);
if (_q.empty())
pthread_cond_wait(&_c_cond,&_lock);
auto& obj=_q.front();
_q.pop();
pthread_cond_signal(&_p_cond);
pthread_mutex_unlock(&_lock);
return obj;
}
void push(const T& obj)
{
sleep(1);
pthread_mutex_lock(&_lock);
if (_q.size()>=_max_capacity)
pthread_cond_wait(&_p_cond,&_lock);
_q.push(obj);
pthread_cond_signal(&_c_cond);
pthread_mutex_unlock(&_lock);
}
~blocking_queue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
while(!_q.empty())
_q.pop();
}
private:
std::queue<T> _q;
size_t _max_capacity;
pthread_mutex_t _lock;
pthread_cond_t _p_cond;//生产者的条件变量
pthread_cond_t _c_cond;//消费者的条件变量
};
void* produce(void* args)
{
blocking_queue<int>* bq=static_cast<blocking_queue<int>*>(args);
for (;;)
{
int data=rand()%10;
bq->push(data);
printf("生产者生产了数据%d\n",data);
}
return nullptr;
}
void* consume(void* args)
{
blocking_queue<int>* bq=static_cast<blocking_queue<int>*>(args);
for (;;)
{
int data=bq->pop();
printf("消费者消费了数据%d\n",data);
}
return nullptr;
}
int main()
{
srand((unsigned int)time(NULL));
blocking_queue<int>* bq=new blocking_queue<int>();
pthread_t producer,consumer;
pthread_create(&producer,nullptr,produce,bq);
pthread_create(&consumer,nullptr,consume,bq);
pthread_join(producer,nullptr);
pthread_join(consumer,nullptr);
delete bq;
return 0;
}
如果生产者线程或消费者线程在条件变量下的等待队列中排队了被误唤醒了怎么办?
例如不小心调用pthread_cond_broadcast唤醒了条件变量下全部等待的线程,那么线程还要重新持有锁
现在将上方代码改成多生产者、多消费者版本,将pthread_cond_signal改成pthread_cond_broadcast
注意: 多生产者、多消费者模型在2025快手日常实习考过:https://www.nowcoder.com/feed/main/detail/b6e58f067c184875a1c7a03e4039d30d
感谢等闲牛友提供面试题!
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <cstdlib>
#include <queue>
#define PRODUCER_CNT 5
#define COMSUMER_CNT 5
template <class T>
class blocking_queue
{
public:
blocking_queue(size_t max_capacity=20)
:_max_capacity(max_capacity)
{
pthread_mutex_init(&_lock,nullptr);
pthread_cond_init(&_p_cond,nullptr);
pthread_cond_init(&_c_cond,nullptr);
}
T& pop()
{
pthread_mutex_lock(&_lock);
if (_q.empty())
pthread_cond_wait(&_c_cond,&_lock);
auto& obj=_q.front();
_q.pop();
pthread_cond_broadcast(&_p_cond);
pthread_mutex_unlock(&_lock);
return obj;
}
void push(const T& obj)
{
pthread_mutex_lock(&_lock);
if (_q.size()>=_max_capacity)
pthread_cond_wait(&_p_cond,&_lock);
_q.push(obj);
pthread_cond_broadcast(&_c_cond);
pthread_mutex_unlock(&_lock);
}
~blocking_queue()
{
pthread_mutex_destroy(&_lock);
pthread_cond_destroy(&_p_cond);
pthread_cond_destroy(&_c_cond);
while(!_q.empty())
_q.pop();
}
private:
std::queue<T> _q;
size_t _max_capacity;
pthread_mutex_t _lock;
pthread_cond_t _p_cond;//生产者的条件变量
pthread_cond_t _c_cond;//消费者的条件变量
};
void* produce(void* args)
{
pthread_detach(pthread_self());
blocking_queue<int>* bq=static_cast<blocking_queue<int>*>(args);
for (;;)
{
int data=rand()%10;
bq->push(data);
printf("生产者生产了数据%d\n",data);
}
return nullptr;
}
void* consume(void* args)
{
pthread_detach(pthread_self());
blocking_queue<int>* bq=static_cast<blocking_queue<int>*>(args);
for (;;)
{
int data=bq->pop();
printf("消费者消费了数据%d\n",data);
}
return nullptr;
}
int main()
{
srand((unsigned int)time(NULL));
blocking_queue<int>* bq=new blocking_queue<int>();
pthread_t producer,consumer;
for (int i=0;i<PRODUCER_CNT;i++)
pthread_create(&producer,nullptr,produce,bq);
for (int i=0;i<COMSUMER_CNT;i++)
pthread_create(&consumer,nullptr,consume,bq);
for(;;){sleep(100);}
delete bq;
return 0;
}
运行结果: 段错误

分析错误唤醒线程导致的问题
假设用pthread_cond_broadcast唤醒了2个消费者线程,其中一个线程完成消费,释放了锁,此时阻塞队列恰好空了,而且恰好另外一个被唤醒的消费者线程又抢到锁了(这里假设生产者线程运气不好,没有抢到锁),那么pop不会成功,因为队列为空,pop会出问题,导致进程崩溃!
资源没有就绪,此时唤醒访问该资源的线程就称为伪唤醒问题
解决方法: 改成循环
资源没有就绪,如果误唤醒访问该资源的线程,就让该访问资源的线程再判断资源是否就绪,写成循环就行
T& pop()
{
pthread_mutex_lock(&_lock);
while (_q.empty())
pthread_cond_wait(&_c_cond,&_lock);
auto& obj=_q.front();
_q.pop();
pthread_cond_broadcast(&_p_cond);
pthread_mutex_unlock(&_lock);
return obj;
}
void push(const T& obj)
{
pthread_mutex_lock(&_lock);
while (_q.size()>=_max_capacity)
pthread_cond_wait(&_p_cond,&_lock);
_q.push(obj);
pthread_cond_broadcast(&_c_cond);
pthread_mutex_unlock(&_lock);
}
这样进程就不会崩溃了
6.多生产多消费的意义
当一个生产者写入数据时,其它生产者获取数据,效率高; 当一个消费者读取数据时,其它消费者加工处理数据,效率高,并发度好
7.leetcode练习: 交替打印FooBar
https://leetcode.cn/problems/print-foobar-alternately/description/
给你一个类:
class FooBar { public void foo() { for (int i = 0; i < n; i++) { print("foo"); } } public void bar() { for (int i = 0; i < n; i++) { print("bar"); } } }两个不同的线程将会共用一个
FooBar实例:
- 线程 A 将会调用
foo()方法,而- 线程 B 将会调用
bar()方法请设计修改程序,以确保
"foobar"被输出n次。示例 1:
输入:n = 1 输出:"foobar" 解释:这里有两个线程被异步启动。其中一个调用 foo() 方法, 另一个调用 bar() 方法,"foobar" 将被输出一次。示例 2:
输入:n = 2 输出:"foobarfoobar" 解释:"foobar" 将被输出两次。提示:
1 <= n <= 1000typedef struct { int n; } FooBar; // Function declarations. Do not change or remove this line void printFoo(); void printBar(); FooBar* fooBarCreate(int n) { FooBar* obj = (FooBar*) malloc(sizeof(FooBar)); obj->n = n; return obj; } void foo(FooBar* obj) { for (int i = 0; i < obj->n; i++) { // printFoo() outputs "foo". Do not change or remove this line. printFoo(); } } void bar(FooBar* obj) { for (int i = 0; i < obj->n; i++) { // printBar() outputs "bar". Do not change or remove this line. printBar(); } } void fooBarFree(FooBar* obj) { }
分析
由于需要让两个线程按一定顺序打印字符串,那么需要同步,显然用条件变量,条件变量需要配合锁使用
假设线程A调用foo,线程B调用bar
foo函数内部调用n次printFoo,bar函数内部调用n次printBar,这里printFoo和printBar不需要自己手写,leetcode评测系统已经准备过了
这里需要修改foo和bar的打印逻辑,以及补写fooBarFree
这里将锁和条件变量定义为全局的:
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
全局锁不需要手动调用pthread_mutex_destroy,全局变量不需要手动调用pthread_cond_destroy,那么fooBarFree仅需释放obj指向的对象:
void fooBarFree(FooBar* obj)
{
free(obj);
}
改写foo和bar函数
必须先线程A打印再线程B打印,如果按顺序打印,那么必须加锁后打印,如果不满足打印条件,需要放到条件变量下的等待队列中排队
错误思路
由于打印foo比打印bar先执行,那么可不可以这样做:
线程B执行bar时就让线程B先到条件变量下的等待队列中排队,等待线程A打印完foo后,再让线程A唤醒线程B
于是这样写:
void foo(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
pthread_mutex_lock(&lock);
// printFoo() outputs "foo". Do not change or remove this line.
printFoo();
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
void bar(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
pthread_mutex_lock(&lock);
pthread_cond_wait(&cond,&lock);
// printBar() outputs "bar". Do not change or remove this line.
printBar();
pthread_mutex_unlock(&lock);
}
}
不能像上面这样做,会产生死锁
添加如下调试信息:
void foo(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
printf("[线程A] 循环 i=%d 开始\n", i);
pthread_mutex_lock(&lock);
printf("[线程A] 获得锁\n");
printFoo();
printf("[线程A] 打印 foo\n");
pthread_mutex_unlock(&lock);
printf("[线程A] 释放锁\n");
if (!pthread_cond_signal(&cond))
printf("[线程A] 发送信号成功\n");
else
printf("[线程A] 发送信号失败\n");
}
}
void bar(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
printf("[线程B] 循环 i=%d 开始\n", i);
pthread_mutex_lock(&lock);
printf("[线程B] 获得锁\n");
printf("[线程B] 即将进入cond_wait\n");
pthread_cond_wait(&cond, &lock);
printf("[线程B] 从cond_wait返回\n");
printBar();
printf("[线程B] 打印 bar\n");
pthread_mutex_unlock(&lock);
printf("[线程B] 释放锁\n");
}
}
当n=1时,看看leetcode标准输出:

画个表格分析:

从图中看: 线程A打印一次foo后,调用pthread_cond_signal,线程B被唤醒后,从等待队列中离开,但是线程B没有及时获取锁,导致线程A再次获取锁,打印foo(这已经不满足题意了,foo被线程A连续打印2次)
之后线程A再次调用pthread_cond_signal,但是此时B不在等待队列中,导致信号丢失,线程A退出
线程B打印一次bar后,进入等待队列,由于线程A退出,那么线程B处于永久等待状态,导致超出时间限制
bar函数写法是错误的,不能无条件等待!!!!!!!!!!
void bar(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
printf("[线程B] 循环 i=%d 开始\n", i);
pthread_mutex_lock(&lock);
printf("[线程B] 获得锁\n");
printf("[线程B] 即将进入cond_wait\n");
pthread_cond_wait(&cond, &lock);
printf("[线程B] 从cond_wait返回\n");
printBar();
printf("[线程B] 打印 bar\n");
pthread_mutex_unlock(&lock);
printf("[线程B] 释放锁\n");
}
}
为了防止线程A连续取得锁,需要引入一个共享状态变量来指示当前该由哪个线程执行,并让线程在条件不满足时主动等待,而不是仅仅依赖锁的竞争
比如引入一个status变量:
如果status为0,那么让线程A打印foo,之后线程A设置status=1,这样下次让线程B可以打印bar
如果status为1,那么让线程B打印bar,之后线程B设置status=0,这样下次让线程A可以打印foo
当status为0,线程B不能打印bar,条件不满足,线程B需要等待
当status为1,线程A不能打印foo,条件不满足,线程A需要等待
为了防止伪唤醒,使用循环等待
那么foo、bar函数为:
void foo(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
pthread_mutex_lock(&lock);
// printFoo() outputs "foo". Do not change or remove this line.
while (status!=0)
pthread_cond_wait(&cond,&lock);
printFoo();
status=1;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
void bar(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
pthread_mutex_lock(&lock);
while (status!=1)
pthread_cond_wait(&cond,&lock);
// printBar() outputs "bar". Do not change or remove this line.
printBar();
status=0;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
代码
互斥锁+条件变量
由于先打印foo,那么status初始值为0
#include <pthread.h>
typedef struct
{
int n;
} FooBar;
int status=0;
//必须让打印foo的线程先持有锁
pthread_mutex_t lock=PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
// Function declarations. Do not change or remove this line
void printFoo();
void printBar();
FooBar* fooBarCreate(int n)
{
FooBar* obj = (FooBar*) malloc(sizeof(FooBar));
obj->n = n;
return obj;
}
void foo(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
pthread_mutex_lock(&lock);
// printFoo() outputs "foo". Do not change or remove this line.
while (status!=0)
pthread_cond_wait(&cond,&lock);
printFoo();
status=1;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
void bar(FooBar* obj)
{
for (int i = 0; i < obj->n; i++)
{
pthread_mutex_lock(&lock);
while (status!=1)
pthread_cond_wait(&cond,&lock);
// printBar() outputs "bar". Do not change or remove this line.
printBar();
status=0;
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);
}
}
void fooBarFree(FooBar* obj)
{
free(obj);
}
提交结果

当然本题也可以用自旋锁来解决,后面的文章会再解此题
8.多生产者、多消费者的生产者-消费模型的例子
多线程爬虫+下载器
多个生产者线程: 从URL队列取链接,下载网页,解析出新链接入队
多个消费者线程: 处理下载的网页内容
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)