1. 线程互斥

1.1 进程线程间的互斥相关背景概念

  • 临界资源:多线程执⾏流共享的资源就叫做临界资源
  • 临界区:每个线程内部,访问临界资源的代码,就叫做临界区
  • 互斥:任何时刻,互斥保证有且只有⼀个执⾏流进⼊临界区,访问临界资源,通常对临界资源起保护作⽤
  • 原⼦性:不会被任何调度机制打断的操作,该操作只有两态,要么完成,要么未完成

1.2 互斥量mutex

  • ⼤部分情况,线程使⽤的数据都是局部变量,变量的地址空间在线程栈空间内,这种情况,变量归属单个线程,其他线程⽆法获得这种变量。
  • 但有时候,很多变量都需要在线程间共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
  • 多个线程并发的操作共享变量,会带来⼀些问题。
// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

int ticket = 100;

void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
        }
        else
        {
            break;
        }
    }
}

int main(void)
{
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, NULL, route, (void*)"thread 1");
    pthread_create(&t2, NULL, route, (void*)"thread 2");
    pthread_create(&t3, NULL, route, (void*)"thread 3");
    pthread_create(&t4, NULL, route, (void*)"thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

⼀次执⾏结果:
		thread 4 sells ticket:100
		...
		thread 4 sells ticket:1
		thread 2 sells ticket:0
		thread 1 sells ticket:-1
		thread 3 sells ticket:-2

为什么可能⽆法获得正确结果?

  • if 语句判断条件为真以后,代码可以并发的切换到其他线程
  • usleep 这个模拟漫⻓业务的过程,在这个漫⻓的业务过程中,可能有很多个线程会进⼊该代码段
  • ticket-- 操作本⾝就不是⼀个原⼦操作
取出ticket--部分的汇编代码
objdump -d a.out > test.objdump
152   40064b:   8b 05 e3 04 20 00	mov	0x2004e3(%rip),%eax	#600b34 <ticket>
153   400651:   83 e8 01			sub	$0x1,%eax
154   400654:   89 05 da 04 20 00	mov	%eax,0x2004da(%rip)	#600b34 <ticket>

--操作并不是原⼦操作,⽽是对应三条汇编指令:

  • load:将共享变量ticket从内存加载到寄存器中
  • update:更新寄存器⾥⾯的值,执⾏-1操作
  • store:将新值,从寄存器写回共享变量ticket的内存地址

要解决以上问题,需要做到三点:

  • 代码必须要有互斥⾏为:当线程进⼊临界区执⾏代码时,不允许其他线程进⼊该临界区。
  • 如果多个线程同时要求执⾏临界区的代码,并且临界区没有线程在执⾏,那么只能允许⼀个线程进⼊该临界区。
  • 如果线程不在临界区中执⾏,那么该线程不能阻⽌其他线程进⼊临界区。

要做到这三点,本质上就是需要⼀把锁。Linux上提供的这把锁叫互斥量。

在这里插入图片描述

互斥量的接⼝

初始化互斥量

初始化互斥量有两种⽅法:

  • ⽅法1,静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
  • ⽅法2,动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
	参数:
		mutex:要初始化的互斥量
		attr:NULL

销毁互斥量

销毁互斥量需要注意:

  • 使⽤PTHREAD_ MUTEX_ INITIALIZER初始化的互斥量不需要销毁
  • 不要销毁⼀个已经加锁的互斥量
  • 已经销毁的互斥量,要确保后⾯不会有线程再尝试加锁
int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥量加锁和解锁

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功返回0,失败返回错误号

调⽤pthread_mutex_lock时,可能会遇到以下情况:

  • 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
  • 发起函数调⽤时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_mutex_lock调⽤会陷⼊阻塞(执⾏流被挂起),等待互斥量解锁。

改进上⾯的售票系统:

// 操作共享变量会有问题的售票系统代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

int ticket = 100;
pthread_mutex_t mutex;

void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {
        pthread_mutex_lock(&mutex);
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
            pthread_mutex_unlock(&mutex);
        }
        else
        {
            pthread_mutex_unlock(&mutex);
            break;
        }
    }
}

int main(void)
{
    pthread_t t1, t2, t3, t4;
    pthread_mutex_init(&mutex, NULL);
    pthread_create(&t1, NULL, route, "thread 1");
    pthread_create(&t2, NULL, route, "thread 2");
    pthread_create(&t3, NULL, route, "thread 3");
    pthread_create(&t4, NULL, route, "thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
    pthread_mutex_destroy(&mutex);
}

1.3 互斥量实现原理探究

  • 经过上⾯的例⼦,⼤家已经意识到单纯的i++或者++i都不是原⼦的,有可能会有数据不⼀致问题
  • 为了实现互斥锁操作,⼤多数体系结构都提供了swap或exchange指令,该指令的作⽤是把寄存器和内存单元的数据相交换,由于只有⼀条指令,保证了原⼦性,即使是多处理器平台,访问内存的总线周期也有先后,⼀个处理器上的交换指令执⾏时另⼀个处理器的交换指令只能等待总线周期。现在我们把lock和unlock的伪代码改⼀下

在这里插入图片描述

1.4 互斥量的封装

Mutex.hpp

#pragma once
#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex()
    {
        pthread_mutex_init(&_lock, nullptr);
    }
    void Lock()
    {
        pthread_mutex_lock(&_lock);
    }
    void Unlock()
    {
        pthread_mutex_unlock(&_lock);
    }
    ~Mutex()
    {
        pthread_mutex_destroy(&_lock);
    }
private:
    pthread_mutex_t _lock;
};

class LockGuard // RAII风格代码
{
public:
    LockGuard(Mutex& lock):_lockref(lock)
    {
        _lockref.Lock();
    }
    ~LockGuard()
    {
        _lockref.Unlock();
    }
private:
    Mutex& _lockref;
};
// 抢票的代码就可以更新成为
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "Mutex.hpp"

int ticket = 1000;
Mutex mutex;

void *route(void *arg)
{
    char *id = (char *)arg;
    while (1)
    {
        LockGuard lockguard(mutex); // 使⽤RAII⻛格的锁
        if (ticket > 0)
        {
            usleep(1000);
            printf("%s sells ticket:%d\n", id, ticket);
            ticket--;
        }
        else
        {
            break;
        }
    }
    return nullptr;
}

int main(void)
{
    pthread_t t1, t2, t3, t4;
    pthread_create(&t1, NULL, route, (void *)"thread 1");
    pthread_create(&t2, NULL, route, (void *)"thread 2");
    pthread_create(&t3, NULL, route, (void *)"thread 3");
    pthread_create(&t4, NULL, route, (void *)"thread 4");
    pthread_join(t1, NULL);
    pthread_join(t2, NULL);
    pthread_join(t3, NULL);
    pthread_join(t4, NULL);
}

RAII⻛格的互斥锁,C++11也有,⽐如:
std::mutex mtx;
std::lock_guard< std::mutex > guard(mtx);

2. 线程同步

2.1 条件变量

  • 当⼀个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了。
  • 例如⼀个线程访问队列时,发现队列为空,它只能等待,直到其它线程将⼀个节点添加到队列中。这种情况就需要⽤到条件变量。

