目录

条件变量

基于阻塞队列(BlockQueue)的生产消费模型

关于设置唤醒阈值

生产消费模型高效在哪?

POSIX信号量

接口

基于环形队列(CircularQueue)的生产消费模型

线程池

读者写者问题

读写锁原理


假设一个场景,学生当做消费者去超市买东西,供货商当做生产者将货提供给超市,这就是一个典型的生产者消费者模型

一般来说,消费者和供货商之间不会直接通信,就像学生要买一桶泡面不会直接去工厂里买一样,若消费者和生产者直接进行通信,那就只要当消费者和生产者都同时在工作时才会有交易。例如只有学生去工厂并且工厂也正好在工作时,才能产出一桶泡面,若工厂在工作但学生没有去工厂,或学生去了工厂但工厂休息,都不能产出一桶泡面。

此时有了超市(缓冲区)的出现,生产者可以一直生产,将生产出来是商品放去超市,消费者何时想买,只需要到超市直接买就行,不需要等待生产者生产出来,这样就完成了生产过程和消费过程的解耦,即生产消费的过程不需要一致

虽说缓冲区放到现在来说,可以想到很多应用到缓冲区而解耦的例子(例如输入输出),但也有没有缓冲区,没有解耦的情况:
当触发函数调用时,就可以理解为调用函数的线程生产了数据(生产者),目标函数消费了这些数据(消费者),而在执行函数时,调用函数的一方只能等函数结束,那么它们两者就是强耦合关系

由于学生想要买东西和工厂想要供货都需要超市这一资源,那么超市就是需要被两者同时看到的共享资源,那就依旧会有数据不一致问题(例如消费者和生产者同时操作一个资源时)。
因此消费者、交易场所、生产者之间就需要都需要被保护起来,以确保数据一致:

  • 生产者和生产者之间是互斥关系,如果两个生产者同时想要为一个商品加数量,就会造成数据不一致
  • 消费者和消费者之间也是互斥关系,若两个消费者同时想要为一个商品减数量,也会造成数据不一致问题
  • 生产者和消费者之间是互斥&&同步,互斥是为了生产者和消费者不同时操作一块资源,而同步是为了让它们之间在执行顺序上相互配合。例如当交易场所里没有商品时,消费者如果还是像以前一样去交易场所询问有没有商品,就会浪费资源和时间,应该当商品满时,不要让生产者来进货,而是让消费者来消费,当商品空时,不要让消费者来消费,而是让生产者来进货

生产消费模型特点:

  1. 生产线程和消费线程之间解耦
  2. 支持生产和消费的一段时间忙闲不均的问题
  3. 提高效率(有交易场所负责集中商品和兜售,就提高了生产者和消费者的效率)

生产消费模型怎么提高效率呢?

例如下面结构,main函数获取用户输入,并传给fun函数,fun函数通过传递进来的数据进行相关数据的打印

void fun(int x)
{
    //通过x打印相关数据
}

int main()
{
    int x;
    while(true)
    {
        cin >> x;
        fun(x);
    }
    return 0;
}

那么fun函数就很依赖用户的输入,只有用户每输入一次,fun函数才能打印一次,在fun函数打印时,maini函数也无法获取用户输入数据,它们是串行工作的。
如果将结构换成生产消费模型,main函数为生产者,负责将用户输入的数据放入缓冲区,fun函数负责通过缓冲区中的数据打印相关结果,它们两者之间没有了依赖关系,即使fun函数在打印,main函数也可以获取用户数据;即使main函数在获取用户数据,fun函数也可以打印,这就变成了并行工作,提高效率

条件变量

若生产者和消费者之间只有互斥关系,如果生产者的优先级比消费者高,就会出现只有生产者在操作共享资源的情况,即使交易场所满了,生产者退出后也会因为优先级高而再次操作共享资源。

为了实现生产者和消费者之间的同步关系,就需要用到条件变量

当操作共享资源的线程因为不满足于某个条件而不能操作,就可以让他进入队列等待,只有当可以操作时再从队列中唤醒该线程,这样就可以避免即使操作不了共享资源也在一直访问判断的情况

条件变量和互斥锁类似,也是一个数据类型,名为pthread_cond_t,对它进行初始化和销毁的函数接口就是 pthread_cond_init() pthread_cond_destory() 

若条件变量的全局或静态的,就只需要在定义时赋值为PTHREAD_COND_INITIALIZER

条件变量就是在临界区内加一个判断,查看生产者或消费者的条件是否满足,若不满足,就让当前线程从运行状态变为阻塞状态,并且被放入条件变量的等待队列中,在被唤醒前不会再运行

若有多个线程都不符合操作共享资源的条件,就都会进入该条件变量的等待队列,直到其他线程唤醒

 pthread_cond_wait() 会将调用它的线程放入条件变量的等待队列并且将状态设为阻塞
 pthread_cond_timedwait() 可以等待到指定时间戳自动唤醒

需要注意的是,它们的参数中都有一个pthread_mutex_t类型的互斥锁,因为调用pthread_cond_wait()的位置一定是在锁内,如果只是单纯的去阻塞,线程会带着锁一起走,这会导致后面上来的线程还是不能申请锁成功。因此在调用pthread_cond_wait()时会自动释放锁,并且在唤醒时自动加锁

 pthread_cond_signal() 可以唤醒在某个条件变量等待队列下的线程
 pthread_cond_broadcast() 可以一次性唤醒所有在该条件变量下等待的线程

下面简单写一个等待和唤醒线程的程序

#include <iostream>
#include <pthread.h>
#include <unistd.h>
using namespace std;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond1 = PTHREAD_COND_INITIALIZER;
void* start(void* v1)
{
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond1, &mutex); // 在cond1条件变量下等待
        cout << (char *)v1 << "被唤醒" << endl;
        pthread_mutex_unlock(&mutex);
    }

    return nullptr;
}

int main()
{
    pthread_t pid[5];
    //创建5个线程
    for(int i = 0; i < 5; i++)
    {
        char* buffer = new char[64];//动态开辟是为了防止内容被覆盖,使得5个线程的名字都叫线程5
        snprintf(buffer,sizeof(buffer),"线程%d",i+1);
        pthread_create(pid+i,nullptr,start,(void*)buffer);
    }
        
    //每隔1秒唤醒一个线程
    while(true)
    {
        pthread_cond_signal(&cond1);
        sleep(1);
    }
    //等待线程
    for(int i = 0; i < 5; i++)
    {
        pthread_join(pid[i],nullptr);
    }
    return 0;
}

