生产者消费者模型

生产者消费者模型是一个经典的同步与互斥问题,描述的是:

两个或多个进程/线程共享一个固定大小的缓冲区,其中"生产者"负责往缓冲区添加数据,"消费者"负责从缓冲区取出数据,需要确保生产者不会在缓冲区满时继续添加,消费者不会在缓冲区空时继续取出。

线程的创建

创建线程的函数为pthread_create()。
1.第一个参数为:线程标识符。类型:pthread_t *
作用:用于存储新创建线程的唯一标识符。该标识符后续可用于线程管理
(如pthread_join,等待进程结束)。
2.第二个参数为:线程属性。类型:const pthread_attr_t *
作用:指定线程的额外属性(如栈大小、调度策略等)。若为NULL,则使用默认属性。
3.第三个参数为:线程函数。型:void ()(void )
作用:新线程执行的入口函数。该函数必须返回void
并接受一个void*参数。
4.最后一个参数为:参数传递。类型:void *
作用:传回一个参数,无要求默认使用NULL。

返回值:成功时返回0,失败时返回错误码。

#include <pthread.h>

       int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                          void *(*start_routine) (void *), void *arg);

线程创建:

pthread_t pthid1;

pthread_t pthid2;

//创建线程
pthread_create(&pthid1,NULL,producer,NULL); //线程1执行生产者函数

pthread_create(&pthid2,NULL,consumer,NULL); //线程2执行消费者函数

创建互斥锁(mutex)

互斥锁是一种同步机制,用于保护共享资源在多线程或多进程环境下的独占访问。当一个线程持有互斥锁时,其他线程尝试获取该锁会被阻塞,直到锁被释放。
互斥锁类型为pthread_mutex_t。

互斥锁常见函数

  1. pthread_mutex_init: 初始化互斥锁。
  2. pthread_mutex_lock: 尝试获取互斥锁。若锁已被其他线程持有,则调用线程会阻塞。
  3. pthread_mutex_trylock: 非阻塞尝试获取互斥锁。若锁不可用,立即返回错误码而非阻塞。
  4. pthread_mutex_unlock: 释放互斥锁。
  5. pthread_mutex_destroy: 销毁互斥锁,释放相关资源。

创建信号量

信号量和互斥锁都是用于多线程或多进程同步的机制,主要用于控制对共享资源的访问。

信号量通常支持以下基本操作:

初始化(Init):设置信号量的初始值(通常为非负整数)。
等待(Wait/P):也称为“P操作”,尝试减少信号量的值。若值为0,则阻塞调用者。
释放(Post/V):也称为“V操作”,增加信号量的值,唤醒等待的线程/进程。

int sem_init(sem_t *sem, int pshared, unsigned int value);
 /*
 sem: 指向信号量变量的指针。
pshared: 指定信号量的共享方式:
0 表示信号量只能由当前进程的线程共享。
非 0 表示信号量可以在多个进程间共享(需存储在共享内存中)。
value: 信号量的初始值。
*/

创建信号量


sem_t producer_sem;
sem_t consumer_sem;

//初始化信号量

sem_init(&producer_sem, 0, 5);		//0表示该信号量只供当前进程使用,5表示缓冲区有5个空间可存放生产物
sem_init(&consumer_sem, 0, 0);		//表示初始时缓冲区为空,消费者需等待生产者生成数据。

生产者函数和消费者函数

在该模型中,我们需要一个东西用来存储生产数据,可以是链表,也可以是数组,这里用链表来存储。


typedef struct node{
    int num;
    struct node* next;
}Node;		//定义节点

Node* head = NULL;		//头指针为空,表示没有元素

这样就定义了一个空链表。

除此之外,我们还需要定义出生产者线程执行的函数和消费者线程执行的函数。

producer:

void* producer(void* arg){
    while(1){

        sem_wait(&producer_sem);       //对producer_sem进行--操作,用于“加锁”或“等待”,P操作
        Node* node = malloc(sizeof(Node));    //创建新节点
        node->num = rand()%1000;                //随机赋值
        pthread_mutex_lock(&mutex);    //添加之前上锁
        node->next = head;                      //头插法
        head = node;                            //head指向头节点
        printf("id:%ld,   data:%d\n",pthread_self(),head->num);
        pthread_mutex_unlock(&mutex);  //添加之后解锁
        sem_post(&consumer_sem);        //对consumer_sem进行++操作,用于“解锁”或“发出”信号。V操作

        sleep(rand()%3);
    }
}

在对共享资源进行修改访问前,需要对共享资源进行上锁,避免多线程之间共同访问。

并在修改之后进行解锁,以便其他进程访问共享资源。

		pthread_mutex_lock(&mutex);    //添加之前上锁
        node->next = head;                      //头插法
        head = node;                            //head指向头节点
        printf("id:%ld,   data:%d\n",pthread_self(),head->num);
        pthread_mutex_unlock(&mutex);  //添加之后解锁

consumer

几乎与生产者函数相差无几 。

void* consumer(void* arg){
    while(1){
            sem_wait(&consumer_sem);
            pthread_mutex_lock(&mutex);
            Node* p = head;
            head = head->next;
            printf("id:%ld,  cons:%d\n",pthread_self(),p->num);
            free(p);
            pthread_mutex_unlock(&mutex);
            sem_post(&producer_sem);
            sleep(rand() % 3);

            
        }    
}

主函数

信号量的初始化、线程的创建、销毁信号量等都在主函数中进行。
表现为:

int main(){
    
    //初始化信号量
    sem_init(&producer_sem, 0, 5);
    sem_init(&consumer_sem, 0, 0);
    
    //创建线程
    pthread_create(&pthid1,NULL,producer,NULL);
    pthread_create(&pthid2,NULL,consumer,NULL);

    //等待线程结束
    pthread_join(pthid1,NULL);
    pthread_join(pthid2,NULL);

    //销毁信号量
    sem_destroy(&consumer_sem);
    sem_destroy(&producer_sem);
    pthread_mutex_destroy(&mutex);

    return 0;
}
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