2.2 同步概念与竞态条件

  • 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从⽽有效避免饥饿问题,叫做同步
  • 竞态条件:因为时序问题,⽽导致程序异常,我们称之为竞态条件。

2.3 条件变量函数

初始化

int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);
参数:
		cond:要初始化的条件变量
		attr:NULL

销毁

int pthread_cond_destroy(pthread_cond_t *cond)

等待条件满⾜

int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);
参数:
		cond:要在这个条件变量上等待
		mutex:互斥量,后⾯详细解释

唤醒等待

int pthread_cond_broadcast(pthread_cond_t *cond); // 唤醒所有在这个条件变量上等待的线程
int pthread_cond_signal(pthread_cond_t *cond); // 唤醒其中一个在这个条件变量上等待的线程

简单案例:

  • 我们先使⽤PTHREAD_COND/MUTEX_INITIALIZER进⾏测试,对其他细节暂不追究
  • 然后将接⼝更改成为使⽤pthread_cond_init/pthread_cond_destroy的⽅式,⽅便后续进⾏封装
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *active(void *arg)
{
    std::string name = static_cast<const char *>(arg);
    while (true)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);
        std::cout << name << " 活动... " << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}

int main(void)
{
    pthread_t t1, t2;
    pthread_create(&t1, NULL, active, (void *)"thread-1");
    pthread_create(&t2, NULL, active, (void *)"thread-2");
    sleep(3); // 可有可⽆,这⾥确保两个线程已经在运⾏

    while (true)
    {
        // 对⽐测试
        // pthread_cond_signal(&cond); // 唤醒⼀个线程
        pthread_cond_broadcast(&cond); // 唤醒所有线程

        sleep(1);

        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
    }
}

$ ./cond
thread-1 活动...
thread-2 活动...
thread-1 活动...
thread-1 活动...
thread-2 活动...

2.4 生产者消费者模型

  • 321原则(便于记忆)
  • 3种关系:生产者与生产者、消费者与消费者、生产者与消费者
  • 2种角色:生产者和消费者
  • 1个交易场所:例如阻塞队列

2.4.1 为何要使用生产者消费者模型

⽣产者消费者模式就是通过⼀个容器来解决⽣产者和消费者的强耦合问题。⽣产者和消费者彼此之间不直接通讯,⽽通过阻塞队列来进⾏通讯,所以⽣产者⽣产完数据之后不⽤等待消费者处理,直接扔给阻塞队列,消费者不找⽣产者要数据,⽽是直接从阻塞队列⾥取,阻塞队列就相当于⼀个缓冲区,平衡了⽣产者和消费者的处理能⼒。这个阻塞队列就是⽤来给⽣产者和消费者解耦的。

2.4.2 生产者消费者模型优点

  • 解耦
  • ⽀持并发
  • ⽀持忙闲不均

在这里插入图片描述

2.5 基于BlockingQueue的生产者消费者模型

2.5.1 BlockingQueue

在多线程编程中阻塞队列(BlockingQueue)是⼀种常⽤于实现⽣产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放⼊了元素;当队列满时,往队列⾥存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

在这里插入图片描述

2.5.2 C++ queue模拟阻塞队列的生产者消费者模型

代码:

BlockQueue.hpp

#ifndef __BLOCK_QUEUE_HPP
#define __BOLCK_QUEUE_HPP

#include <iostream>
#include <queue>
#include <pthread.h>
#include <string>

const int defaultcap = 5;

template <typename T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap) :_cap(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_consumer_cond, nullptr);
        pthread_cond_init(&_productor_cond, nullptr);
        _blockqueue_low_water = _cap * 1 / 3;
        _blockqueue_high_water = _cap * 2 / 3;
        _sleep_consumer_num = _sleep_productor_num = 0;
    }
    void Enqueue(T &in) // 生产者
    {
        pthread_mutex_lock(&_mutex);
        // 增强代码的健壮性,鲁棒性,防御性编程
        while(_bq.size() == _cap)
        {
            _sleep_productor_num++;
            pthread_cond_wait(&_productor_cond, &_mutex);
            _sleep_productor_num--;
        }
        _bq.push(in);
        if(_bq.size() > _blockqueue_high_water && _sleep_consumer_num > 0)
            pthread_cond_signal(&_consumer_cond);
        pthread_mutex_unlock(&_mutex);
    }
    void Pop(T *out) // 消费者
    {
        pthread_mutex_lock(&_mutex);
        // 增强代码的健壮性,鲁棒性,防御性编程
        while(_bq.empty())
        {
            // 0. 为什么要判断?
            // 1. 线程为什么会在临界区内部等待?
            // 访问临界资源,必然在临界区内部访问,判断资源不就绪,本质也是访问临界资源
            // 2. 为什么要把锁传递进去?
            // 等待的时候,是在临界区内部等待的,需要把锁传递进入,让pthread_cond_wait
            // 自动释放_mutex锁。当线程唤醒的时候,是在临界区内部醒来的,把锁传递进入
            // 让pthread_cond_wait自动竞争并获取_mutex锁.
            _sleep_consumer_num++;
            pthread_cond_wait(&_consumer_cond, &_mutex); // 1. 过量的唤醒信息 2. 函数调用失败 3. 伪唤醒
            _sleep_consumer_num--;
        }
        *out = _bq.front();
        _bq.pop();
        if(_bq.size() < _blockqueue_low_water && _sleep_productor_num > 0)
            pthread_cond_signal(&_productor_cond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_consumer_cond);
        pthread_cond_destroy(&_productor_cond);
    }

private:
    std::queue<T> _bq;
    int _cap;
    pthread_mutex_t _mutex;
    pthread_cond_t _consumer_cond;
    pthread_cond_t _productor_cond;

    int _blockqueue_low_water; // bq低水位线
    int _blockqueue_high_water; // bq高水位线
    int _sleep_productor_num; // 生产者休眠数量
    int _sleep_consumer_num; // 消费者休眠数量
};

#endif

注意:这⾥采⽤模版,是想告诉我们,队列中不仅仅可以放置内置类型,⽐如int,对象也可以作为任务来参与⽣产消费的过程

#include "BlockQueue.hpp"
#include <unistd.h>

int num = 1;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

int GetNumber()
{
    pthread_mutex_lock(&lock);
    int number = num++;
    pthread_mutex_unlock(&lock);
    return number;
}

void* ConsumerRoutine(void* args)
{
    int number = GetNumber();
    std::string name = "Consumer-" + std::to_string(number);
    pthread_setname_np(pthread_self(), name.c_str());
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    while(true)
    {
        sleep(5);
        int data;
        bq->Pop(&data);
        std::cout << name << " 消费: " << data << std::endl;
    }
}

void* ProductorRoutine(void* args)
{
    int number = GetNumber();
    std::string name = "Productor-" + std::to_string(number);
    pthread_setname_np(pthread_self(), name.c_str());
    BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
    int data = 10;
    while(true)
    {
        // sleep(2);

        bq->Enqueue(data);
        std::cout << name << " 生产: " << data++ << std::endl;
    }
}