若用pthread_cond_broadcast(),会一次性唤醒全部线程

while(true)
{
    printf("------唤醒一次------\n");
    pthread_cond_broadcast(&cond1);
    sleep(3);
}

基于阻塞队列(BlockQueue)的生产消费模型

生产者将生产出的数据push到队列中,消费者从队列top出数据进行消费

在阻塞队列中,若队列为满,就停止让生产者生产,转而进入等待,并唤醒消费者以促进消费;
队列为空,就停止让消费者消费,转而进入等待,并唤醒生产者以促进生产

在这个模型中,阻塞队列就是共享资源本身

 

下面就简单实现一下逻辑:

BlockQueue.hpp:负责管理阻塞队列

#pragma once

#include <pthread.h>
#include <queue>

static const int maxg = 5;//阻塞队列中可以存的数据个数
//设置为static是为了防止其他文件中有同名变量,让maxg只在本文件中生效

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& capacity = maxg)//构造函数
    :_capacity(capacity)
    {
        //初始化互斥锁和条件变量
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    void pop(T* data)//取出队头并出队(消费)
    {
        pthread_mutex_lock(&_mutex);
        while(empty())//若队列为空,则不符合消费条件,进入等待队列
        {//用while的原因:如果线程醒后没有第一时间获取到锁,又进入了阻塞状态,等再次醒来后队列的数据可能会发生变化
            pthread_cond_wait(&_ccond,&_mutex);
        }
        *data = _DataQ.front();//取出队头
        _DataQ.pop();//出队
        //若队列数据空出了三分之一,就唤醒生产者
        if(SignalProducer())
            pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }

    void push(const T& data)//入队元素(生产)
    {
        pthread_mutex_lock(&_mutex);
        while(full())//若队列已满,则暂时无法再生产数据,进入等待队列
        {
            pthread_cond_wait(&_pcond,&_mutex);
        }
        _DataQ.push(data);
        if(SignalCosumer())//如果队列元素超过了三分之一,就唤醒消费者
            pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }

    bool empty()//是否为空
    {
        return _DataQ.empty();
    }

    bool full()//是否为满
    {
        return _DataQ.size() == _capacity;
    }

    bool SignalProducer()//判断是否该唤醒生产者
    {
        return _DataQ.size() < (_capacity - _capacity/3);
    }

    bool SignalCosumer()//判断是否该唤醒消费者
    {
        return _DataQ.size() > _capacity/3;
    }

private:
    std::queue<T> _DataQ;
    int _capacity;//队的容量
    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _pcond;//producer生产者条件变量
    pthread_cond_t _ccond;//cosumer消费者条件变量
};

Task.hpp:负责管理阻塞队列中要放的数据类型Task

#pragma once

#include <functional>
#include <iostream>
const std::string oper = "+-*/%";

int Math(int x,int y,char op)//计算函数
{
    switch (op)
    {
    case '+':
        return x + y;
        break;
    case '-':
        return x - y;
        break;
    case '*':
        return x * y;
        break;
    case '/':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x / y;
        }
        break;
    case '%':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x % y;
        }
        break;
    
    default:
        break;
    }       
}

class Task
{
    using func_t = std::function<int(int,int,char)>;
public:
    Task(int x = 0,int y = 0,char op = 0,func_t fun = nullptr)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(fun)
    {}
    std::string operator()()//仿函数,返回结果描述
    {
        int result = _callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toop()//返回要处理的任务描述
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;//回调函数
};

MainPC.cpp:主程序

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>
using namespace std;

void* consumer(void* _bq)//消费者
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(_bq);
    while(true)
    {
        Task x;
        bq->pop(&x);
        printf("消费任务:%s\n",x().c_str());//打印结果
    }

}

void* producer(void* _bq)//生产者
{
    BlockQueue<Task>* bq = static_cast<BlockQueue<Task>*>(_bq);
    while(true)
    {
        int x = rand() % 100;
        int y = rand() % 100 % 3;
        int op = rand() % oper.size();//+-*/%随机一个
        Task t(x,y,oper[op],Math);
        bq->push(t);
        printf("生产任务:%s\n",t.toop().c_str());//打印要处理的任务
    }
}

int main()
{
    srand(time(nullptr)^getpid());
    BlockQueue<Task> bq;
    pthread_t pid1,pid2;
    pthread_create(&pid1,nullptr,consumer,(void*)&bq);
    pthread_create(&pid2,nullptr,producer,(void*)&bq);

    pthread_join(pid1,nullptr);
    pthread_join(pid2,nullptr);
    return 0;
}

这样就完成了一个简单的生产消费模型

当生产慢,消费快时,消费者就会等生产者生产任务

当生产慢,消费快时,生产者将队列填满后也会陷入等待,当消费者消费了一些时生产者才会被唤醒

如果现在要多加一个步骤,将计算结果记录在文件中:可以再创建一个线程,用于将结果记录在文件中,再创建一个阻塞队列,之前的消费者往队列中传数据记录结果线程再往队列中读数据,这就实现了一个Pipeline模式

那就需要计算任务的线程再将结果传入第二个阻塞队列,再由存储任务的线程拿出结果

在Task.hpp中新增关于存储任务的阻塞队列代码:

const std::string filepath = "./log.txt";

void save(const std::string& message)//存储函数
{
    FILE* fp = fopen(filepath.c_str(),"a");//将结果追加到文件中(若没有就创建)
    if(!fp)
    {
        std::cerr << "fopen错误\n";
        return;
    }
    fprintf(fp,"%s\n",message.c_str());
    fclose(fp);
}

class SavTask//存储任务类型
{
    using func_t = std::function<void(const std::string&)>;
public:
    SavTask()
    {}
    SavTask(const std::string& savstr,func_t func)
    :_savstr(savstr)
    ,_func(func)
    {}
    void operator()()//仿函数
    {
        _func(_savstr);
    }
private:
    std::string _savstr;
    func_t _func;
};

