信号量和互斥锁实现生产者消费者模型
信号量和互斥锁实现生产者消费者模型
生产者消费者模型
生产者消费者模型是一个经典的同步与互斥问题,描述的是:
两个或多个进程/线程共享一个固定大小的缓冲区,其中"生产者"负责往缓冲区添加数据,"消费者"负责从缓冲区取出数据,需要确保生产者不会在缓冲区满时继续添加,消费者不会在缓冲区空时继续取出。
线程的创建
创建线程的函数为pthread_create()。
1.第一个参数为:线程标识符。类型:pthread_t *
作用:用于存储新创建线程的唯一标识符。该标识符后续可用于线程管理
(如pthread_join,等待进程结束)。
2.第二个参数为:线程属性。类型:const pthread_attr_t *
作用:指定线程的额外属性(如栈大小、调度策略等)。若为NULL,则使用默认属性。
3.第三个参数为:线程函数。型:void ()(void )
作用:新线程执行的入口函数。该函数必须返回void并接受一个void*参数。
4.最后一个参数为:参数传递。类型:void *
作用:传回一个参数,无要求默认使用NULL。
返回值:成功时返回0,失败时返回错误码。
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
void *(*start_routine) (void *), void *arg);
线程创建:
pthread_t pthid1;
pthread_t pthid2;
//创建线程
pthread_create(&pthid1,NULL,producer,NULL); //线程1执行生产者函数
pthread_create(&pthid2,NULL,consumer,NULL); //线程2执行消费者函数
创建互斥锁(mutex)
互斥锁是一种同步机制,用于保护共享资源在多线程或多进程环境下的独占访问。当一个线程持有互斥锁时,其他线程尝试获取该锁会被阻塞,直到锁被释放。
互斥锁类型为pthread_mutex_t。
互斥锁常见函数
- pthread_mutex_init: 初始化互斥锁。
- pthread_mutex_lock: 尝试获取互斥锁。若锁已被其他线程持有,则调用线程会阻塞。
- pthread_mutex_trylock: 非阻塞尝试获取互斥锁。若锁不可用,立即返回错误码而非阻塞。
- pthread_mutex_unlock: 释放互斥锁。
- pthread_mutex_destroy: 销毁互斥锁,释放相关资源。
创建信号量
信号量和互斥锁都是用于多线程或多进程同步的机制,主要用于控制对共享资源的访问。
信号量通常支持以下基本操作:
初始化(Init):设置信号量的初始值(通常为非负整数)。
等待(Wait/P):也称为“P操作”,尝试减少信号量的值。若值为0,则阻塞调用者。
释放(Post/V):也称为“V操作”,增加信号量的值,唤醒等待的线程/进程。
int sem_init(sem_t *sem, int pshared, unsigned int value);
/*
sem: 指向信号量变量的指针。
pshared: 指定信号量的共享方式:
0 表示信号量只能由当前进程的线程共享。
非 0 表示信号量可以在多个进程间共享(需存储在共享内存中)。
value: 信号量的初始值。
*/
创建信号量
sem_t producer_sem;
sem_t consumer_sem;
//初始化信号量
sem_init(&producer_sem, 0, 5); //0表示该信号量只供当前进程使用,5表示缓冲区有5个空间可存放生产物
sem_init(&consumer_sem, 0, 0); //表示初始时缓冲区为空,消费者需等待生产者生成数据。
生产者函数和消费者函数
在该模型中,我们需要一个东西用来存储生产数据,可以是链表,也可以是数组,这里用链表来存储。
typedef struct node{
int num;
struct node* next;
}Node; //定义节点
Node* head = NULL; //头指针为空,表示没有元素
这样就定义了一个空链表。
除此之外,我们还需要定义出生产者线程执行的函数和消费者线程执行的函数。
producer:
void* producer(void* arg){
while(1){
sem_wait(&producer_sem); //对producer_sem进行--操作,用于“加锁”或“等待”,P操作
Node* node = malloc(sizeof(Node)); //创建新节点
node->num = rand()%1000; //随机赋值
pthread_mutex_lock(&mutex); //添加之前上锁
node->next = head; //头插法
head = node; //head指向头节点
printf("id:%ld, data:%d\n",pthread_self(),head->num);
pthread_mutex_unlock(&mutex); //添加之后解锁
sem_post(&consumer_sem); //对consumer_sem进行++操作,用于“解锁”或“发出”信号。V操作
sleep(rand()%3);
}
}
在对共享资源进行修改访问前,需要对共享资源进行上锁,避免多线程之间共同访问。
并在修改之后进行解锁,以便其他进程访问共享资源。
pthread_mutex_lock(&mutex); //添加之前上锁
node->next = head; //头插法
head = node; //head指向头节点
printf("id:%ld, data:%d\n",pthread_self(),head->num);
pthread_mutex_unlock(&mutex); //添加之后解锁
consumer
几乎与生产者函数相差无几 。
void* consumer(void* arg){
while(1){
sem_wait(&consumer_sem);
pthread_mutex_lock(&mutex);
Node* p = head;
head = head->next;
printf("id:%ld, cons:%d\n",pthread_self(),p->num);
free(p);
pthread_mutex_unlock(&mutex);
sem_post(&producer_sem);
sleep(rand() % 3);
}
}
主函数
信号量的初始化、线程的创建、销毁信号量等都在主函数中进行。
表现为:
int main(){
//初始化信号量
sem_init(&producer_sem, 0, 5);
sem_init(&consumer_sem, 0, 0);
//创建线程
pthread_create(&pthid1,NULL,producer,NULL);
pthread_create(&pthid2,NULL,consumer,NULL);
//等待线程结束
pthread_join(pthid1,NULL);
pthread_join(pthid2,NULL);
//销毁信号量
sem_destroy(&consumer_sem);
sem_destroy(&producer_sem);
pthread_mutex_destroy(&mutex);
return 0;
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)