int main()
{
	// 单生产者、单消费者
    BlockQueue<int>* bq = new BlockQueue<int>();
    pthread_t c, p;
    pthread_create(&c, nullptr, ConsumerRoutine, bq);
    pthread_create(&p, nullptr, ProductorRoutine, bq);
    pthread_join(c, nullptr);
    pthread_join(p, nullptr);


	// 多生产者、多
	消费者
    // pthread_t c[3], p[2];
                 // pthread_create(c, nullptr, ConsumerRoutine, bq);
    // pthread_create(c + 1, nullptr, ConsumerRoutine, bq);
    // pthread_create(c + 2, nullptr, ConsumerRoutine, bq);
    // pthread_create(p, nullptr, ProductorRoutine, bq);
    // pthread_create(p + 1, nullptr, ProductorRoutine, bq);

    // pthread_join(c[0], nullptr);
    // pthread_join(c[1], nullptr);
    // pthread_join(c[2], nullptr);
    // pthread_join(p[0], nullptr);
    // pthread_join(p[1], nullptr);

    return 0;
}

2.6 为什么pthread_cond_wait 需要互斥量?

  • 条件等待是线程间同步的⼀种⼿段,如果只有⼀个线程,条件不满⾜,⼀直等下去都不会满⾜,所以必须要有⼀个线程通过某些操作,改变共享变量,使原先不满⾜的条件变得满⾜,并且友好的通知等待在条件变量上的线程。
  • 条件不会⽆缘⽆故的突然变得满⾜了,必然会牵扯到共享数据的变化。所以⼀定要⽤互斥锁来保护。没有互斥锁就⽆法安全的获取和修改共享数据。

在这里插入图片描述

  • 按照上⾯的说法,我们设计出如下的代码:先上锁,发现条件不满⾜,解锁,然后等待在条件变量上不就⾏了,如下代码:
// 错误的设计
pthread_mutex_lock(&mutex);
while (condition_is_false) {
	pthread_mutex_unlock(&mutex);
	//解锁之后,等待之前,条件可能已经满⾜,信号已经发出,但是该信号可能被错过
	pthread_cond_wait(&cond);
	pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
  • 由于解锁和等待不是原⼦操作。调⽤解锁之后,pthread_cond_wait之前,如果已经有其他线程获取到互斥量,摒弃条件满⾜,发送了信号,那么pthread_cond_wait将错过这个信号,可能会导致线程永远阻塞在这个pthread_cond_wait。所以解锁和等待必须是⼀个原⼦操作。
  • int pthread_cond_wait(pthread_cond_ t *cond,pthread_mutex_ t * mutex);进⼊该函数后,会去看条件量等于0不?等于,就把互斥量变成1,直到cond_wait返回,把条件量改成1,把互斥量恢复成原样。

2.7 条件变量使用规范

  • 等待条件代码
pthread_mutex_lock(&mutex);
while (条件为假)
	pthread_cond_wait(&cond, &mutex);
修改条件
pthread_mutex_unlock(&mutex);
  • 给条件发送信号代码
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);

2.8 条件变量的封装

Cond.hpp

#ifndef __COND_HPP
#define __COND_HPP

#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"

class Cond
{
public:
    Cond()
    {
        pthread_cond_init(&_cond, nullptr);
    }
    void Wait(Mutex& mutex)
    {
        pthread_cond_wait(&_cond, mutex.Ptr());
    }
    void Signal()
    {
        pthread_cond_signal(&_cond);
    }
    void Broadcast()
    {
        pthread_cond_broadcast(&_cond);
    }
    ~Cond()
    {
        pthread_cond_destroy(&_cond);
    }
private:
    pthread_cond_t _cond;
};

#endif

为了让条件变量更具有通⽤性,建议封装的时候,不要在Cond类内部引⽤对应的封装互斥量,要不然后⾯组合的时候,会因为代码耦合的问题难以初始化,因为⼀般⽽⾔Mutex和Cond基本是⼀起创建的。

2.9 POSIX信号量

POSIX信号量和SystemV信号量作⽤相同,都是⽤于同步操作,达到⽆冲突的访问共享资源⽬的。但POSIX可以⽤于线程间同步。

初始化信号量

#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
	pshared:0表⽰线程间共享,⾮零表⽰进程间共享
	value:信号量初始值

销毁信号量

int sem_destroy(sem_t *sem);

等待信号量

功能:等待信号量,会将信号量的值减
int sem_wait(sem_t *sem); //P()

发布信号量

功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加。
int sem_post(sem_t *sem);//V()

⽣产者-消费者的例⼦是基于queue的,其空间可以动态分配,现在基于固定⼤⼩的环形队列重写这个程序(POSIX信号量):

2.9.1 基于环形队列的生产者消费者模型

  • 环形队列采⽤数组模拟,⽤模运算来模拟环状特性

在这里插入图片描述

  • 环形结构起始状态和结束状态都是⼀样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留⼀个空的位置,作为满的状态

在这里插入图片描述

  • 但是我们现在有信号量这个计数器,就很简单的进⾏多线程间的同步过程。

Sem.hpp

#ifndef __SEM_HPP
#define __SEM_HPP

#include <iostream>
#include <semaphore.h>

class Sem
{
public:
    Sem(int init_val)
    {
        if(init_val >= 0)
            sem_init(&_sem, 0, init_val);
    }
    void P()
    {
        sem_wait(&_sem);
    }
    void V()
    {
        sem_post(&_sem);
    }
    ~Sem()
    {
        sem_destroy(&_sem);
    }
private:
    sem_t _sem;
};

#endif

RingQueue.hpp

#pragma once

#include <iostream>
#include <string>
#include <vector>
#include <pthread.h>
#include "Sem.hpp"
#include "Mutex.hpp"

const int defaultcap = 5;

template <typename T>
class RingQueue
{
public:
    RingQueue(int cap = defaultcap)
        : _cap(cap), _rq(cap), _consumer_step(0), _productor_step(0), _blank_sem(cap), _data_sem(0)
    {
    }
    void Enqueue(T &in) // 生产者调用
    {
        // 1. 预定资源
        _blank_sem.P(); // 买票
        {
            LockGuard lockguard(_pmutex); // 排队
            // 2. 找位置生产
            _rq[_productor_step++] = in;
            _productor_step %= _cap;
        }
        // 3. 释放数据资源
        _data_sem.V();
    }
    void Pop(T *out) // 消费者调用
    {
        _data_sem.P();
        {
            LockGuard lockguard(_cmutex);
            *out = _rq[_consumer_step++];
            _consumer_step %= _cap;
        }
        _blank_sem.V();
    }
    ~RingQueue()
    {
    }

private:
    int _cap;           // 环形队列的容量
    std::vector<T> _rq; // 环形队列

    int _consumer_step;  // 消费位置
    int _productor_step; // 生产位置

    Sem _blank_sem; // 格子资源计数器,生产者关心
    Sem _data_sem;  // 数据信号量,消费者关心

    Mutex _cmutex;
    Mutex _pmutex;
};

Main.cc

#include "RingQueue.hpp"
#include <unistd.h>
#include "Task.hpp"
#include <ctime>
#include <cstdlib>

void *ConsumerRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        sleep(1);
        Task t;
        // 消费任务
        rq->Pop(&t);
        // 处理任务
        t();
        std::cout << "消费任务: " << t.GetResult() << std::endl;
    }
    return nullptr;
}