MainPC.cpp:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>
using namespace std;

struct BQS//用于将两个阻塞队列当作参数传给线程
{
    BlockQueue<CalTask> _cbq;
    BlockQueue<SavTask> _sbq;
};

void* consumer(void* _bqs)//消费者
{
    BlockQueue<CalTask>* cbq = &(static_cast<BQS*>(_bqs))->_cbq;
    BlockQueue<SavTask>* sbq = &(static_cast<BQS*>(_bqs))->_sbq;
    while(true)
    {
        CalTask x;
        cbq->pop(&x);
        std::string s = x();
        printf("消费任务:%s\n",s.c_str());//打印结果
        SavTask y(s,save);
        sbq->push(y);
        printf("推送保存任务完成\n");
    }

}

void* producer(void* _bqs)//生产者
{
    BlockQueue<CalTask>* cbq = &(static_cast<BQS*>(_bqs))->_cbq;
    while(true)
    {
        int x = rand() % 100;
        int y = rand() % 100 % 3;
        int op = rand() % oper.size();//+-*/%随机一个
        CalTask t(x,y,oper[op],Math);
        cbq->push(t);
        printf("生产任务:%s\n",t.toop().c_str());//打印要处理的任务
    }
}

void* saver(void* _bqs)//存储者
{
    BlockQueue<SavTask>* sbq = &(static_cast<BQS*>(_bqs))->_sbq;
    while(true)
    {
        SavTask s;
        sbq->pop(&s);
        s();
        printf("保存任务成功\n");
    }
}

int main()
{
    srand(time(nullptr)^getpid());
    BQS bqs;
    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,(void*)&bqs);//consumer消费者
    pthread_create(&p,nullptr,producer,(void*)&bqs);//producer生产者
    pthread_create(&s,nullptr,saver,(void*)&bqs);//saver存储者

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    return 0;
}

运行起来后,生产任务,计算任务,存储任务即为并行工作

关于设置唤醒阈值

在上面的生产消费模型中,为生产者线程和消费者线程的唤醒都设置了阈值

bool SignalProducer()//判断是否该唤醒生产者
{
    return _DataQ.size() < (_capacity - _capacity/3);
}

bool SignalCosumer()//判断是否该唤醒消费者
{
    return _DataQ.size() > _capacity/3;
}

void pop(T* data)//取出队头并出队(消费)
{
    //......
    /若队列数据空出了三分之一,就唤醒生产者
    if(SignalProducer())
        pthread_cond_signal(&_pcond);
    //......
}

void push(const T& data)//入队元素(生产)
{
    //......
    if(SignalCosumer())//如果队列元素超过了三分之一,就唤醒消费者
        pthread_cond_signal(&_ccond);
    //......
}

其实这样会有导致死锁的可能性:若生产者放了一个任务在队列后因为某种原因停止生产,此时消费者如果还在等待队列中,就会因为唤醒阈值不满足而继续等待,那么队列里明明有任务,消费者却永远不会被唤醒,造成了死锁。

要想设置唤醒阈值,可以将pthread_cond_wait()换成 pthread_cond_timedwait() ,当长时间没有被唤醒时,自己醒来看看, 若有数据就直接去消费,若没数据就返回false交给消费者线程处理

最终BlockQueue.hpp:

#pragma once

#include <pthread.h>
#include <queue>
#include <ctime>
#include <cerrno>

static const int maxg = 5;//阻塞队列中可以存的数据个数

template <class T>
class BlockQueue
{
public:
    BlockQueue(const int& capacity = maxg)//构造函数
    :_capacity(capacity)
    {
        //初始化互斥锁和条件变量
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }

    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }

    bool pop(T* data)//取出队头并出队(消费)
    {

        pthread_mutex_lock(&_mutex);
        //设置定时等待的时间
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts); // 获取当前绝对时间
        ts.tv_nsec += 100000000;            // 在当前基础上增加 100ms (10^8 纳秒)

        // 进位处理:如果纳秒超过了 1秒 (10^9 纳秒)
        if (ts.tv_nsec >= 1000000000)
        {
            ts.tv_sec += 1;
            ts.tv_nsec -= 1000000000;
        }
        while(empty())//若队列为空,则不符合消费条件,进入等待队列
        {//用while的原因:如果线程醒后没有第一时间获取到锁,又进入了阻塞状态,等再次醒来后队列的数据可能会发生变化
            int rc = pthread_cond_timedwait(&_ccond,&_mutex,&ts);
            if(rc == ETIMEDOUT)//timedwait因为超时自己醒来,返回值为ETIMEDOUT;timedewait是被唤醒,返回值为0
            {
                if(empty())//如果还为空,就返回false
                {
                    pthread_mutex_unlock(&_mutex);//退出前解锁
                    return false;
                }
                break;//如果有数据,强制让消费者去处理,避免死锁
            }
        }
        *data = _DataQ.front();//取出队头
        _DataQ.pop();//出队
        //若队列数据空出了三分之一,就唤醒生产者
        if(SignalProducer())
            pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
        return true;
    }

    void push(const T& data)//入队元素(生产)
    {
        pthread_mutex_lock(&_mutex);
        while(full())//若队列已满,则暂时无法再生产数据,进入等待队列
        {
            pthread_cond_wait(&_pcond,&_mutex);
        }
        _DataQ.push(data);
        if(SignalCosumer())//如果队列元素超过了三分之一,就唤醒消费者
            pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }

    bool empty()//是否为空
    {
        return _DataQ.empty();
    }

    bool full()//是否为满
    {
        return _DataQ.size() == _capacity;
    }

    bool SignalProducer()//判断是否该唤醒生产者
    {
        return _DataQ.size() < (_capacity - _capacity/3);
    }

    bool SignalCosumer()//判断是否该唤醒消费者
    {
        return _DataQ.size() > _capacity/3;
    }

private:
    std::queue<T> _DataQ;
    int _capacity;//队的容量
    pthread_mutex_t _mutex;//互斥锁
    pthread_cond_t _pcond;//producer生产者条件变量
    pthread_cond_t _ccond;//cosumer消费者条件变量
};

最终Task.hpp:

#pragma once

#include <functional>
#include <iostream>
#include <string>
const std::string oper = "+-*/%";
const std::string filepath = "./log.txt";

