互斥和同步(生产者消费者模型)
目录
互斥
为什么要有互斥
一些共享资源在多执行流并发访问中会出现错误,为了防止错误的发生,我们需要控制访问这些共享资源的执行流数量<=1,即多个执行流互斥地访问共享资源。
关于会产生什么错误,这里就不再赘述,相关文章有很多。
实现互斥的方式-互斥锁(C线程库为例)
当然,互斥的实现方式多种多样,比如中断屏蔽,比如OS提供的原子操作接口,而我给读者介绍的只是其中一种——互斥锁
互斥锁的原理
定义一个变量var(表示可以访问该共享资源的执行流的数量),将它的初始值设置为1。让所有想要访问共享资源的执行流都去检查var的值,并规定:如果var为1,说明可以访问该资源,本执行流将var置0后进行访问,访问结束后再把var置1;如果var为0,说明不可以访问该资源,本执行流继续检查。这样一来,只有先检查到var = 1 的执行流可以访问共享资源。
类比:先检查到var = 1 的执行流会将var置为0,此时共享资源如同被加锁,后来的执行流无法访问它。访问结束后var又被置为1,此时共享资源如同被解锁,其余执行流可以争抢并访问这个资源。因此这种实现互斥的方式叫做互斥锁。
C语言线程库提供的有关互斥锁的接口
创建锁:
#include<pthread.h>
pthread_mutex_t mutex;
- pthread_mutex_t是linux提供的结构体类型,这里面包含有实现互斥所以用的var变量。
静态初始化:
#include<pthread.h>
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
- 静态初始化专用于静态存储期(全局变量,静态局部变量等)的锁变量,而局部变量或者动态创建的锁不可用这种方式初始化。
动态初始化:
#include<pthread.h>
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
//mutex:锁的地址
//mutexattr:可以给锁设置属性,一般为nullptr
//函数调用成功返回0
- 动态初始化可以用于初始化静态存储期和非静态存储期(包括堆空间,栈空间等)的锁变量,但是使用这种方式初始化锁必须使用pthread_mutex_destroy函数销毁锁。
加锁:
#include<pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
- 对应于检查var变量
解锁:
#include<pthread.h>
int pthread_mutex_unlock(pthread_mutex_t *mutex);
- 对应于var重置为0
锁的销毁:
#include<pthread.h>
int pthread_mutex_destroy(pthread_mutex_t *mutex);
- 如果mutex是堆空间上的变量,不要忘记调用delete或者free释放锁变量。
简单使用一下互斥锁
// 伪代码:使用互斥锁保护共享数据
// 定义共享资源
int account_balance = 1000; // 银行账户余额
//定义并初始化互斥锁
pthread_mutex_t balance_lock = PTHREAD_MUTEX_INITIALIZER;
// 存款操作(线程安全)
void deposit(int amount) {
pthread_mutex_lock(&balance_lock); // 加锁(进入临界区)
// 临界区开始:操作共享数据
int old_balance = account_balance;
account_balance = old_balance + amount;
printf("存款:%d -> %d\n", old_balance, account_balance);
// 临界区结束
pthread_mutex_unlock(&balance_lock); // 解锁(离开临界区)
}
// 取款操作(线程安全)
bool withdraw(int amount) {
pthread_mutex_lock(&balance_lock); // 加锁
// 临界区
bool success = false;
if (account_balance >= amount) {
int old_balance = account_balance;
account_balance = old_balance - amount;
printf("取款:%d -> %d\n", old_balance, account_balance);
success = true;
} else {
printf("取款失败:余额不足\n");
}
pthread_mutex_unlock(&balance_lock); // 解锁
return success;
}
// 主函数:创建多个线程并发操作
int main() {
// 创建多个线程同时进行存款和取款
create_thread(deposit, 100); // 线程1:存100
create_thread(withdraw, 200); // 线程2:取200
create_thread(deposit, 50); // 线程3:存50
// 等待所有线程完成
wait_all_threads();
printf("最终余额:%d\n", account_balance); // 应该是950
return 0;
}
重新理解互斥锁
在互斥锁的原理中,有两处疑点:
- var本身也是会被多个执行流访问的共享资源,所以多个执行流不能同时对var的操作。
- 如果在某执行流访问共享资源的同时其余执行流都不断轮询检查var的值,那么有很多CPU资源都被浪费了。
那么实际上互斥锁的原理如下:
mutex相当于之前提到的var,lock是加锁函数的伪代码,unlock是解锁函数的伪代码