void *ProductorRoutine(void *args)
{
    RingQueue<Task> *rq = static_cast<RingQueue<Task> *>(args);
    while (true)
    {
        // 1. 获取任务
        int x = rand() % 10 + 1;
        usleep(1234);
        int y = rand() % 10 + 1;
        Task t(x, y);
        // 2. 生产任务
        rq->Enqueue(t);
        std::cout << "生产任务: " << t.Question() << std::endl;
    }
    return nullptr;
}

int main()
{
    srand(time(nullptr) ^ getpid());
    RingQueue<Task> *rq = new RingQueue<Task>();
    pthread_t c, p;
    pthread_create(&c, nullptr, ConsumerRoutine, rq);
    pthread_create(&p, nullptr, ProductorRoutine, rq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);
}

// Mutex cnt_lock;
// Mutex screen_lock;

// int data = 1;

// int GetData()
// {
//     cnt_lock.Lock();
//     int result = data++;
//     cnt_lock.Unlock();
//     return result;
// }

// void Print(const std::string &name, const std::string& info)
// {
//     screen_lock.Lock();
//     std::cout << name << " : " << info << std::endl;
//     screen_lock.Unlock();
// }

// class ThreadData
// {
// public:
//     ThreadData(RingQueue<int> *r, const std::string &n)
//         : rq(r), name(n)
//     {
//     }
//     ~ThreadData() {}

// public:
//     std::string name;
//     RingQueue<int> *rq;
// };

// void *ConsumerRoutine(void *args)
// {
//     ThreadData *td = static_cast<ThreadData *>(args);
//     pthread_setname_np(pthread_self(), td->name.c_str());
//     int data = 0;
//     while (true)
//     {
//         td->rq->Pop(&data);
//         Print(td->name, " 消费数据: " + std::to_string(data));
//     }
// }

// void *ProductorRoutine(void *args)
// {
//     ThreadData *td = static_cast<ThreadData *>(args);
//     pthread_setname_np(pthread_self(), td->name.c_str());
//     while (true)
//     {
//         int data = GetData();
//         sleep(3);
//         td->rq->Enqueue(data);
//         Print(td->name, " 生产数据: " + std::to_string(data));
//     }
// }

// int main()
// {
//     // 多生产多消费
//     RingQueue<int> *rq = new RingQueue<int>();
//     pthread_t c[2], p[3];

//     ThreadData *td0 = new ThreadData(rq, "consumer-1");
//     pthread_create(c, nullptr, ConsumerRoutine, td0);
//     ThreadData *td1 = new ThreadData(rq, "consumer-2");
//     pthread_create(c + 1, nullptr, ConsumerRoutine, td1);
//     ThreadData *td2 = new ThreadData(rq, "productor-1");
//     pthread_create(p, nullptr, ProductorRoutine, td2);
//     ThreadData *td3 = new ThreadData(rq, "productor-2");
//     pthread_create(p + 1, nullptr, ProductorRoutine, td3);
//     ThreadData *td4 = new ThreadData(rq, "productor-3");
//     pthread_create(p + 2, nullptr, ProductorRoutine, td4);
//     pthread_join(c[0], nullptr);
//     pthread_join(c[1], nullptr);

//     pthread_join(p[0], nullptr);
//     pthread_join(p[1], nullptr);
//     pthread_join(p[2], nullptr);

//     // 单生产单消费
//     // RingQueue<int>* rq = new RingQueue<int>();
//     // pthread_t c, p;
//     // pthread_create(&c, nullptr, ConsumerRoutine, rq);
//     // pthread_create(&p, nullptr, ProductorRoutine, rq);

//     // pthread_join(c, nullptr);
//     // pthread_join(p, nullptr);

//     return 0;
// }

3. 线程池

下⾯开始,我们结合我们之前所做的所有封装,进⾏⼀个线程池的设计。在写之前,我们要做如下准备

  • 准备线程的封装
  • 准备锁和条件变量的封装
  • 引⼊⽇志,对线程进⾏封装

3.1 日志与策略模式

什么是设计模式

IT⾏业这么⽕,涌⼊的⼈很多.俗话说林⼦⼤了啥⻦都有.⼤佬和菜鸡们两极分化的越来越严重.为了让菜鸡们不太拖⼤佬的后腿,于是⼤佬们针对⼀些经典的常⻅的场景,给定了⼀些对应的解决⽅案,这个就是设计模式

⽇志认识

计算机中的⽇志是记录系统和软件运⾏中发⽣事件的⽂件,主要作⽤是监控运⾏状态、记录异常信息,帮助快速定位问题并⽀持程序员进⾏问题修复。它是系统维护、故障排查和安全管理的重要⼯具。

⽇志格式以下⼏个指标是必须得有的

  • 时间戳
  • ⽇志等级
  • ⽇志内容

以下⼏个指标是可选的

  • ⽂件名⾏号
  • 进程,线程相关id信息等

⽇志有现成的解决⽅案,如:spdlog、glog、Boost.Log、Log4cxx等等,我们依旧采⽤⾃定义⽇志的⽅式。

这⾥我们采⽤设计模式-策略模式来进⾏⽇志的设计。

我们想要的⽇志格式如下:

[可读性很好的时间] [⽇志等级] [进程pid] [打印对应⽇志的⽂件名][⾏号] - 消息内容,⽀持可变参数
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [16] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [17] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [18] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [20] - hello world
[2024-08-04 12:27:03] [DEBUG] [202938] [main.cc] [21] - hello world
[2024-08-04 12:27:03] [WARNING] [202938] [main.cc] [23] - hello world

Logger.hpp

#ifndef __LOGGER_HPP
#define __LOGGER_HPP

#include <iostream>
#include <cstdio>
#include <string>
#include <sys/time.h>
#include <ctime>
#include <unistd.h>
#include <filesystem> // C++17
#include <fstream>
#include <memory>
#include <sstream>
#include "Mutex.hpp"

namespace NS_LOG_MODULE
{
    enum class LogLevel
    {
        INFO,
        WARNING,
        ERROR,
        FATAL,
        DEBUG
    };
    LogLevel ExtractLevelFromMessage(const std::string &message)
    {
        // 寻找第二个 '[' 和对应的 ']' 之间的内容
        size_t first_bracket = message.find('[');
        if (first_bracket == std::string::npos)
            return LogLevel::INFO; // 默认
        size_t second_bracket = message.find('[', first_bracket + 1);
        if (second_bracket == std::string::npos)
            return LogLevel::INFO;
        size_t end_bracket = message.find(']', second_bracket);
        if (end_bracket == std::string::npos)
            return LogLevel::INFO;

        std::string level_str = message.substr(second_bracket + 1, end_bracket - second_bracket - 1);

        // 转换为 LogLevel
        if (level_str == "Info")
            return LogLevel::INFO;
        if (level_str == "Warning")
            return LogLevel::WARNING;
        if (level_str == "Error")
            return LogLevel::ERROR;
        if (level_str == "Fatal")
            return LogLevel::FATAL;
        if (level_str == "Debug")
            return LogLevel::DEBUG;
        return LogLevel::INFO; // 默认
    }