int Math(int x,int y,char op)//计算函数
{
    switch (op)
    {
    case '+':
        return x + y;
        break;
    case '-':
        return x - y;
        break;
    case '*':
        return x * y;
        break;
    case '/':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x / y;
        }
        break;
    case '%':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x % y;
        }
        break;
    
    default:
        break;
    }       
}

void save(const std::string& message)//存储函数
{
    FILE* fp = fopen(filepath.c_str(),"a");//将结果追加到文件中(若没有就创建)
    if(!fp)
    {
        std::cerr << "fopen错误\n";
        return;
    }
    fprintf(fp,"%s\n",message.c_str());
    fclose(fp);
}

class CalTask//计算任务类型
{
    using func_t = std::function<int(int,int,char)>;
public:
    CalTask(int x = 0,int y = 0,char op = 0,func_t fun = nullptr)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(fun)
    {}
    std::string operator()()//仿函数,返回结果描述
    {
        int result = _callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toop()//返回要处理的任务描述
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;//回调函数
};

class SavTask//存储任务类型
{
    using func_t = std::function<void(const std::string&)>;
public:
    SavTask()
    {}
    SavTask(const std::string& savstr,func_t func)
    :_savstr(savstr)
    ,_func(func)
    {}
    void operator()()//仿函数
    {
        _func(_savstr);
    }
private:
    std::string _savstr;
    func_t _func;
};

最终MainPC.cpp:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <unistd.h>
#include <ctime>
using namespace std;

struct BQS//用于将两个阻塞队列当作参数传给线程
{
    BlockQueue<CalTask> _cbq;
    BlockQueue<SavTask> _sbq;
};

void* consumer(void* _bqs)//消费者
{
    BlockQueue<CalTask>* cbq = &(static_cast<BQS*>(_bqs))->_cbq;
    BlockQueue<SavTask>* sbq = &(static_cast<BQS*>(_bqs))->_sbq;
    while(true)
    {
        CalTask x;
        bool succees = cbq->pop(&x);
        if(!succees)//此时队列为空,直接进入下一次循环
        {
            continue;
        }
        std::string s = x();
        printf("消费任务:%s\n",s.c_str());//打印结果
        SavTask y(s,save);
        sbq->push(y);
        printf("推送保存任务完成\n");
    }

}

void* producer(void* _bqs)//生产者
{
    BlockQueue<CalTask>* cbq = &(static_cast<BQS*>(_bqs))->_cbq;
    while(true)
    {
        int x = rand() % 100;
        int y = rand() % 100 % 3;
        int op = rand() % oper.size();//+-*/%随机一个
        CalTask t(x,y,oper[op],Math);
        cbq->push(t);
        printf("生产任务:%s\n",t.toop().c_str());//打印要处理的任务
    }
}

void* saver(void* _bqs)//存储者
{
    BlockQueue<SavTask>* sbq = &(static_cast<BQS*>(_bqs))->_sbq;
    while(true)
    {
        SavTask s;
        int success = sbq->pop(&s);
        if(!success)//此时队列为空,直接进入下一次循环。后续可以根据需求判断队列为空时该怎么做
        {
            continue;
        }

        s();
        printf("保存任务成功\n");
    }
}

int main()
{
    srand(time(nullptr)^getpid());
    BQS bqs;
    pthread_t c,p,s;
    pthread_create(&c,nullptr,consumer,(void*)&bqs);//consumer消费者
    pthread_create(&p,nullptr,producer,(void*)&bqs);//producer生产者
    pthread_create(&s,nullptr,saver,(void*)&bqs);//saver存储者

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    pthread_join(s,nullptr);
    return 0;
}

这段程序也同样适配于多生产多消费的情况!

int main()
{
    srand(time(nullptr)^getpid());
    BQS bqs;
    pthread_t c[3],p[2],s;
    pthread_create(c + 0,nullptr,consumer,(void*)&bqs);//consumer消费者
    pthread_create(c + 1,nullptr,consumer,(void*)&bqs);//consumer消费者
    pthread_create(c + 2,nullptr,consumer,(void*)&bqs);//consumer消费者
    pthread_create(p + 0,nullptr,producer,(void*)&bqs);//producer生产者
    pthread_create(p + 1,nullptr,producer,(void*)&bqs);//producer生产者
    //pthread_create(&s,nullptr,saver,(void*)&bqs);//saver存储者

    pthread_join(c[0],nullptr);
    pthread_join(c[1],nullptr);
    pthread_join(c[2],nullptr);
    pthread_join(p[0],nullptr);
    pthread_join(p[1],nullptr);
    //pthread_join(s,nullptr);
    return 0;
}

生产消费模型高效在哪?

在生产消费模型中,不管有几个生产者几个消费者,同一时刻只有一个线程能访问交易场所(共享资源),那么创建多生产多消费的意义何在?即使只有一个生产者一个消费者,访问共享资源的效率不都一样吗?

实际上,生产消费模型的高效从来都不是访问共享资源时高效,生产者在将数据放到交易场所前需要先构建任务,消费者在将数据从交易场所拿出来后需要处理数据,生产消费模型高效的地方就是可以在生产之前和消费之后让线程并行运行

POSIX信号量

在上面的阻塞队列实现的生产消费模型中,想要查看共享资源的状态就需要进入共享资源内部(即阻塞队列),也就是要先加锁
公共资源是否满足生产或消费条件,在线程进入共享资源内部前无从得知,即使不符合条件,也必须要加锁,检测,解锁

这是因为在阻塞队列的生产消费模型中,是对共享资源整体加锁,使用时就是对资源整体使用。只要将公共资源分成多个部分,就可以让不同的线程同时访问公共资源的不同区域,这时就可以用到信号量。

在Linux中信号量标准有两套:System V和POSIX,下面介绍的是POSIX标准信号量

信号量本质是一把计数器,如果想将公共资源分成10部分,就可以将信号量的初值设为10,只要有线程想要访问公共资源,在访问之前必须先申请信号量,若申请成功,就相当于预定了某个部分的公共资源,即使该线程后来没有访问公共资源,这个部分也永远给线程留着。若申请失败,就阻塞等待

每次申请成功信号量都会让该值-1,代表可用的共享资源数量减少了一个。那么若共享资源全被使用,信号量的值就变成了0,再申请时就会阻塞等待,就实现了在不进入共享资源的前提下得知共享资源的使用情况!

既然任何线程在访问共享资源前都要先申请信号量,就需要让所有线程都能看到信号量,因此信号量本身也是共享资源。信号量的操作有两种:P操作和V操作

