#生产着消费者模型来理解进程间通信

321原则

3种关系

生产者和生产者是互斥关系;

生产者和消费者是同步关系;

消费者和消费者是互斥关系;

2种角色

生产者,消费者

1个交易场所

eg:阻塞队列,环形队列等;

我们实现下面两种版本,关于信号量条件变量+互斥锁进行通信的区别,看这篇文章;

条件变量+互斥锁版本

相关接口:

#include<pthread.h>

//互斥量(锁)
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrictattr);//初始化
int pthread_mutex_destroy(pthread_mutex_t *mutex)//销毁
int pthread_mutex_lock(pthread_mutex_t *mutex);//加锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);//解锁

//条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrictattr);//初始化
int pthread_cond_destroy(pthread_cond_t *cond);//销毁
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);//暂时释放锁,并等待唤醒(重新获得锁)

int pthread_cond_signal(pthread_cond_t *cond);//唤醒等待
int pthread_cond_broadcast(pthread_cond_t *cond);

mutex原理

在这里插入图片描述

基于阻塞队列的生产者消费者模型

阻塞队列

多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别 在于,当队列为空时,从队列获取元素的操作将会被阻塞直到队列中被放入了元素;当队列满时,往队列里存放元 素的操作也会被阻塞直到有元素被从队列中取出

(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述

C++模拟实现(单生产,单消费版本):

#include <iostream>
#include <queue>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;

#define NUM 8 //

class BlockQueue
{
private:
    std::queue<int> qu; // stl队列
    int cap;
    pthread_mutex_t lock; //互斥锁
    pthread_cond_t full;  //满cond
    pthread_cond_t empty; //空cond

private:
    //封装下列private函数便于理解;熟悉模型以后,可以去掉这么麻烦的步骤;
    void LockQueue()
    {
        pthread_mutex_lock(&lock);
    }
    void UnLockQueue()
    {
        pthread_mutex_unlock(&lock);
    }
    void ProductWait()
    {
        pthread_cond_wait(&full, &lock);
    }
    void ConsumeWait()
    {
        pthread_cond_wait(&empty, &lock);
    }
    void NotifyProduct()
    {
        pthread_cond_signal(&full);
    }
    void NotifyConsume()
    {
        pthread_cond_signal(&empty);
    }
    bool IsEmpty()
    {
        return (qu.size() == 0 ? true : false);
    }
    bool IsFull()
    {
        return (qu.size() == cap ? true : false);
    }

public:
    BlockQueue(int _cap = NUM) : cap(_cap)
    {
        pthread_mutex_init(&lock, NULL);
        pthread_cond_init(&full, NULL);
        pthread_cond_init(&empty, NULL);
    }
    void PushData(const int &data)
    {
        LockQueue();
        while(IsFull()){//为什么用while,cond为什么配合mutex使用参考上面的文章
           cout << "queue full, notify consume data, product stop." << endl;
            NotifyConsume();

            ProductWait();
        }
        qu.push(data);
        NotifyConsume();

        UnLockQueue();
    }
    void PopData(int &data)//值通过data带出去
    {
        LockQueue();
        while(IsEmpty()){
            cout << "queue Empty, notify product data, consume stop." << endl;
            NotifyProduct();

            ConsumeWait();
        }

        data = qu.front();
        qu.pop();
        NotifyProduct();
        UnLockQueue();
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&lock);
        pthread_cond_destroy(&full);
        pthread_cond_destroy(&empty);
    }
};

void *producter(void *arg) //消费者线程调用的handler
{
    auto ptr = (BlockQueue*)arg;

    for(int i = 0;i<10;i++){
        ptr->PushData(i);
        cout<<"push : "<<i<<endl;
        
    }
}

void *consumer(void *arg) //生产者线程调用的handler
{
    auto ptr = (BlockQueue*)arg;
    int tmp;
    for(int i = 0;i<10;i++){
        ptr->PopData(tmp);
        cout<<"pop : "<<tmp<<endl;
        sleep(1);//为了区分效果
    }
}

int main()
{
    BlockQueue bq;//一个交易场所;
    pthread_t c, p;//两个角色
    //创建线程(生产者,消费者)
    pthread_create(&c, NULL, consumer, (void *)&bq);
    pthread_create(&p, NULL, producter, (void *)&bq);
    //等待线程
    pthread_join(c, NULL);
    pthread_join(p, NULL);
    return 0;
}

可以看到我们设置了生产者生产不限速,消费者1秒消费一个的情况;

生产者瞬间将阻塞队列生产满,再生产提示full,等待消费之后有空间才能继续生产;

在这里插入图片描述

信号量版本

#include <semaphore.h>

int sem_init(sem_t *sem, int pshared, unsigned int value);//初始化信号量; pshared:0表示 线程间 共享,非零表示 进程间 共享//value:信号量初始值
int sem_destroy(sem_t *sem);//销毁

int sem_wait(sem_t *sem); //P()操作;      功能:等待信号量,会将信号量的值 减 1
int sem_post(sem_t *sem);//V()操作         功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值 加 1。

P,V操作原理

由于这个P V 操作,我们的sem不需要配合mutex使用了,底层是一个计数器;

在这里插入图片描述

P操作发现可用信号量为0,那就返回begin,挂起(暂时释放锁给V操作)并插入等待队列;
V操作每sem++之后,就通知等待队列被挂起的P操作,有东西了,可以消费了,把它拿入就绪队列…

基于环形队列的生产消费模型