    // 根据等级获取对应的文件名
    std::string GetLevelFileName(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::INFO:
            return "log.info";
        case LogLevel::WARNING:
            return "log.warning";
        case LogLevel::ERROR:
            return "log.error";
        case LogLevel::FATAL:
            return "log.fatal";
        case LogLevel::DEBUG:
            return "log.debug";
        default:
            return "log.txt";
        }
    }
    std::string LogLevel2Message(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::INFO:
            return "Info";
        case LogLevel::WARNING:
            return "Warning";
        case LogLevel::ERROR:
            return "Error";
        case LogLevel::FATAL:
            return "Fatal";
        case LogLevel::DEBUG:
            return "Debug";
        default:
            return "Unknown";
        }
    }
    std::string GetCurrentTime()
    {
        struct timeval current_time;
        gettimeofday(&current_time, nullptr);

        struct tm struct_time;
        localtime_r(&(current_time.tv_sec), &struct_time); // r: 可重入函数
        char timestr[128];
        snprintf(timestr, sizeof(timestr), "%04d-%02d-%02d %02d:%02d:%02d.%ld",
                 struct_time.tm_year + 1900,
                 struct_time.tm_mon + 1,
                 struct_time.tm_mday,
                 struct_time.tm_hour,
                 struct_time.tm_min,
                 struct_time.tm_sec,
                 current_time.tv_usec);
        return timestr;
    }

    // 输出角度 -- 刷新策略
    // 1. 显示器打印
    // 2. 文件写入

    // 策略模式,策略接口
    class LogStrategy
    {
    public:
        virtual ~LogStrategy() = default;
        virtual void SyncLog(const std::string &message) = 0;
    };
    // 控制台日志刷新策略,日志将来要向显示器打印
    class ConsoleStrategy : public LogStrategy
    {
    public:
        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            std::cerr << message << std::endl;
        }
        ~ConsoleStrategy()
        {
        }

    private:
        Mutex _mutex;
    };

    const std::string defaultpath = "./log";
    const std::string defaultfilename = "log.txt";

    // 文件策略
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &path = defaultpath, const std::string &filename = defaultfilename)
            : _logpath(path), _logfilename(filename)
        {
            LockGuard lockguard(_mutex);
            if (std::filesystem::exists(_logpath))
                return;
            try
            {
                std::filesystem::create_directories(_logpath);
            }
            catch (const std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << "\n";
            }
        }
        void SyncLog(const std::string &message) override
        {
            LockGuard lockguard(_mutex);
            if (!_logpath.empty() && _logpath.back() != '/')
            {
                _logpath += "/";
            }
            std::string targetlog = _logpath + _logfilename;
            std::ofstream out(targetlog, std::ios::app); // 追加
            if (!out.is_open())
            {
                return;
            }
            out << message << "\n";
            out.close();
        }
        ~FileLogStrategy()
        {
        }

    private:
        std::string _logpath;
        std::string _logfilename;
        Mutex _mutex;
    };

    // const std::string defaultfilename = "log.info";
    // const std::string defaultfilename = "log.warning";
    // const std::string defaultfilename = "log.fatal";
    // const std::string defaultfilename = "log.error";
    // const std::string defaultfilename = "log.debug";
    // 文件策略&&分日志等级来进行保存
    class FileLogLevelStrategy : public LogStrategy
    {
    public:
        FileLogLevelStrategy(const std::string &path = defaultpath)
            : _logpath(path)
        {
            // 确保目录存在
            LockGuard lockguard(_mutex);
            if (std::filesystem::exists(_logpath))
                return;
            try
            {
                std::filesystem::create_directories(_logpath);
            }
            catch (const std::filesystem::filesystem_error &e)
            {
                std::cerr << e.what() << "\n";
            }
        }

        void SyncLog(const std::string &message) override
        {
            // 解析等级
            LogLevel level = ExtractLevelFromMessage(message);
            std::string filename = GetLevelFileName(level);

            // 构建完整路径
            std::string fullpath = _logpath;
            if (!fullpath.empty() && fullpath.back() != '/')
                fullpath += "/";
            fullpath += filename;

            // 线程安全写入
            LockGuard lockguard(_mutex);
            std::ofstream out(fullpath, std::ios::app);
            if (out.is_open())
            {
                out << message << "\n";
                out.close();
            }
        }

        ~FileLogLevelStrategy() = default;

    private:
        std::string _logpath;
        Mutex _mutex;
    };

    // 日志类:
    // 1. 日志的生成
    // 2. 根据不同的策略,进行刷新
    class Logger
    {
    public:
        // 日志的生成
        // 构建日志字符串
        Logger() {}
        void UseConsoleStrategy()
        {
            _strategy = std::make_unique<ConsoleStrategy>();
        }
        void UseFileStrategy()
        {
            _strategy = std::make_unique<FileLogStrategy>();
        }
        void UseFileLevelStrategy()
        {
            _strategy = std::make_unique<FileLogLevelStrategy>();
        }
        // 内部类,标识一条完整的日志信息
        // 一条完整的日志信息 = 左半固定部分 + 右半不固定部分
        // LogMessage RAII风格的方式,进行刷新
        class LogMessage
        {
        public:
            LogMessage(LogLevel level, std::string &filename, int line, Logger &logger)
                : _level(level), _curr_time(GetCurrentTime()), _pid(getppid()),
                  _filename(filename), _line(line), _logger(logger)
            {
                // 先构建出来左半部分
                std::stringstream ss;
                ss << "[" << _curr_time << "] "
                   << "[" << LogLevel2Message(_level) << "] "
                   << "[" << _pid << "] "
                   << "[" << _filename << "] "
                   << "[" << _line << "] "
                   << "- ";
                _loginfo = ss.str();
            }
            template <typename T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ss;
                ss << info;
                _loginfo += ss.str();
                return *this; // 返回当前LOgMessage对象,方便下次继续进行<<
            }
            ~LogMessage()
            {
                if (_logger._strategy)
                {
                    _logger._strategy->SyncLog(_loginfo);
                }
            }

        private:
            LogLevel _level;
            std::string _curr_time;
            pid_t _pid;
            std::string _filename;
            int _line;
            std::string _loginfo; // 一条完整的日志信息

            // 一个引用,引用外部的Logger类对象
            Logger &_logger; // 方便后续进行策略式刷新
        };
        // 故意采用拷贝LogMessage
        LogMessage operator()(LogLevel level, std::string filename, int line)
        {
            return LogMessage(level, filename, line, *this);
        }

        // void Debug(const std::string& message)
        // {
        //     if(_strategy != nullptr)
        //     {
        //         _strategy->SyncLog(message);
        //     }
        // }
        ~Logger() {}

    private:
        std::unique_ptr<LogStrategy> _strategy; // 刷新策略
    };

    // 日志对象:全局使用
    Logger logger;

#define ENABLE_CONSOLE_LOG_STRATEGY() logger.UseConsoleStrategy()
#define ENABLE_File_LOG_STRATEGY() logger.UseFileStrategy()
#define ENABLE_FILE_LEVEL_LOG_STRATEGY() logger.UseFileLevelStrategy()

#define LOG(level) logger(level, __FILE__, __LINE__)
}