  • P操作:让信号量--的操作被称为P操作,意为申请资源;
  • V操作:让信号量++的操作被称为V操作,意为归还资源。

由于信号量本身也是共享资源,它的PV操作就需要保证原子性,因此信号量不是简单的整型,而是sem_t类型,PV操作又叫PV原语

接口

信号量的使用和互斥锁、条件变量类似:

初始化和释放:

int sem_init(sem_t *sem, int pshared, unsigned int value);//初始化
int sem_destroy(sem_t *sem);//释放

需要注意的是,sem_t不能像pthread_mutex_t和pthread_cond_t一样赋值初始化,即使是全局的或静态的,也必须用sem_init()。并且用信号量也需要pthread库!

PV操作:

int sem_wait(sem_t *sem);//P操作(阻塞式)
int sem_trywait(sem_t *sem);//P操作(尝试P操作)
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);//P操作(定时唤醒)

int sem_post(sem_t *sem)//V操作

基于环形队列(CircularQueue)的生产消费模型

关于环形队列的细节可以看这篇文章:

力扣题目分享:622.设计循环队列(C语言)_扣子 c语言题目推送-CSDN博客https://blog.csdn.net/suimingtao/article/details/143688457?https://blog.csdn.net/suimingtao/article/details/143688457?

在设计环形队列时,队列为满或为空都会指向同一个位置,就会导致无法判断哪种情况是空那种情况是满

解决方案是让队列为满的情况空出来一个位置(本篇不关心)

如果把环形队列中的两个指针当作生产者和消费者,生产者往里放数据,消费者往外拿数据。

若生产者慢,消费者快,就很容易为空,此时只能让生产者先走;若生产者快,消费者慢,就很容易为满,此时只能让消费者先走

对于其他情况,都可以确保生产者和消费者一定不在一个位置,那么生产者在往里放数据时,并不影响消费者往别的位置拿数据,它们就可以并发执行!只有当为空或为满时有互斥与同步问题

要解决这里的互斥与同步问题就需要信号量

  • 对于生产者而言,看中的是队列中的剩余空间,就可以为空间资源定义一个信号量
  • 对于消费者而言,看中的是队列中的数据,就可以为数据资源定义一个信号量

生产者信号量producer_sem,因为一开始队列肯定为空,那么它的初始值就是队列的空间个数,当申请成功就执行对应的生产工作,生产完后由于数据在队列中,生产者还在用这份资源,但数据也因此多了一个,因此归还的是消费者的信号量

P(producer_sem);//若申请失败,则阻塞
//生产活动
V(consumer_sem);

消费者信号量consumer_sem,队列为空所以数据为空,那么它的初始值就是0,当申请成功时证明在这之前有生产者为消费者的信号量执行了V操作。消费完数据后,由于队列又空了一个位置,所以归还的是生产者的信号量

P(consumer_sem);//若申请失败,则阻塞
//消费活动
V(producer_sem);

队列为满,就是生产者的信号量为0,无法再生产,只有消费者的信号量是队列的空间个数,符合只有消费者能走的要求;
队列为空,就是消费者的信号量为0,无法再消费,只有生产者的信号量是队列的空间个数,符合只有生产者能走的条件。
这样就保证了在空/满时的同步与互斥的访问

下面先实现单生产单消费的生产消费模型:

CircularQueue.hpp:

#pragma once
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>

static const int def_cap = 5;//默认容量

template<class T>
class CircularQueue
{
private:
    void P(sem_t& sem)//P操作封装
    {
        int re = sem_wait(&sem);
        assert(re == 0);
        (void)re;
    }
    void V(sem_t& sem)//V操作封装
    {
        int re = sem_post(&sem);
        assert(re == 0);
        (void)re;
    }
public:
    CircularQueue(int capacity = def_cap)
    :_capacity(capacity)
    ,_queue(_capacity)
    ,_producerStep(0)
    ,_consumerStep(0)
    {
        int re = sem_init(&_space_sem,0,_capacity);//一开始队列为空,所以空间资源为满
        assert(re == 0);
        re = sem_init(&_data_sem,0,0);//一开始队列为空,所以数据资源为空
        assert(re == 0);

    }

    ~CircularQueue()
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);
    }

    void push(const T& in)
    {
        P(_space_sem);//申请空间
        _queue[_producerStep++] = in;//生产数据
        _producerStep %= _capacity;//因为是环形队列,当走到_capacity下标时,要回到0下标
        V(_data_sem);//为数据+1
    }

    void pop(T* out)
    {
        P(_data_sem);//申请数据
        *out = _queue[_consumerStep++];//消费数据
        _consumerStep %= _capacity;//因为是环形队列,当走到_capacity下标时,要回到0下标
        V(_space_sem);//为空间+1
    }

private:
    int _capacity;//环形队列的容量
    std::vector<T> _queue;//数组充当环形队列
    sem_t _space_sem;//描述空间资源的信号量(生产者信号量)
    sem_t _data_sem;//描述数据资源的信号量(消费者信号量)
    int _producerStep;//生产者步调(下标)
    int _consumerStep;//消费者步调(下标)
};

Task.hpp:

#pragma once

#include <functional>
#include <iostream>
#include <string>
static const std::string oper = "+-*/%";//static防止重名

int Math(int x,int y,char op)//计算函数
{
    switch (op)
    {
    case '+':
        return x + y;
        break;
    case '-':
        return x - y;
        break;
    case '*':
        return x * y;
        break;
    case '/':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x / y;
        }
        break;
    case '%':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x % y;
        }
        break;
    
    default:
        break;
    }       
}

class Task//计算任务类型
{
    using func_t = std::function<int(int,int,char)>;
public:
    Task(int x = 0,int y = 0,char op = 0,func_t fun = nullptr)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(fun)
    {}
    std::string operator()()//仿函数,返回结果描述
    {
        if(!_callback)//防止崩溃
            return "无效任务";
        int result = _callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toop()//返回要处理的任务描述
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;//回调函数
};