- xchgb是一条交换指令,所有执行流都执行这条指令——将自己cpu上下文中的al(0)与mutex的值交换。如果发现交换后al是1,说明该执行流检查到mutex为1并且将mutext置为了0,实现了"加锁";如果发现交换后al是0,该执行流检查到mutex为0。可以发现,检查mutex的值和修改mutex的值仅仅通过一条指令就做到了,而指令的执行是原子的,不可被中断,因此虽然mutex是共享资源,但由于对mutex的操作都是原子的,所以访问mutex不用加以保护。
- movb是一条赋值指令,访问过共享资源的执行流执行这条指令将mutex重新置为1,这就是解锁操作。同理,原子操作保证访问mutex不用加以保护。
- 加锁失败后,本执行流会挂起等待。被挂起的执行流的PCB/TCB会被放到内核中的某等待队列中,并且PCB/TCB的状态会被修改为睡眠,所以本执行流不再被调度。当某执行流释放了锁后,才会唤醒所有挂起等待的执行流,所有执行流重新开始竞争。正是因此,不会浪费太多CPU资源。
同步
为什么要有同步
其一,资源竞争与合理调度。在使用锁进行并发控制时,持有锁的线程释放后可能立即再次参与竞争,从而连续获得锁资源。这会导致其他等待锁的线程长期处于饥饿状态,无法获得执行机会(例如在生产-消费者模型中,若消费者持续争抢锁,将阻塞生产者放入数据的动作,造成流程僵持与系统效率下降)。因此,同步机制也需考虑竞争的合理性,避免单一执行流无作为地占用资源。这是共享资源与执行流之间的同步关系。
其二,任务依赖与顺序控制。多个执行流在协作处理任务时,往往存在严格的先后依赖关系(例如炒菜流程:必须先完成洗菜,才能进行烧油,之后方可开始炒菜。若无同步机制协调,执行顺序可能错乱,导致任务无法正确完成)。这是执行流与执行流之间的同步关系。
条件变量实现同步(C线程库为例)
条件变量的核心数据结构
条件变量内维护一个等待队列,执行流可以调用接口主动在该队列上睡眠等待,也可以调用接口唤醒队列上等待的一个或者所有执行流。
C语言线程库提供的有关条件变量的接口
对条件变量进一步的叙述需要结合接口的使用来进行,所以在这里先来介绍一下条件变量接口的使用,希望读者不要感到突兀。
创建条件变量:
#include<pthread.h>
pthread_cond_t cond;
静态初始化:
#include<pthread.h>
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
- 静态初始化专用于静态存储期(全局变量,静态局部变量等)的条件变量,而局部变量或者动态创建的条件变量不可用这种方式初始化。
动态初始化:
#include<pthread.h>
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *cond_attr);
//cond_attr是设置属性的参数,一般设置为nullptr
//初始化成功返回0
- 动态初始化可以用于初始化静态存储期和非静态存储期(包括堆空间,栈空间等)的条件变量,但是使用这种方式初始化的条件变量必须使用pthread_cond_destroy函数销毁条件变量。
销毁条件变量:
#include<pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
在条件变量下睡眠等待:
#include<pthread.h>
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
//mutex是互斥锁
- 调用pthread_cond_wait函数时需要绑定一个互斥锁,且这个互斥锁必须是加锁状态(否则导致未定义行为)。
- 调用pthread_cond_wait函数后绑定的互斥锁会自动释放,然后执行流阻塞等待,等到执行流被唤醒的时候,会自动申请加锁,如果申请成功,会从pthread_cond_wait语句后面继续执行;如果申请失败,会在锁的等待队列上等待继续申请加锁。
唤醒条件变量下等待执行流:
#include<pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);//唤醒一个
int pthread_cond_broadcast(pthread_cond_t *cond);//唤醒所有
- 如果使用pthread_cond_signal唤醒单个等待的执行流,唤醒的是最先入队列的执行流(先进先出)
为什么条件变量要依赖互斥锁
条件变量顾名思义:不满足条件就阻塞,满足条件就继续执行。对于单个执行流来说,在阻塞等待时,不会无缘无故的就不再等待了,必然是由于其他执行流的操作导致条件满足。因此这个条件本身就有共享性。而如果涉及到了共享,那么就必须通过互斥锁来保护这个过程。所以我们可以明白,只要使用条件变量这种手段,那么就必然会有互斥锁的出现和配合,因此在条件变量机制中,调用pthread_cond_wait干脆直接绑定一个互斥锁,由函数帮我们做加锁和解锁的工作。
条件变量实现同步
条件变量这种手段实现同步的方式就是:通过判断被互斥锁保护的共享资源是否满足条件,决定执行流是否在条件变量上等待。因此天然的,条件变量很适合实现资源与执行流之间的同步,也就是保证合理调度。
这在生产者-消费者模型中有体现:在缓冲区空的时候,消费者不要再竞争锁,而是让生产者放;在缓冲区满的时候,生产者不要再竞争锁,而是让消费者拿。
伪唤醒
if(条件)//满足条件即说明共享资源没准备好,该执行流需要等待
{
pthread_cond_wait(&cond,&mutex);
}
有这样一种情况:共享资源准备好了,因此唤醒等待在条件变量下的执行流,执行流1被唤醒后没有成功加锁,而执行流2成功加锁,等到执行流2访问共享资源结束,共享资源又变成没有准备好的状态了,此时执行流1才成功加锁,因为执行流1是被唤醒的,执行流1会从pthread_cond_wait语句后面开始执行,即,执行流1此时本来应该等待,但却越过条件判断访问了共享资源。
解决方法就是让被唤醒并成功加锁的执行流再进行条件判断,如下:
while(条件)//满足条件即说明共享资源没准备好,该执行流需要等待
{
pthread_cond_wait(&cond,&mutex);
}
信号量实现同步(C线程库为例)
信号量的核心数据结构
信号量是资源计数器,它表示可用资源的数目。因此它的核心数据结构就是计数器(可能就是一个整型变量),而且,信号量本身也是需要被多执行流共同访问的共享资源,因此也需要互斥保护,所以信号量内部应该还有互斥锁和等待队列(防止循环等待)一类的数据结构,当然如果可以依靠别的方式实现计数器的互斥访问,就不需要使用互斥锁了,比如用中断屏蔽,或者系统提供的原子操作接口。
总而言之,信号量的核心数据结构包括:计数器+保证计数器互斥访问的机制+等待队列。
C语言线程库提供的信号量的相关接口
创建信号量:
#include <semaphore.h>
sem_t sem;
初始化信号量:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
//sem:信号量
//pthread:一般不关心,设置为0
//value:信号量代表的资源数目
//成功返回0
销毁信号量:
#include <semaphore.h>
int sem_destroy(sem_t *sem);
申请资源:
#include <semaphore.h>
int sem_wait(sem_t *sem);
//成功返回0
- 申请成功,信号量的资源计数器减一,申请资源失败就会阻塞
释放资源:
#include<semaphore.h>
int sem_post(sem_t *sem);
//成功返回0
- 释放成功,信号量的资源计数器加一
信号量实现同步
虽然我上述提到,信号量表示某种可用资源的数目,听起来信号量就是为了实现资源与执行流之间的同步关系(合理调度)。但事实恰恰相反,信号量最适合的是实现执行流与执行流之间的同步关系(顺序控制)。
比如可以这样实现多个执行流的顺序控制:

//由三个同步关系,设置三个信号量(资源计数器为1)
sem_t A
sem_t B
sem_t C
P表示申请资源,V表示释放资源
//执行流1
---相关操作
V(A)
//执行流2
---相关操作
V(B)
//执行流3
p(A)
p(B)
---相关操作
V(C)
//执行流4
p(C)
---相关操作
此时,V操作如同发送信号,P操作如同等待信号,只有某个事件收到了所有前驱事件发送的信号,该事件才会执行。
除此之外,一旦资源耗尽,申请资源的操作就会阻塞,因此单纯使用信号量也可以实现共享资源与执行流之间的同步(合理调度),但是这是有局限性的,不如条件变量那样适用范围广——条件变量的阻塞条件可以多种多样,而信号量的阻塞条件只有资源耗尽。
生产者消费者模型
生产者消费者模型:两个或多个执行流之间通过共享缓冲区进行数据传输。
基于阻塞队列的生产者-消费者模型
#include<pthread.h>
#include<queue>
#include<mutex>
#include<iostream>
using namespace std;
/*这是一个阻塞队列,它的特点是:
1.队列为空,取出操作阻塞;队列为满,插入操作阻塞。
2.线程安全
*/
template<class T>
class BQ
{
private:
queue<T> _que;
pthread_mutex_t _mutex;//保证对阻塞队列的操作线程安全的锁
pthread_cond_t _cond1;//生产者的等待队列
pthread_cond_t _cond2;//消费者的等待队列
int _cap;//阻塞队列的容量
public:
bool isfull(){return _que.size() == _cap;}
bool isempty(){return _que.size() == 0;}
BQ(int capacity):
_cap(capacity)
{
pthread_mutex_init(&_mutex,nullptr);
pthread_cond_init(&_cond1,nullptr);
pthread_cond_init(&_cond2,nullptr);
}
~BQ()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond1);
pthread_cond_destroy(&_cond2);
}
void push(const T& arg)
{
pthread_mutex_lock(&_mutex);//加锁
while(isfull())//用while而不是if,防止伪唤醒
{
cout<<"生产者等待"<<endl;
pthread_cond_wait(&_cond1,&_mutex);//缓冲区满了就让执行流睡眠等待
}
_que.push(arg);
pthread_cond_signal(&_cond2);//唤醒消费者等待队列中的消费者
pthread_mutex_unlock(&_mutex);//解锁
}
void pop(T& value)
{
pthread_mutex_lock(&_mutex);//加锁
while(isempty())//用while而不是if,防止伪唤醒
{
cout<<"消费者等待"<<endl;
pthread_cond_wait(&_cond2,&_mutex);//缓冲区满了就让执行流睡眠等待
}
value = _que.front();
_que.pop();
pthread_cond_signal(&_cond1);//唤醒消费者等待队列中的消费者
pthread_mutex_unlock(&_mutex);//解锁
}
};
-----------------------------------------------------------------------------------------
#include<functional>
#include"blockqueue.hpp"
#include<unistd.h>
using task_t = function<void()>;
void* product(void* arg)//生产者的函数
{
BQ<task_t> * buffer = static_cast<BQ<task_t>*>(arg);
int x = 5;
while(1)
{
sleep(1);
cout<<"生产任务"<<endl;
buffer->push([x]()->void{cout<<"任务"<<x<<"执行"<<endl;});
x++;
}
}
void* consume(void* arg)//消费者的函数
{
BQ<task_t>* buffer = static_cast<BQ<task_t>*>(arg);
while(1)
{
task_t task;
buffer->pop(task);
task();
}
}
int main()
{
BQ<task_t> buffer(5);//使用阻塞队列做缓冲区,队列中的"产品"是任务函数,在这种情况下生产者不断生产"任务"而消费者不断执行拿到的"任务"函数
pthread_t id1;
pthread_t id2;
pthread_t id3;
pthread_t id4;
//两个生产者
pthread_create(&id1,nullptr,product,&buffer);
pthread_create(&id2,nullptr,product,&buffer);
//两个消费者
pthread_create(&id3,nullptr,consume,&buffer);
pthread_create(&id4,nullptr,consume,&buffer);
pthread_join(id1,nullptr);
pthread_join(id2,nullptr);
pthread_join(id3,nullptr);
pthread_join(id4,nullptr);
}
使用了两个条件变量,利用共享资源与执行流之间的同步关系,保证了合理调度(即,缓冲区为空的时候,消费者不会争抢锁导致无效加锁以及生产者无法放入;缓冲区为满时,生产者不会争抢锁导致无效加锁以及消费者无法拿取)。
注:该类型的生产者消费者模型将缓冲区当作整体使用,因此同一时间访问缓冲区执行流只能有一个。此外,它使用条件变量实现合理调度。
基于环形阻塞队列的生产者消费者模型
#include<semaphore.h>
#include<vector>
#include<mutex>
#include<iostream>
using namespace std;
/*这是一个环形阻塞队列,它的特点是:
1.队列为空,取出操作阻塞;队列为满,插入操作阻塞。
2.线程安全
*/
/*
信号量如何实现同步:
当队列为空时,数据资源数量为0,因此消费者无法消费,
当队列为满时,空间资源数量为0,因此生产者无法生产。
由此实现合理调度
同时,由于信号量控制了,为空未满的时候读写不能同时操作(互斥),因此,读写之间可以自然而然并发(并行)
*/
template<class T>
class RQ
{
private:
vector<T> _que;//数组模拟循环队列
pthread_mutex_t _mutexp;//保证多个生产者互斥访问的锁
pthread_mutex_t _mutexc;//保证多个消费者互斥访问的锁
sem_t _data;//表示可读取的资源数量的信号量
sem_t _empty;//表示可写入的资源(空位置)数量的信号量
int _cap;//队列的容量
int _head;//指向下一个可放数据的位置
int _tail;//指向下一个可读取数据的位置
public:
RQ(int capacity):
_cap(capacity),
_head(0),_tail(0)
{
pthread_mutex_init(&_mutexp,nullptr);
pthread_mutex_init(&_mutexc,nullptr);
sem_init(&_data,0,0);
sem_init(&_empty,0,_cap);
_que.resize(_cap);
}
~RQ()
{
pthread_mutex_destroy(&_mutexp);
pthread_mutex_destroy(&_mutexc);
sem_destroy(&_data);
sem_destroy(&_empty);
}
void push(const T& arg)
{
sem_wait(&_empty);//申请空间资源
pthread_mutex_lock(&_mutexp);//加锁
_que[_head++] = arg;
_head %= _cap;
sem_post(&_data);//增加一个数据资源
pthread_mutex_unlock(&_mutexp);//解锁
}
void pop(T& value)
{
sem_wait(&_data);//申请数据资源
pthread_mutex_lock(&_mutexc);//加锁
value = _que[_tail++];
_tail%=_cap;
sem_post(&_empty);//增加一个空间资源
pthread_mutex_unlock(&_mutexc);//解锁
}
};
-----------------------------------------------------------------------------------------
#include<functional>
#include<pthread.h>
#include"ringqueue.hpp"
#include<unistd.h>
using task_t = function<void()>;
void* product(void* arg)//生产者的函数
{
RQ<task_t> * buffer = static_cast<RQ<task_t>*>(arg);
int x = 5;
while(1)
{
cout<<"生产任务"<<endl;
task_t mytask = [x]()->void{cout<<"任务"<<x<<"执行"<<endl;};
buffer->push(mytask);
x++;
}
}
void* consume(void* arg)//消费者的函数
{
RQ<task_t>* buffer = static_cast<RQ<task_t>*>(arg);
while(1)
{ sleep(1);
task_t task;
buffer->pop(task);
task();
}
}
int main()
{
RQ<task_t> buffer(5);
pthread_t id1;
pthread_t id2;
pthread_t id3;
pthread_t id4;
//两个生产者
pthread_create(&id1,nullptr,product,&buffer);
pthread_create(&id2,nullptr,product,&buffer);
//两个消费者
pthread_create(&id3,nullptr,consume,&buffer);
pthread_create(&id4,nullptr,consume,&buffer);
pthread_join(id1,nullptr);
pthread_join(id2,nullptr);
pthread_join(id3,nullptr);
pthread_join(id4,nullptr);
}
注:该类型的生产者消费者模型将缓冲区当作局部使用(但也不是完全局部,至少没有实现生产者之间的并发(并行)放入,只实现了生产者和消费者之间的并发(并行)访问)。因此同一时间访问缓冲区执行流可以有两个,且一个生产者,一个消费者。此外,它使用信号量实现合理调度。
生产者消费者模型究竟为何高效