#endif
#include "Logger.hpp"

using namespace NS_LOG_MODULE;

int main()
{
	ENABLE_FILE_LEVEL_LOG_STRATEGY();
    LOG(LogLevel::DEBUG) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::WARNING) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::ERROR) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::FATAL) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::INFO) << " hello world " << 3.14 << " 100 " << "hello bit";
    
    ENABLE_CONSOLE_LOG_STRATEGY();
    LOG(LogLevel::DEBUG) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::WARNING) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::ERROR) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::FATAL) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::INFO) << " hello world " << 3.14 << " 100 " << "hello bit";

    ENABLE_File_LOG_STRATEGY();
    LOG(LogLevel::DEBUG) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::WARNING) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::ERROR) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::FATAL) << " hello world " << 3.14 << " 100 " << "hello bit";
    LOG(LogLevel::INFO) << " hello world " << 3.14 << " 100 " << "hello bit";


    // ENABLE_CONSOLE_LOG_STRATEGY();
    // logger.Debug("console strategy!");
    // logger.Debug("console strategy!");
    // logger.Debug("console strategy!");
    // logger.Debug("console strategy!");
    // ENABLE_File_LOG_STRATEGY();
    // logger.Debug("file strategy!");
    // logger.Debug("file strategy!");
    // logger.Debug("file strategy!");
    // logger.Debug("file strategy!");
    // ENABLE_CONSOLE_LOG_STRATEGY();
    // logger.Debug("console strategy!1");
    // logger.Debug("console strategy!1");
    // logger.Debug("console strategy!1");
    // logger.Debug("console strategy!1");


    // std::cout << GetCurrentTime() << std::endl;
    // sleep(1);
    // std::cout << GetCurrentTime() << std::endl;
    // sleep(1);
    // std::cout << GetCurrentTime() << std::endl;
    // sleep(1);
    return 0;
}

3.2 线程池设计

线程池:

⼀种线程使⽤模式。线程过多会带来调度开销,进⽽影响缓存局部性和整体性能。⽽线程池维护着多个线程,等待着监督管理者分配可并发执⾏的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利⽤,还能防⽌过分调度。可⽤线程数量应该取决于可⽤的并发处理器、处理器内核、内存、⽹络sockets等的数量。

线程池的应⽤场景:

  • 需要⼤量的线程来完成任务,且完成任务的时间⽐较短。⽐如WEB服务器完成⽹⻚请求这样的任务,使⽤线程池技术是⾮常合适的。因为单个任务⼩,⽽任务数量巨⼤,你可以想象⼀个热⻔⽹站的点击次数。但对于⻓时间的任务,⽐如⼀个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间⽐线程的创建时间⼤多了。
  • 对性能要求苛刻的应⽤,⽐如要求服务器迅速响应客⼾请求。
  • 接受突发性的⼤量请求,但不⾄于使服务器因此产⽣⼤量线程的应⽤。突发性⼤量客⼾请求,在没有线程池情况下,将产⽣⼤量线程,虽然理论上⼤部分操作系统线程数⽬最⼤值不是问题,但短时间内产⽣⼤量线程可能使内存到达极限,出现错误.

线程池的种类

a. 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执⾏任务对象中的任务接⼝
b. 浮动线程池,其他同上
此处,我们选择固定线程个数的线程池。

在这里插入图片描述

ThreadPool.hpp

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include "Logger.hpp"
#include "Thread.hpp"
#include "Cond.hpp"

namespace NS_THREAD_POOL_MODULE
{

    using namespace NS_LOG_MODULE;
    using namespace NS_THREAD_MODULE;

    const int defaultnum = 5;

    // void Test()
    // {
    //     char name[64];
    //     pthread_getname_np(pthread_self(), name, sizeof(name));
    //     while(true)
    //     {
    //         LOG(LogLevel::DEBUG) << "我是一个任务,我正在运行" << name;
    //         sleep(1);
    //     }
    // }

    template <typename T>
    class ThreadPool
    {
    private:
        // void HandlerTask(ThreadPool* this)
        void HandlerTask()
        {
            char name[64];
            pthread_getname_np(pthread_self(), name, sizeof(name));
            while (true)
            {
                T task;
                {
                    // 保护临界区
                    LockGuard lockguard(_mutex);
                    // 检测任务: 休眠:empty && _isrunning = true
                    while (_tasks.empty() && _isrunning)
                    {
                        // 没有任务,休眠
                        _slaver_sleep_num++;
                        _cond.Wait(_mutex);
                        _slaver_sleep_num--;
                    }
                    // 线程池退出: empty && _isrunning = false
                    if (_tasks.empty() && !_isrunning)
                    {
                        _mutex.Unlock();
                        break;
                    }
                    // 有任务,取任务,本质:把任务由公共变成私有
                    task = _tasks.front();
                    _tasks.pop();
                }
                // 处理任务,约定
                // 处理任务需要在临界区处理吗?不需要,取任务后,任务是线程私有的
                LOG(LogLevel::INFO) << name << " 处理任务:";
                task();
                LOG(LogLevel::DEBUG) << task.Result();
            }
            // 线程退出
            LOG(LogLevel::DEBUG) << name << " quit...";
        }

    public:
        ThreadPool(int slaver_num = defaultnum)
            : _isrunning(false), _slaver_num(slaver_num), _slaver_sleep_num(0)
        {
            for (int idx = 0; idx < _slaver_num; idx++)
            {
                // // auto f = std::bind(&ThreadPool::HandlerTask, this);
                // auto f = [this](){this->HandlerTask();};
                // _slavers.emplace_back(f);
                _slavers.emplace_back([this]()
                                      { this->HandlerTask(); });
            }
        }
        void Start()
        {
            if (_isrunning)
            {
                LOG(LogLevel::WARNING) << "Thread Pool Is Already Running";
                return;
            }
            _isrunning = true;
            for (auto &slaver : _slavers)
            {
                slaver.Start();
            }
        }
        void Stop()
        {
            // version1
            // if (!_isrunning)
            // {
            //     LOG(LogLevel::WARNING) << "Thread Pool Is Not Running";
            //     return;
            // }
            // for (auto &slaver : _slavers)
            // {
            //     slaver.Stop();
            // }
            // _isrunning = false;

            // version2
            // 1. _isrunning = false
            // 2. 处理完成tasks的所有任务
            // 线程状态: 休眠 | 正在处理任务 —> 让所有线程全部唤醒
            // HandlerTask自动break
            _mutex.Lock();
            _isrunning = false;
            if (_slaver_sleep_num > 0)
                _cond.Broadcast();
            _mutex.Unlock();
        }
        void Wait()
        {
            for (auto &slaver : _slavers)
            {
                slaver.Join();
            }
        }
        void Enqueue(T in)
        {
            _mutex.Lock();
            _tasks.push(in);
            if (_slaver_sleep_num > 0)
                _cond.Signal();
            _mutex.Unlock();
        }
        ~ThreadPool()
        {
        }

    private:
        bool _isrunning;
        int _slaver_num;
        std::vector<Thread> _slavers;
        std::queue<T> _tasks; // 临界资源
        Mutex _mutex;
        Cond _cond;
        int _slaver_sleep_num;
    };
}