 MainCP.cpp:

#include <iostream>
#include <pthread.h>
#include "CircularQueue.hpp"
#include "Task.hpp"
#include <ctime>
#include <unistd.h>
using namespace std;

void* producer(void* cq)//生产者线程
{
    CircularQueue<Task>* cirque = static_cast<CircularQueue<Task>*>(cq);
    while(true)
    {
        //构建任务
        int x = rand()%10;
        int y = rand()%20;
        Task t(x,y,oper[rand()%oper.size()],Math);
        //生产任务
        cirque->push(t);//生产数据
        printf("生产任务:%s\n",t.toop().c_str());
        sleep(1);//模拟真实业务场景
    }
}

void* consumer(void* cq)//消费者线程
{
    CircularQueue<Task>* cirque = static_cast<CircularQueue<Task>*>(cq);
    while(true)
    {
        Task t;
        cirque->pop(&t);//消费数据
        printf("消费任务:%s\n",t().c_str());
    }
}

int main()
{
    srand(time(nullptr) ^ 114514);
    CircularQueue<Task> cq;
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,(void*)&cq);
    pthread_create(&p,nullptr,producer,(void*)&cq);

    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    return 0;
}

上述程序因为是单生产单消费,因此只需要处理生产者和消费者之间的互斥&&同步关系,要想适用于多生产多消费,还需要处理生产者和生产者之间的互斥关系、消费者和消费者之间的互斥关系。

同一时刻最多只有一个生产者和一个消费者在访问临界资源(环形队列)(在阻塞队列中是同一时刻最多只有一个线程(不管是生产者还是消费者)在访问临界资源)。为了维护这个关系,就要给生产者之间加一把锁,使得只有一个生产者线程能够进入临界资源给消费者之间加一把锁,使得只有一个消费者线程能够进入临界资源

多生产多消费的生产消费模型:(改后的CircularQueue类)

template<class T>
class CircularQueue
{
private:
    void P(sem_t& sem)//P操作封装
    {
        int re = sem_wait(&sem);
        assert(re == 0);
        (void)re;
    }
    void V(sem_t& sem)//V操作封装
    {
        int re = sem_post(&sem);
        assert(re == 0);
        (void)re;
    }
public:
    CircularQueue(int capacity = def_cap)
    :_capacity(capacity)
    ,_queue(_capacity)
    ,_producerStep(0)
    ,_consumerStep(0)
    {
        int re = sem_init(&_space_sem,0,_capacity);//一开始队列为空,所以空间资源为满
        assert(re == 0);
        re = sem_init(&_data_sem,0,0);//一开始队列为空,所以数据资源为空
        assert(re == 0);

        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);
    }

    ~CircularQueue()//析构
    {
        sem_destroy(&_space_sem);
        sem_destroy(&_data_sem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }

    void push(const T& in)
    {
        P(_space_sem);//申请空间
        //PV操作都是原子性的,因此无需被锁保护
        pthread_mutex_lock(&_pmutex);//生产者锁
        _queue[_producerStep++] = in;//生产数据
        _producerStep %= _capacity;//因为是环形队列,当走到_capacity下标时,要回到0下标
        pthread_mutex_unlock(&_pmutex);
        V(_data_sem);//为数据+1
    }

    void pop(T* out)
    {
        P(_data_sem);//申请数据
        //PV操作都是原子性的,因此无需被锁保护
        pthread_mutex_lock(&_cmutex);//消费者锁
        *out = _queue[_consumerStep++];//消费数据
        _consumerStep %= _capacity;//因为是环形队列,当走到_capacity下标时,要回到0下标
        pthread_mutex_unlock(&_cmutex);
        V(_space_sem);//为空间+1
    }

private:
    int _capacity;//环形队列的容量
    std::vector<T> _queue;//数组充当环形队列
    sem_t _space_sem;//描述空间资源的信号量(生产者信号量)
    sem_t _data_sem;//描述数据资源的信号量(消费者信号量)
    int _producerStep;//生产者步调(下标)
    int _consumerStep;//消费者步调(下标)
    pthread_mutex_t _pmutex;//生产者互斥锁
    pthread_mutex_t _cmutex;//消费者互斥锁
};

线程池

在线程池中,可以维护着多个线程,当有任务分配过来时,线程池就可以调用线程来执行任务。

线程池的应用场景:

1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误

下面简单实现一下:

//ThreadPool.hpp:
#pragma once
#include <iostream>
#include "Thread.hpp"
#include "Mutex.hpp"
#include "Task.hpp"
#include <queue>
#include <vector>
#include <unistd.h>
#include <memory>
using namespace ThreadClass;

static const int deThNum = 5;//默认线程池的线程个数

template<class T>
class ThreadPool
{
private:
    static void *handlerTask(void *args)
    {
        ThreadData *td = static_cast<ThreadData *>(args);
        std::string* name = static_cast<std::string*>(td->_name);
        ThreadPool<T>* threadpool = static_cast<ThreadPool<T>*>(td->_args);//handlerTask是静态成员函数,无法访问类内private成员

        //获取并处理任务
        while(true)
        {
            T t;
            if(!threadpool->pop(&t))//若线程池被关闭,就结束线程
                break;
            printf("%s 获取任务-->%s,并处理任务-->%s\n",name->c_str(),t.toop().c_str(),t().c_str());
            //std::cout << name << " 获取任务-->" << t.toop() << ",并处理任务-->" << t() << std::endl;
        }
        return nullptr;
    }

public:
    ThreadPool(int thnum = deThNum)
    :_thnum(thnum)
    ,_isRunning(true)//将线程池设为开启
    {
        //初始化互斥锁和条件变量
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_cond,nullptr);
        //构建线程并放入_threads
        for(int i = 0; i < _thnum; i++)
        {
            std::unique_ptr<Thread> t(new Thread);
            _threads.push_back(std::move(t));
        }
    }

    ~ThreadPool()
    {
        {//锁的生命周期
            Mutex m(&_mutex);
            _isRunning = false; // 将线程池设为关闭
        }
        pthread_cond_broadcast(&_cond);//唤醒所有线程
        for(auto& e : _threads)//回收所有线程
        {
            e->join();
        }
        printf("所有线程全部关闭!\n");

        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

    void runc()//将线程池内线程跑起来
    {
        for(auto& e : _threads)
            e->run(handlerTask,this);
    }

    void push(T& in)//往_queue放数据
    {
        Mutex m(&_mutex);
        if(!_isRunning) return;//若线程池被关闭,则拒绝新任务
        _queue.push(in);
        pthread_cond_signal(&_cond);
    }

    bool pop(T* out)//从_queue中拿任务
    {
        Mutex m(&_mutex);
        while(_queue.empty() && _isRunning)
            pthread_cond_wait(&_cond,&_mutex);
        
        if(_queue.empty() && !_isRunning)//如果队列为空且线程池被关闭,让线程结束
            return false;
            
        *out = std::move(_queue.front());
        _queue.pop();
        return true;
    }

private:
    int _thnum;//线程池内线程个数
    std::vector<std::unique_ptr<Thread>> _threads;//用数组管理线程 | 智能指针更安全
    std::queue<T> _queue;//用队列管理任务
    pthread_mutex_t _mutex;//互斥访问_queue
    pthread_cond_t _cond;//_queue为空时进入条件变量
    bool _isRunning;//判断线程池是否关闭
};

