前言

在多线程并发编程中,如何协调 “数据生成” 与 “数据处理” 的节奏,避免因供需失衡导致的效率浪费或资源竞争?生产者消费者模型给出了经典答案。它通过 “解耦生产者与消费者”“统一交易场所” 的设计,让数据生成和处理可以独立并发执行,既解决了忙闲不均的问题,又最大化利用了系统资源,成为分布式系统、消息队列、服务器后台等场景的核心底层模型。

本文将从模型本质出发,层层拆解生产者消费者模型的核心逻辑:首先详解 “321 原则”(3 种关系、2 种角色、1 个交易场所),厘清生产者与生产者、消费者与消费者、生产者与消费者之间的互斥与同步关系;随后聚焦实战,分别实现两种经典的交易场所 —— 基于条件变量的阻塞队列,以及基于 POSIX 信号量的环形队列,完整呈现锁与条件变量、信号量的协同使用技巧;最后补充 POSIX 信号量的核心接口与线程间同步原理,帮你吃透模型的底层支撑技术。

无论你是想理解并发协作的设计思想,还是需要落地实际项目中的数据供需协调场景,本文都能为你提供清晰的思路与可直接复用的代码:从理论原则到代码实现,从细节注意事项(如伪唤醒处理、信号量与锁的调用顺序)到场景适配选择,让你彻底掌握生产者消费者模型的设计精髓与实战技巧,为构建高效、稳定的并发系统打下坚实基础。

生产者消费者模型

概念:作用是协调 “数据生成方” 和 “数据处理方” 的工作节奏,实现高效、解耦的并发协作

这个模型的优点:

1.解耦 2.支持并发 3.支持忙闲不均

在这里插入图片描述

这个的话有点像供货商–超市–购买者那种关系

这个模型可以简记为321原则

3是3种关系

生产者跟生产者 --互斥 消费者跟消费者 --互斥 生产者跟消费者 --同步和互斥

2是2种角色

1是1个交易场所 --特定结构的内存空间(一般用的是阻塞队列)

生产者的作用:1.获取数据 2.生产数据到特定的内存空间

消费者的作用:1.消费数据 2.加工处理数据

虽然只有生产者和消费者不能同时进入临界区

–但是可以一个在临界区(获取或者消费数据),一个在非临界区(生产数据或者加工处理数据)

生产消费是高效的

模拟实现生产者消费者模型

这里模拟实现的是其中特定的内存空间是阻塞队列的情况

普通的阻塞队列的模拟实现:
template <class T>
class BlockQueue
{
    static const int defalutnum = 20;//这是阻塞队列的容量
public:
    BlockQueue(int maxcap = defalutnum):maxcap_(maxcap)
    {
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&c_cond_, nullptr);
        pthread_cond_init(&p_cond_, nullptr);
        // low_water_ = maxcap_/3;
        // high_water_ = (maxcap_*2)/3;
    }

    T pop()
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == 0) 
        {
            pthread_cond_wait(&c_cond_, &mutex_); 
        }
  
        T out = q_.front();
        q_.pop();

        // if(q_.size()<low_water_) pthread_cond_signal(&p_cond_);
        pthread_cond_signal(&p_cond_);
        pthread_mutex_unlock(&mutex_);

        return out;
    }

    void push(const T &in)
    {
        pthread_mutex_lock(&mutex_);
        while(q_.size() == maxcap_){ // 这里的while防止了线程被伪唤醒
            pthread_cond_wait(&p_cond_, &mutex_); 
        }
        // 1. 队列没满 2.被唤醒 
        q_.push(in);
        // if(q_.size() > high_water_) pthread_cond_signal(&c_cond_);
        pthread_cond_signal(&c_cond_);
        pthread_mutex_unlock(&mutex_);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&c_cond_);
        pthread_cond_destroy(&p_cond_);
    }
private:
    std::queue<T> q_; // 共享资源, q被当做整体使用的,q只有一份,加锁。但是共享资源也可以被看做多份!
    int maxcap_;      // 极值
    pthread_mutex_t mutex_;
    pthread_cond_t c_cond_;
    pthread_cond_t p_cond_;
    // int low_water_;
    // int high_water_;
};

注意点:

1.要先加锁再判断临界资源是否满足条件

原因:因为判断临界资源是否就绪,也是在访问临界资源

2.这里的low_water_high_water_是模拟实现的低于某个值就生产,高于某个值就消费

–没有这俩的时候是生产一个就消费一个

3.pthread_cond_wait要在加锁之后去搞

4.这个模板参数T也可以是对象那种