3.3 线程安全的单例模式

3.3.1 什么是单例模式

创建对象的时机
单例模式,就是只允许在加载或者运行期间,整体最多创建一个该类对象

  1. 加载到内存的时候,创建对象,int gval = 100;
  2. 进程在运行期间,创建对象,int* val = (int*)malloc(sizeof(int));

3.3.2 单例模式的特点

某些类,只应该具有⼀个对象(实例),就称之为单例.

例如⼀个男⼈只能有⼀个媳妇.

在很多服务器开发场景中,经常需要让服务器加载很多的数据(上百G)到内存中.此时往往要⽤⼀个单例的类来管理这些数据.

3.3.3 饿汉实现方式和懒汉实现方式

[洗碗的例⼦]

吃完饭, ⽴刻洗碗, 这种就是饿汉⽅式. 因为下⼀顿吃的时候可以⽴刻拿着碗就能吃饭.

吃完饭, 先把碗放下, 然后下⼀顿饭⽤到这个碗了再洗碗, 就是懒汉⽅式.

懒汉⽅式最核⼼的思想是"延时加载".从⽽能够优化服务器的启动速度.

3.3.4 饿汉方式实现单例模式

template <typename T>
class Singleton {
	static T data;
public:
	static T* GetInstance() 
	{
		return &data;
	}
};

只要通过Singleton这个包装类来使⽤T对象,则⼀个进程中只有⼀个T对象的实例.

3.3.5 懒汉方式实现单例模式

template <typename T>
class Singleton {
	static T* inst;
public:
	static T* GetInstance() 
	{
		if (inst == NULL)
			inst = new T();
		return inst;
	}
};

存在⼀个严重的问题,线程不安全.

第⼀次调⽤GetInstance的时候,如果两个线程同时调⽤,可能会创建出两份T对象的实例.

但是后续再次调⽤,就没有问题了.

3.3.6 懒汉方式实现单例模式(线程安全版本)

// 懒汉模式, 线程安全
template <typename T>
class Singleton {
	volatile static T* inst;  // 需要设置volatile 关键字, 否则可能被编译器优化
	static std::mutex lock;
public:
	static T* GetInstance() 
	{
		if (inst == NULL) 
		{
			lock.lock();          
			if (inst == NULL)
				inst = new T();
			lock.unlock();
		}
		return inst;
	}
};

注意事项:

  1. 加锁解锁的位置
  2. 双重if判定,避免不必要的锁竞争
  3. volatile关键字防⽌过度优化

3.4 单例式线程池

ThreadPool.hpp

#pragma once

#include <iostream>
#include <vector>
#include <queue>
#include "Logger.hpp"
#include "Thread.hpp"
#include "Cond.hpp"

namespace NS_THREAD_POOL_MODULE
{

    using namespace NS_LOG_MODULE;
    using namespace NS_THREAD_MODULE;

    const int defaultnum = 5;

    // void Test()
    // {
    //     char name[64];
    //     pthread_getname_np(pthread_self(), name, sizeof(name));
    //     while(true)
    //     {
    //         LOG(LogLevel::DEBUG) << "我是一个任务,我正在运行" << name;
    //         sleep(1);
    //     }
    // }

    template <typename T>
    class ThreadPool
    {
    private:
        // void HandlerTask(ThreadPool* this)
        void HandlerTask()
        {
            char name[64];
            pthread_getname_np(pthread_self(), name, sizeof(name));
            while (true)
            {
                T task;
                {
                    // 保护临界区
                    LockGuard lockguard(_mutex);
                    // 检测任务: 休眠:empty && _isrunning = true
                    while (_tasks.empty() && _isrunning)
                    {
                        // 没有任务,休眠
                        _slaver_sleep_num++;
                        _cond.Wait(_mutex);
                        _slaver_sleep_num--;
                    }
                    // 线程池退出: empty && _isrunning = false
                    if (_tasks.empty() && !_isrunning)
                    {
                        _mutex.Unlock();
                        break;
                    }
                    // 有任务,取任务,本质:把任务由公共变成私有
                    task = _tasks.front();
                    _tasks.pop();
                }
                // 处理任务,约定
                // 处理任务需要在临界区处理吗?不需要,取任务后,任务是线程私有的
                LOG(LogLevel::INFO) << name << " 处理任务:";
                task();
                LOG(LogLevel::DEBUG) << task.Result();
            }
            // 线程退出
            LOG(LogLevel::DEBUG) << name << " quit...";
        }

        ThreadPool(int slaver_num = defaultnum)
            : _isrunning(false), _slaver_num(slaver_num), _slaver_sleep_num(0)
        {
            for (int idx = 0; idx < _slaver_num; idx++)
            {
                // // auto f = std::bind(&ThreadPool::HandlerTask, this);
                // auto f = [this](){this->HandlerTask();};
                // _slavers.emplace_back(f);
                _slavers.emplace_back([this]()
                                      { this->HandlerTask(); });
            }
        }

        // 赋值、拷贝构造禁止
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
        ThreadPool(const ThreadPool<T> &) = delete;

    public:
        // 如果多线程获得这个单例呢?加锁
        static ThreadPool<T> *Instance()
        {
            if(nullptr == _instance) // 双if判断,解决效率问题,保证对象创建后不会再频繁访问锁
            {
                LockGuard lockguard(_lock);
                if (nullptr == _instance)
                {
                    // 第一次调用
                    _instance = new ThreadPool<T>();
                    _instance->Start();
                    LOG(LogLevel::DEBUG) << "第一次使用线程池,创建线程池对象";
                }
            }
            return _instance;
        }

        void Start()
        {
            if (_isrunning)
            {
                LOG(LogLevel::WARNING) << "Thread Pool Is Already Running";
                return;
            }
            _isrunning = true;
            for (auto &slaver : _slavers)
            {
                slaver.Start();
            }
        }
        void Stop()
        {
            // version1
            // if (!_isrunning)
            // {
            //     LOG(LogLevel::WARNING) << "Thread Pool Is Not Running";
            //     return;
            // }
            // for (auto &slaver : _slavers)
            // {
            //     slaver.Stop();
            // }
            // _isrunning = false;

            // version2
            // 1. _isrunning = false
            // 2. 处理完成tasks的所有任务
            // 线程状态: 休眠 | 正在处理任务 —> 让所有线程全部唤醒
            // HandlerTask自动break
            _mutex.Lock();
            _isrunning = false;
            if (_slaver_sleep_num > 0)
                _cond.Broadcast();
            _mutex.Unlock();
        }
        void Wait()
        {
            for (auto &slaver : _slavers)
            {
                slaver.Join();
            }
        }
        void Enqueue(T in)
        {
            _mutex.Lock();
            _tasks.push(in);
            if (_slaver_sleep_num > 0)
                _cond.Signal();
            _mutex.Unlock();
        }
        ~ThreadPool()
        {
        }

    private:
        bool _isrunning;
        int _slaver_num;
        std::vector<Thread> _slavers;
        std::queue<T> _tasks; // 临界资源
        Mutex _mutex;
        Cond _cond;
        int _slaver_sleep_num;