//Thread.hpp:
#pragma once
#include <iostream>
#include <pthread.h>
#include <string>
#include <functional>
#include <cstring>

namespace ThreadClass
{

    class ThreadData//将名字和参数一起传给线程
    {
    public:
        ThreadData(void *args, std::string *name)
            : _args(args), _name(name)
        {
        }
        void *_args;
        std::string *_name;
    };

    using func_t = std::function<void *(void *)>; // 将void* (void*)类型重命名为func_t
    class Thread // 线程类
    {
    public:
        Thread() // 构造函数
        {
            _tid = 0;
            char buffer[64];
            snprintf(buffer, sizeof(buffer), "thread--%d", _pnum++);
            _name = buffer;
        }

        void run(func_t start, void *args = nullptr)// 创建线程,因为在构造函数中就会调用,所以不需要被用户看到
        {
            _callback = start;
            _args = args;
            int n = pthread_create(&_tid, nullptr, _start_routine, this);
            if (n != 0) // 创建线程失败
            {
                std::cout << "创建线程失败!错误码:" << n << "错误信息:" << strerror(n) << std::endl;
                return;
            }
        }

        void *join() // 等待线程
        {
            void *ret;
            int n = pthread_join(_tid, &ret);
            if (n != 0) // 等待失败
            {
                std::cout << "等待失败,错误码:" << n << ",错误信息:" << strerror(n);
                return nullptr;
            }
            else          // 等待成功
                _tid = 0; // 让tid为零,这样当析构时就不会再次等待
            return ret;
        }

        ~Thread()
        {
            // 没有什么需要释放的资源
        }

    private:
        static void *_start_routine(void *args)
        {
            Thread *_this = static_cast<Thread *>(args); // 安全的类型转换
            ThreadData t(_this->_args,&(_this->_name));
            return _this->_callback(&t);
        }
        std::string _name; // 线程名称
        pthread_t _tid;    // 线程tid
        func_t _callback;  // 线程要执行的函数
        void *_args;       // 传给_start的参数
        static int _pnum;  // 用于线程编号
    };
    int Thread::_pnum = 1;//静态成员变量必须在类外初始化
}//end ThreadClass

//Task.hpp:
#pragma once

#include <functional>
#include <iostream>
#include <string>
const std::string oper = "+-*/%";
const std::string filepath = "./log.txt";

int Math(int x,int y,char op)//计算函数
{
    switch (op)
    {
    case '+':
        return x + y;
        break;
    case '-':
        return x - y;
        break;
    case '*':
        return x * y;
        break;
    case '/':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x / y;
        }
        break;
    case '%':
        if(y == 0)
        {
            std::cerr << "除零错误\n";
            return -1;
        }
        else
        {
            return x % y;
        }
        break;
    
    default:
        return 0;
    }       
}

class Task//计算任务类型
{
    using func_t = std::function<int(int,int,char)>;
public:
    Task(int x = 0,int y = 0,char op = 0,func_t fun = nullptr)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(fun)
    {}
    std::string operator()()//仿函数,返回结果描述
    {
        int result = _callback(_x,_y,_op);
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    std::string toop()//返回要处理的任务描述
    {
        char buffer[64];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;//回调函数
};

//Mutex.hpp:
#pragma once
#include <iostream>
#include <pthread.h>

class Mutex //RAII风格
{
public:
    Mutex(pthread_mutex_t* mutex)
    :_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);
    }
    ~Mutex()
    {
        if(_mutex)
            pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t* _mutex;

};

//Main.cpp
#include <iostream>
#include "Thread.hpp"
#include "ThreadPool.hpp"
#include "Task.hpp"
#include <unistd.h>
#include <ctime>
#include <memory>
using namespace std;
using namespace ThreadClass;

int main()
{
    srand(time(nullptr));
    ThreadPool<Task> threadpool;
    threadpool.runc();
    int cnt = 1000;
    while(cnt--)
    {
        int x = rand()%100;
        int y = rand()%101;
        char o = oper[rand()%oper.size()];
        Task t(x,y,o,Math);
        threadpool.push(t);
        sleep(1);
    }
    return 0;
}

对于线程池而言,一个进程中一般只会存在一个,也就是单例模式,下面将线程池实现为懒汉的单例模式:

//ThreadPool.hpp:
#pragma once
#include <iostream>
#include "Thread.hpp"
#include "Mutex.hpp"
#include "Task.hpp"
#include <queue>
#include <vector>
#include <unistd.h>
#include <memory>
using namespace ThreadClass;

static const int deThNum = 5; // 默认线程池的线程个数

template <class T>
class ThreadPool
{
private:
    //将构造和析构私有化,防止外部使用
    ThreadPool(int thnum = deThNum)
        : _thnum(thnum), _isRunning(true) // 将线程池设为开启
    {
        // 初始化互斥锁和条件变量
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        // 构建线程并放入_threads
        for (int i = 0; i < _thnum; i++)
        {
            std::unique_ptr<Thread> t(new Thread);
            _threads.push_back(std::move(t));
        }
    }

