Linux线程:基于环形队列的生产消费模型
目录

一、引言
在之前的文章里,我们已经聊过生产者和消费者模型了。下面就来聊聊 POSIX 信号量是怎么让线程们排队同步的。
线程同步,说白了,就是当多个线程都想用同一个东西(共享资源)的时候,得有个规矩,让它们按顺序来,不然大家都一股脑的去改这个东西,最后结果就会乱七八糟。
条件变量和信号量都是让线程同步的工具。不过,条件变量实现线程同步,相对来说,理解起来更容易。就好比,条件变量像是有个“信号灯”,当某个条件满足了,它就亮起绿灯,通知另一个线程可以“出发”了,这种同步方式很直观。而信号量,就好像是让多个线程一起“举手”申请资源,然后通过锁来“排队”,有秩序地去访问共享资源,这其实也是一种同步方式。
二、POSIX信号量
不管是SystemV信号量还是POSIX信号量,本质上都是一个计数器,记录可用资源的个数。所以二元信号量也可以当做互斥锁来使用,对信号量进行P操作,本质上就是减减计数器,然后计数器的值为0,表示目前共享资源正在被某个线程访问,其他线程如果也想访问,必须等待该线程访问结束进行V操作后(本质上就是计数器加加,然后计数器的值由0变1),才可以访问。
下面来谈谈POSIX信号量的几个基本操作。
1)初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
- sem :指向信号量的指针。
- pshared :设置为 0 ,表示信号量仅在当前进程的线程间共享。
- value :信号量的初始值,设为 1 表示互斥锁,也可以设为共享资源的数量。
- 返回值:成功返回 0 ,失败返回 -1 。
2)销毁信号量
int sem_destroy(sem_t *sem);
- sem :指向信号量的指针。
- 返回值:成功返回 0 ,失败返回 -1 。
3)等待信号量
int sem_wait(sem_t *sem);
- sem :指向信号量的指针。
- 返回值:成功返回 0 ,失败返回 -1 。
阻塞等待信号量的值大于0,然后申请到以后将其值减1。
4)释放信号量
int sem_post(sem_t *sem);
- sem :指向信号量的指针。
- 返回值:成功返回 0 ,失败返回 -1
释放信号量,将其值加1。
5)获取信号量的值
int sem_getvalue(sem_t *sem, int *sval);
- sem :指向信号量的指针。
- sval :存储信号量当前值的指针。
- 返回值:成功返回 0 ,失败返回 -1 。
POSIX信号量的操作就介绍到这,不过也基本上覆盖了,下面我们进入下一个话题。
三、环形队列

我们知道,环形队列存在一个叫假溢出的问题,常见的解决方案有两种。
第一,假设环形队列的最大容量为N,那么我们只用N - 1个,当 tail + 1 == head时,我们认为此时环形队列已满。第二,维护一个计数器,计数器的初始值是环形队列的最大容量,根据计数器的值我们就可以知道环形队列的使用情况。
我们知道,信号量本质上就是一个计数器,所以很适合用来维护环形队列。
下面我们开始实操。
四、代码
#ifndef _RINGQUEUE_HPP_
#define _RINGQUEUE_HPP_
#include <vector>
#include <pthread.h>
#include <semaphore.h>
const int default_size = 10;
class RingQueue {
public:
RingQueue(int size = default_size)
:_array(size)
,_capacity(size)
,_head(0)
,_tail(0)
{
sem_init(&_space_sem, 0, _capacity);
sem_init(&_data_sem, 0, 0);
pthread_mutex_init(&_producer_mutex, nullptr);
pthread_mutex_init(&_consumer_mutex, nullptr);
}
void push(int data) {
//这里先申请信号量在申请锁,相当于先买票在排队
//如果反过来,先申请锁,在申请信号量,那么,如果锁被占用,那么就会一直白白等待
sem_wait(&_space_sem);//P操作
pthread_mutex_lock(&_producer_mutex);
_array[_tail] = data;
_tail = (_tail + 1) % _capacity;
pthread_mutex_unlock(&_producer_mutex);
sem_post(&_data_sem);//V操作
}
void pop(int& data) {
sem_wait(&_data_sem);//P操作
pthread_mutex_lock(&_consumer_mutex);
data = _array[_head];
_head = (_head + 1) % _capacity;
pthread_mutex_unlock(&_consumer_mutex);
sem_post(&_space_sem);//V操作
}
~RingQueue() {
sem_destroy(&_space_sem);
sem_destroy(&_data_sem);
pthread_mutex_destroy(&_producer_mutex);
pthread_mutex_destroy(&_consumer_mutex);
}
private:
std::vector<int> _array;
int _capacity;
int _head;
int _tail;
sem_t _space_sem;//空余空间信号量
sem_t _data_sem;//数据信号量
pthread_mutex_t _producer_mutex;
pthread_mutex_t _consumer_mutex;
};
#endif // _RINGQUEUE_HPP_
五、结语
POSIX信号量就介绍到这了,感兴趣的可以去了解一下POSIX的有名信号量。
完~
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)