5.pthread_cond_signal放在unlock里面或者外面都是可以的

为什么会有伪唤醒情况:

由于系统调度的意外产生的,没有收到signal也进行下去

有点像客户等外卖–还没收到外卖电话也去门口看一看

生产者和消费者的模拟实现:

分线程下来执行生产者的函数(函数里面就是获取数据和生产数据)

分线程下来执行消费者的函数(函数里面就是消费数据和处理数据)

–想要实现多生产者多消费者的话,就eg:for循环去产生

eg:
 void *Consumer(void *args)
{
    ThreadData *td = static_cast<ThreadData*>(args);
    RingQueue<Task> *rq = td->rq;
    std::string name = td->threadname;

    while (true)
    {
        // 1. 消费数据
        Task t;
        rq->Pop(&t);
   
        // 2. 处理数据
       .........
    }
    return nullptr;
}


for (int i = 0; i < 1; i++)
 {
     ThreadData *td = new ThreadData();
     td->rq = rq;
     pthread_create(i, nullptr, Consumer, td);
 }

基于环形队列的生产消费模型

在这里插入图片描述

理念:head表示生产者将要存的位置 tail表示消费者将要消费的位置

向后移时需要(a+1)%...

单生产者单消费者时需要满足3个条件(多生产多消费者时要用POSIX信号量)

1.指向同一个位置时只能有一个在访问 2.消费者不能超过生产者 3.生产者不能套圈

模拟实现:

const static int defaultcap = 20;

template<class T>
class RingQueue{
private:
    void P(sem_t &sem)
    {
        sem_wait(&sem);
    }
    void V(sem_t &sem)
    {
        sem_post(&sem);
    }
    void Lock(pthread_mutex_t &mutex)
    {
        pthread_mutex_lock(&mutex);
    }
    void Unlock(pthread_mutex_t &mutex)
    {
        pthread_mutex_unlock(&mutex);
    }
public:
    RingQueue(int cap = defaultcap)
    :ringqueue_(cap), cap_(cap), c_step_(0), p_step_(0)
    {
        sem_init(&cdata_sem_, 0, 0);
        sem_init(&pspace_sem_, 0, cap);

        pthread_mutex_init(&c_mutex_, nullptr);
        pthread_mutex_init(&p_mutex_, nullptr);
    }
    void Push(const T &in) // 生产
    {
        P(pspace_sem_);

        Lock(p_mutex_); 
        ringqueue_[p_step_] = in;
        // 位置后移,维持环形特性
        p_step_++;
        p_step_ %= cap_;
        Unlock(p_mutex_); 

        V(cdata_sem_);

    }
    void Pop(T *out)       // 消费
    {
        P(cdata_sem_);

        Lock(c_mutex_); 
        *out = ringqueue_[c_step_];
        // 位置后移,维持环形特性
        c_step_++;
        c_step_ %= cap_;
        Unlock(c_mutex_); 

        V(pspace_sem_);
    }
    ~RingQueue()
    {
        sem_destroy(&cdata_sem_);
        sem_destroy(&pspace_sem_);

        pthread_mutex_destroy(&c_mutex_);
        pthread_mutex_destroy(&p_mutex_);
    }
private:
    std::vector<T> ringqueue_;
    int cap_;

    int c_step_;       // 消费者下标
    int p_step_;       // 生产者下标

    sem_t cdata_sem_;  // 消费者关注的数据资源
    sem_t pspace_sem_; // 生产者关注的空间资源

    pthread_mutex_t c_mutex_;
    pthread_mutex_t p_mutex_;
};

如果想要多生产者多消费者的话,跟上面的那种产生方法一样

注意:要先PLock 原因:相当于先排队再买票进入,而不是买完票再排队进入

POSIX信号量

相较于SystemV信号量,POSIX信号量还可以用于线程间同步

在这里插入图片描述

作用:初始化信号量

返回值:成功返回0,失败返回-1

sem:需要初始化的那个的指针

pshared:0表示线程间共享,非零表示进程间共享

value:信号量初始值

注意:在初始化之前,要先定义信号量 eg:sem_t cdata_sem_

在这里插入图片描述

作用:销毁信号量

返回值:成功返回0,失败返回-1

在这里插入图片描述

作用:把sem1,减后的值>=0就继续执行;<0就阻塞,直到>0

成功返回0,失败返回-1

在这里插入图片描述

作用:对信号量 sem 的值加 1,若有线程因 sem_wait阻塞等待该信号量,则唤醒其中一个线程。

成功返回0,失败返回-1

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