    ThreadPool(const ThreadPool<T>& t) = delete;
    ThreadPool<T>& operator=(const ThreadPool<T>& t) = delete;

    ~ThreadPool()
    {
        { // 锁的生命周期
            Mutex m(&_mutex);
            _isRunning = false; // 将线程池设为关闭
        }
        pthread_cond_broadcast(&_cond); // 唤醒所有线程
        for (auto &e : _threads)        // 回收所有线程
        {
            e->join();
        }
        printf("所有线程全部关闭!\n");

        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }
    static void *handlerTask(void *args)
    {
        ThreadData *td = static_cast<ThreadData *>(args);
        std::string *name = static_cast<std::string *>(td->_name);
        ThreadPool<T> *threadpool = static_cast<ThreadPool<T> *>(td->_args); // handlerTask是静态成员函数,无法访问类内private成员

        // 获取并处理任务
        while (true)
        {
            T t;
            if (!threadpool->pop(&t)) // 若线程池被关闭,就结束线程
                break;
            printf("%s 获取任务-->%s,并处理任务-->%s\n", name->c_str(), t.toop().c_str(), t().c_str());
            // std::cout << name << " 获取任务-->" << t.toop() << ",并处理任务-->" << t() << std::endl;
        }
        return nullptr;
    }

public:
    static ThreadPool<T>& getInstance()//获取单例
    {
        static ThreadPool<T> singleton;//静态成员变量,不会重复创建,且C++11起是线程安全的
        return singleton;
    }

    void runc() // 将线程池内线程跑起来
    {
        for (auto &e : _threads)
            e->run(handlerTask, this);
    }

    void push(T &in) // 往_queue放数据
    {
        Mutex m(&_mutex);
        if (!_isRunning)
            return; // 若线程池被关闭,则拒绝新任务
        _queue.push(in);
        pthread_cond_signal(&_cond);
    }

    bool pop(T *out) // 从_queue中拿任务
    {
        Mutex m(&_mutex);
        while (_queue.empty() && _isRunning)
            pthread_cond_wait(&_cond, &_mutex);

        if (_queue.empty() && !_isRunning) // 如果队列为空且线程池被关闭,让线程结束
            return false;

        *out = std::move(_queue.front());
        _queue.pop();
        return true;
    }

private:
    int _thnum;                                    // 线程池内线程个数
    std::vector<std::unique_ptr<Thread>> _threads; // 用数组管理线程 | 智能指针更安全
    std::queue<T> _queue;                          // 用队列管理任务
    pthread_mutex_t _mutex;                        // 互斥访问_queue
    pthread_cond_t _cond;                          //_queue为空时进入条件变量
    bool _isRunning;                               // 判断线程池是否关闭
};

读者写者问题

在上面的生产消费模型中,生产者负责生产数据,消费者负责消费数据(拿走数据),若多个同时进入共享资源,可能会导致数据不一致问题。

写者就可以理解为生产者,负责写入数据,而读者只会读取数据,不会消费(拿走)数据,因此即使多个读者一起读取共享资源,也不会有问题。但若读者和写者一起进入共享资源,依旧有可能导致数据不一致问题。读者写者模型和生产消费模型最本质的区别就是消费者会拿走数据,而读者不会。

为了确保数据一致,也需要将读者、写者、共享资源保护起来:

  • 写者和写者之间是互斥关系
  • 读者和写者之间是互斥关系
  • 读者和读者之间没有关系!!

对于读者写者模型,适用的场景就是当一次发布后,很长时间不做修改,大部分都是被读取的,比如文章,数据库等等

支撑这一问题的,就是读写锁,在Linux中读写锁的类型是 pthread_rwlock_t ,接口如下:

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr);//初始化
pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;//若锁是全局/静态的,可以静态初始化

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);//释放


int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);//读加锁
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);//尝试读加锁

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);//写加锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);//尝试写加锁

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);//解锁

可以发现,当读者加锁时,可以用读加锁,当写者加锁时,可以用写加锁

对于读写锁,对应的状态如下:

读写锁的行为
当前锁状态 读锁请求 写锁请求
无锁 可以 可以
读锁 可以 阻塞
写锁 阻塞 阻塞

也就是说,在任何一个时刻,只允许一个写者写入(此时读者阻塞),或允许多个读者读取(此时写者阻塞)

读写锁原理

读写锁是怎么保证一把锁可以给读者写者分别加锁的呢?下面简单实现一下伪代码逻辑:

class rwlock
{
public:
    rwlock()
    {
        pthread_mutex_init(&reader,nullptr);
        pthread_mutex_init(&writer,nullptr);
    }
public:
    int readerCount = 0;//同时进入共享资源的读者数量
    pthread_mutex_t reader;//读者锁
    pthread_mutex_t writer;//写者锁
    
};

int main()
{
    rwlock RD;
    //读锁rdlock
    pthread_mutex_lock(&RD.reader);
    if(++RD.readerCount == 1)//第一个进来的读者记得关门(给写者上锁)
    {
        pthread_mutex_lock(&RD.writer);
    }
    pthread_mutex_unlock(&RD.reader);

    //读取数据

    pthread_mutex_lock(&RD.reader);
    if(--RD.readerCount == 0)//最后一个走的读者记得开门(给写者解锁)
    {
        pthread_mutex_unlock(&RD.writer);
    }
    pthread_mutex_unlock(&RD.reader);
    //读锁rdlock end...

    //写锁wrlock
    pthread_mutex_lock(&RD.writer);
    //写入数据
    pthread_mutex_unlock(&RD.writer);
    //写锁wrlock end...
    return 0;
}

当有读者在共享资源内时,写者锁始终处于加锁状态,即使写者要写入,也会一直阻塞;
当写者在共享资源内时,读者也会因为申请写者锁失败而阻塞

而如果有渊源不断的读者进入共享资源,写者会一直阻塞,造成写者的饥饿问题,这种模式就叫读者优先,也是读写锁默认的模式。

要想解决写者的饥饿问题,可以用写者优先模式:当写者要进入共享资源时,虽然不能把已经在共享资源里的读者拽出来,但可以挡住门口防止有新的读者进去,这里不做实现

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