生产者的数据不是凭空而来,也不能直接交给缓冲区,因此生产者需要较长时间来获取并封装数据;消费者拿到数据后也不会什么都不做,而是用较长时间来处理数据。
由于缓冲区的存在,在生产者获取并封装数据的同时就算没有把数据交给消费者,消费者也可以从缓冲区里拿取数据并处理数据;而在消费者处理数据的同时就算没有处理结束,生产者也可以不断获取和处理数据并放入缓冲区。那么显而易见,生产者和消费者的操作:获取数据,封装数据,处理数据,是并行的,整个数据流动的过程宏观上是没有丝毫停滞的(对比与串行),非常高效。
而且对于基于环形阻塞队列的生产者消费者模型来说,生产者和消费者是可以同时访问缓冲区的,这样一来,获取数据,封装任务,获取任务,处理数据,整个过程都是并行的,效率极其之高。
在基于阻塞队列的生产者消费者模型中生产者和消费者对于缓冲区的访问依然是互斥的(串行的),它的效率是不是就和不用该模型一样呢?关于这一点做如下解释:
- 由于缓冲区的缓冲作用,在生产者获取数据的同时,消费者依然可以并行的处理数据,因此它依然是高效的,宏观上数据流动是没有停滞的。
- 对缓冲区执行拿取和放入操作的时间,相比于生产者和消费者处理数据的时间,是较少的,我们让大部分时间是并发(并行),而少部分时间串行,这同样不失高效性。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)