        // 添加单例模式
        static ThreadPool<T> *_instance;
        static Mutex _lock; // 保证单例的安全
    };
    template <typename T>
    ThreadPool<T> *ThreadPool<T>::_instance = nullptr;
    template <typename T>
    Mutex ThreadPool<T>::_lock;
}

Main.cc

#include <memory>
#include <iostream>
#include <functional>
#include "Logger.hpp"
#include "ThreadPool.hpp"
#include <ctime>
#include <cstdlib>

using namespace NS_LOG_MODULE;
using namespace NS_THREAD_POOL_MODULE;

class Task
{
public:
    Task(){}
    Task(int x, int y)
    : _x(x), _y(y)
    {}
    void operator()()
    {
        _result = _x + _y;
    }
    std::string Result()
    {
        return std::to_string(_x) + " + " + std::to_string(_y) + " = " + std::to_string(_result);
    }
    ~Task(){}
private:
    int _x;
    int _y;
    int _result;
};

int main()
{
    // 创建对象的时机
    // 单例模式,就是只允许在加载或者运行期间,整体最多创建一个该类对象
    // 1. 加载到内存的时候,创建对象,int gval = 100;
    // 2. 进程在运行期间,创建对象,int* val = (int*)malloc(sizeof(int));
    ENABLE_CONSOLE_LOG_STRATEGY();
    srand(time(nullptr) ^ getpid());

    int cnt = 10;
    while(cnt--)
    {
        int x = rand() % 10 + 1;
        usleep(1214);
        int y = rand() % 10 + 1;
        Task t(x, y);
        ThreadPool<Task>::Instance()->Enqueue(t);
        sleep(1);
    }

    ThreadPool<Task>::Instance()->Stop();

    ThreadPool<Task>::Instance()->Wait();

    // sleep(5);

    return 0;
}

4. 线程安全和重入问题

概念

线程安全:就是多个线程在访问共享资源时,能够正确地执⾏,不会相互⼲扰或破坏彼此的执⾏结果。⼀般⽽⾔,多个线程并发同⼀段只有局部变量的代码时,不会出现不同的结果。但是对全局变量或者静态变量进⾏操作,并且没有锁保护的情况下,容易出现该问题。

重⼊:同⼀个函数被不同的执⾏流调⽤,当前⼀个流程还没有执⾏完,就有其他的执⾏流再次进⼊,我们称之为重⼊。⼀个函数在重⼊的情况下,运⾏结果不会出现任何不同或者任何问题,则该函数被称为可重⼊函数,否则,是不可重⼊函数。

学到现在,其实我们已经能理解重⼊其实可以分为两种情况

  • 多线程重⼊函数
  • 信号导致⼀个执⾏流重复进⼊函数

在这里插入图片描述

不要被上⾯绕⼝令式的话语唬住,你只要仔细观察,其实对应概念说的都是⼀回事。

在这里插入图片描述

5. 常见锁概念

5.1 死锁

  • 死锁是指在⼀组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所占用不会释放的资源⽽处于的⼀种永久等待状态。
  • 为了⽅便表述,假设现在线程A,线程B必须同时持有锁1和锁2,才能进⾏后续资源的访问

在这里插入图片描述

申请⼀把锁是原⼦的,但是申请两把锁就不⼀定了

在这里插入图片描述

5.2 死锁四个必要条件

  • 互斥条件:⼀个资源每次只能被⼀个执⾏流使⽤
  • 请求与保持条件:⼀个执⾏流因请求资源⽽阻塞时,对已获得的资源保持不放

在这里插入图片描述

  • 不剥夺条件:⼀个执⾏流已获得的资源,在末使⽤完之前,不能强⾏剥夺

在这里插入图片描述

  • 循环等待条件:若⼲执⾏流之间形成⼀种头尾相接的循环等待资源的关系

在这里插入图片描述

5.3 避免死锁

破坏死锁的四个必要条件

  • 破坏循环等待条件问题:资源⼀次性分配,使⽤超时机制、加锁顺序⼀致
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
#include <unistd.h>

//定义两个共享资源(整数变量)和两个互斥锁
int shared_resource1 = 0;
int shared_resource2 = 0;
std::mutex mtx1, mtx2;

//⼀个函数,同时访问两个共享资源
void access_shared_resources()
{
    std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
    std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
    
    // 使⽤std::lock同时锁定两个互斥锁
    std::lock(lock1, lock2);

    // 现在两个互斥锁都已锁定,可以安全地访问共享资源
    int cnt = 10000;
    while (cnt)
    {
        ++shared_resource1;
        ++shared_resource2;
        cnt--;
    }

    // 当离开access_shared_resources的作⽤域时,lock1和lock2的析构函数会被⾃动调⽤
    // 这会导致它们各⾃的互斥量被⾃动解锁
}

//模拟多线程同时访问共享资源的场景
void simulate_concurrent_access()
{
    std::vector<std::thread> threads;

    //创建多个线程来模拟并发访问
    for (int i = 0; i < 10; ++i)
    {
        threads.emplace_back(access_shared_resources);
    }

    //等待所有线程完成
    for (auto &thread : threads)
    {
        thread.join();
    }

    // 输出共享资源的最终状态
    std::cout << "Shared Resource 1: " << shared_resource1 << std::endl;
    std::cout << "Shared Resource 2: " << shared_resource2 << std::endl;
}

int main()
{
    simulate_concurrent_access();
    return 0;
}

$ ./a.out   // 不⼀次申请
Shared Resource 1: 94416
Shared Resource 2: 94536

$ ./a.out  // ⼀次申请 
Shared Resource 1: 100000
Shared Resource 2: 100000
  • 避免锁未释放的场景

6. STL、智能指针和线程安全

6.1 STL中的容器是否是线程安全的?

不是.

原因是,STL的设计初衷是将性能挖掘到极致,⽽⼀旦涉及到加锁保证线程安全,会对性能造成巨⼤的影响.

⽽且对于不同的容器,加锁⽅式的不同,性能可能也不同(例如hash表的锁表和锁桶).

因此STL默认不是线程安全.如果需要在多线程环境下使⽤,往往需要调⽤者⾃⾏保证线程安全.

6.2 智能指针是否是线程安全的?

对于unique_ptr,由于只是在当前代码块范围内⽣效,因此不涉及线程安全问题.

对于shared_ptr,多个对象需要共⽤⼀个引⽤计数变量,所以会存在线程安全问题.但是标准库实现的时候考虑到了这个问题,基于原⼦操作(CAS)的⽅式保证shared_ptr能够⾼效,原⼦的操作引⽤计数.

7. 其他常⻅的各种锁

  • 悲观锁:在每次取数据时,总是担⼼数据会被其他线程修改,所以会在取数据前先加锁(读锁,写锁,⾏锁等),当其他线程想要访问数据时,被阻塞挂起。
  • 乐观锁:每次取数据时候,总是乐观的认为数据不会被其他线程修改,因此不上锁。但是在更新数据前,会判断其他数据在更新前有没有对数据进⾏修改。主要采⽤两种⽅式:版本号机制和CAS操作。
  • CAS操作:当需要更新数据时,判断当前内存值和之前取得的值是否相等。如果相等则⽤新值更新。若不等则失败,失败则重试,⼀般是⼀个⾃旋的过程,即不断重试。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