上面那个mutex+cond的阻塞队列模型,基于queue,其空间可以动态分配;

现在基于固定大小的环形队列重写这个程序

  • 环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来 判断满或者空。
  • 另外也可以预留一个空的位置,作为满的状态

在这里插入图片描述

要符合环形队列的生产者消费者模型,生产者和消费者必须满足一下三个条件:

(1)生产者必须快于消费者

(2)生产者不能将消费者套圈

(3)消费者不能消费时,生产者先走;生产者满时,不能生产,消费者先走。

C++模拟实现(单生产,单消费版本):

#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>


using namespace std;

#define NUM 16

class RingQueue
{
private:
    vector<int> q;
    int cap;
    sem_t data_sem;  //数据
    sem_t space_sem; //空间

    int consume_step; //用于环形队列标记下标;
    int product_step;

public:
    RingQueue(int _cap = NUM) : q(_cap), cap(_cap)
    {
        //初始0data,cap容量个空间;
        sem_init(&data_sem, 0, 0);
        sem_init(&space_sem, 0, cap);

        consume_step = 0; //初始位置;
        product_step = 0;
    }

    void PutData(const int &data)
    {
        sem_wait(&space_sem); // P() --

        q[product_step] = data;
        product_step++;
        product_step %= cap; //环形

        sem_post(&data_sem); // V() ++
    }
    void GetData(int &data)
    {
        sem_wait(&data_sem); // P() --

        data = q[consume_step];
        consume_step++;
        consume_step %= cap; //环形

        sem_post(&space_sem); // V() ++
    }

    ~RingQueue()
    {
        sem_destroy(&data_sem);
        sem_destroy(&space_sem);
    }
};
void *consumer(void *arg)
{
    auto ptr = (RingQueue*)arg;
    int tmp;
    for(int i = 0;i<10;i++){
        ptr->GetData(tmp);
        cout<<"get data : "<< tmp <<endl;
    }
}
void *producter(void *arg)
{
    auto ptr = (RingQueue*)arg;
    for(int i = 0;i<10;i++){
        ptr->PutData(i);
        cout<<"put data : "<< i <<endl;
          sleep(1);//生产者一秒生产一个;
    }
  
}

int main()
{
    RingQueue rq;
    pthread_t c, p;
    pthread_create(&c, NULL, consumer, (void *)&rq);
    pthread_create(&p, NULL, producter, (void *)&rq);

    pthread_join(c, NULL);
    pthread_join(p, NULL);
}

可以看没有数据的时候get信号量作用,阻塞,每秒生产一个数据,消费者就拿一个数据!实现同步!

在这里插入图片描述

基于环形队列的多生产多消费模型

在单生产单消费模型的基础上增加了消费者与生产者

实现这个模型我们只需要加入互斥锁实现生产者与生产者之间的互斥消费者与消费者之间的互斥

在这里插入图片描述

改static公共,类外初始化;

#include <iostream>
#include <vector>
#include <stdlib.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>

using namespace std;

#define NUM 16

pthread_mutex_t lock1; //用于完成生产者与生产者之间的互斥
pthread_mutex_t lock2; //用于完成消费者与消费者之间的互

class RingQueue
{
private:
    vector<int> q;
    int cap;
    sem_t data_sem;  //数据
    sem_t space_sem; //空间

   static int consume_step; //用于环形队列标记下标;
   static int product_step;

public:
    RingQueue(int _cap = NUM) : q(_cap), cap(_cap)
    {
        //初始0data,cap容量个空间;
        sem_init(&data_sem, 0, 0);
        sem_init(&space_sem, 0, cap);

        static int consume_step; //初始位置;
        static int product_step;
    }

    void PutData(const int &data)
    {
        sem_wait(&space_sem); // P() --

        q[product_step] = data;
        product_step++;
        product_step %= cap; //环形

        sem_post(&data_sem); // V() ++
    }
    void GetData(int &data)
    {
        sem_wait(&data_sem); // P() --

        data = q[consume_step];
        consume_step++;
        consume_step %= cap; //环形

        sem_post(&space_sem); // V() ++
    }

    ~RingQueue()
    {
        sem_destroy(&data_sem);
        sem_destroy(&space_sem);
    }
};
int RingQueue:: consume_step = 0;
int RingQueue:: product_step = 0;
void *consumer(void *arg)
{
    auto ptr = (RingQueue *)arg;
    int tmp;
    for (int i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&lock1);

        ptr->GetData(tmp);
        cout << "get data : " << tmp <<"thread : "<<pthread_self()<< endl;

        pthread_mutex_unlock(&lock1);
    }
}
void *producter(void *arg)
{
    auto ptr = (RingQueue *)arg;
    for (int i = 0; i < 10; i++)
    {
        pthread_mutex_lock(&lock2);

        ptr->PutData(i);
        cout << "put data : " << i << "thread : "<<pthread_self()<<endl;
        //sleep(1); //生产者一秒生产一个;

        pthread_mutex_unlock(&lock2);
    }
}

int main()
{
    pthread_mutex_init(&lock1, NULL);
    pthread_mutex_init(&lock2, NULL);

    pthread_t p[5]; //生产者数组
    pthread_t c[5]; //消费者数组
    RingQueue rq;

    for (int i = 0; i < 5; i++)
    {
        pthread_create(&p[i], NULL, producter, (void *)&rq);
        pthread_create(&c[i], NULL, consumer, (void *)&rq);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_join(p[i], NULL);
        pthread_join(c[i], NULL);
    }
}

在这里插入图片描述

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:1 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