目录

一.日志与策略模式

一).日志认识

二)自定义日志系统的实现

二.线程池设计

三.线程安全的单例模式

一).单例模式概念

二).设计原理

三).饿汉实方式和懒汉实现方式

1.饿汉方式实现单例模式

2.懒汉方式实现单例模式

3.单例模式线程池



一.日志与策略模式

一).日志认识

计算机中的日志是记录系统和软件运行中发生事件的文件,主要作用是监控运行状态,记录异常信息,帮助快速定位问题并支持程序员进行问题修复。是系统维护,故障排除和安全管理的重要工具。

日志有现有的解决方案,如:spdlog,glog,Boost.Log,Log4cxx等等。

下面实现一个自定义类型的日志系统,日志的格式有以下指标:时间,日志等级,进程pid,输出日志的文件,行号,日志内容

下列是期望输出的日志格式:

二)自定义日志系统的实现

由于实现的日志系统要支持多线程程序日志的有序打印,所以不管在访问显示器还是访问文件的时候都需要通过加锁来维护线程之间的互斥关系

这里对 Linux 系统中的 pthread 库中的互斥锁进行面向对象的封装:

//Mutex.hpp

#pragma once
#include <iostream>
#include <pthread.h>

namespace MutexModule
{
    class Mutex
    {
    public:
        Mutex()
        {
            int n = pthread_mutex_init(&_mutex, nullptr);
            (void)n;
        }
        ~Mutex()
        {
            int n = pthread_mutex_destroy(&_mutex);
            (void)n;
        }
        void Lock()
        {
            int n = pthread_mutex_lock(&_mutex);
            (void)n;
        }
        void Unlock()
        {
            int n = pthread_mutex_unlock(&_mutex);
            (void)n;
        }
        pthread_mutex_t* Get()
        {
            return &_mutex;
        }

    private:
        pthread_mutex_t _mutex;
    };
    //RAII风格
    class LockGuard
    {
    public:
        LockGuard(Mutex &mutex)
            : _mutex(mutex)
        {
            _mutex.Lock();
        }
        ~LockGuard(){
            _mutex.Unlock();
        }
    private:
        Mutex &_mutex;
    };
}
//Log.hpp

#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> //C++17
#include <sstream>
#include <fstream>
#include <ctime>
#include <memory>
#include <unistd.h>
#include "Mutex.hpp"

namespace LogModule
{
    using namespace MutexModule;

    const std::string gsep = "\r\n";
    // 策略模式 -- 利用C++的多态特性
    // 1. 刷新策略 a: 向显示器打印 b: 向文件中写入

    // 刷新策略基类
    class LogStrategy
    {
    public:
        virtual ~LogStrategy() = default;
        virtual void SyncLog(const std::string &message) = 0;
    };

    // 显示器打印日志的策略
    class ConsoleLogStrategy : public LogStrategy
    {
    public:
        ConsoleLogStrategy() {}
        ~ConsoleLogStrategy() {}

        void SyncLog(const std::string &message) override
        {
            // 加锁使多线程原子性的访问显示器
            LockGuard lockGuard(_mutex);
            std::cout << message << gsep;
        }

    private:
        Mutex _mutex;
    };

    // 默认的日志文件路径和日志文件名
    const std::string defaultPath = "./log";
    const std::string defaultFile = "my.log";

    // 文件打印日志策略
    class FileLogStrategy : public LogStrategy
    {
    public:
        FileLogStrategy(const std::string &path = defaultPath, const std::string &file = defaultFile)
            : _path(path), _file(file)
        {
            // 加锁使多线程原子性的访问文件
            LockGuard lockGuard(_mutex);
            // 判断目录是否存在
            if (std::filesystem::exists(_path)) // 检测文件系统对象(文件,目录,符号链接等)是否存在
            {
                return;
            }
            try
            {
                // 如果目录不存在,递归创建目录
                std::filesystem::create_directories(_path);
            }
            catch (const std::filesystem::filesystem_error &e) // 如果创建失败则打印异常信息
            {
                std::cerr << e.what() << '\n';
            }
        }
        ~FileLogStrategy() {}

        void SyncLog(const std::string &message) override
        {
            LockGuard lockGuard(_mutex);
            // 追加方式向文件中写入
            std::string fileName = _path + (_path.back() == '/' ? "" : "/") + _file;
            // std::ofstream是C++标准库中用于输出到文件的流类,主要用于将数据写入文件
            std::ofstream out(fileName, std::ios::app);
            if (!out.is_open())
            {
                return;
            }
            out << message << gsep;
            out.close();
        }

    private:
        std::string _path; // 日志文件所在路径
        std::string _file; // 日志文件本身
        Mutex _mutex;
    };

    // 2. 形成完整日志并刷新到指定位置
    // 2.1 日志等级
    enum class LogLevel
    {
        DEBUG,
        INFO,
        WARNING,
        ERROR,
        FATAL
    };

    // 2.2 枚举类型的日志等级转换为字符串类型
    std::string Level2Str(LogLevel level)
    {
        switch (level)
        {
        case LogLevel::DEBUG:
            return "DEBUG";
        case LogLevel::INFO:
            return "INFO";
        case LogLevel::WARNING:
            return "WARNING";
        case LogLevel::ERROR:
            return "ERROR";
        case LogLevel::FATAL:
            return "FATAL";
        default:
            return "UNKNOWN";
        }
    }

    // 2.3 获取当前时间的函数
    std::string GetCurTime()
    {
        // time 函数参数为一个time_t类型的指针,若该指针不为NULL,会把获取到的当前时间值存储在指针指向的对象中
        // 若传入为NULL,则仅返回当前时间,返回从1970年1月1日0点到目前的秒数
        time_t cur = time(nullptr);
        struct tm curTm;
        // localtime_r是localtime的可重入版本,主要用于将time_t类型表示的时间转换为本地时间,存储在struct tm 结构体中
        localtime_r(&cur, &curTm);
        char timeBuffer[128];
        snprintf(timeBuffer, sizeof(timeBuffer), "%4d-%02d-%02d %02d:%02d:%02d",
                 curTm.tm_year + 1900,
                 curTm.tm_mon + 1,
                 curTm.tm_mday,
                 curTm.tm_hour,
                 curTm.tm_min,
                 curTm.tm_sec);
        return timeBuffer;
    }

    // 2.4 日志形成并刷新
    class Logger
    {
    public:
        // 默认刷新到显示器上
        Logger()
        {
            EnableConsoleLogStrategy();
        }
        ~Logger()
        {
        }
        void EnableConsoleLogStrategy()
        {
            // std::make_unique用于创建并返回一个std::unique_ptr对象
            _fflushStrategy = std::make_unique<ConsoleLogStrategy>();
        }

        void EnableFileLogStrategy()
        {
            _fflushStrategy = std::make_unique<FileLogStrategy>();
        }

        //  内部类默认是外部类的友元类,可以访问外部类的私有成员变量
        //  内部类LogMessage,表示一条日志信息的类
        class LogMessage
        {
        public:
            LogMessage(LogLevel &level, std::string &srcName, int lineNum, Logger &logger)
                : _curTime(GetCurTime()),
                  _level(level),
                  _pid(getpid()),
                  _srcName(srcName),
                  _lineNum(lineNum),
                  _logger(logger)
            {
                // 日志的基本信息合并起来
                // std::stringstream用于在内存中进行字符串的输入输出操作, 提供一种方便的方式处理字符串
                // 将不同类型的数据转换为字符串,也可以将字符串解析为不同类型的数据
                std::stringstream ss;
                ss << "[" << _curTime << "] "
                   << "[" << Level2Str(_level) << "] "
                   << "[" << _pid << "] "
                   << "[" << _srcName << "] "
                   << "[" << _lineNum << "] "
                   << "- ";

                _logInfo = ss.str();
            }

            ~LogMessage()
            {
                if (_logger._fflushStrategy)
                {
                    _logger._fflushStrategy->SyncLog(_logInfo);
                }
            }

            //  使用模板重载运算符<< -- 支持不同数据类型的输出运算符重载
            template <typename T>
            LogMessage &operator<<(const T &info)
            {
                std::stringstream ss;
                ss << info;
                _logInfo += ss.str();
                return *this;
            }

        private:
            std::string _curTime; // 日志时间
            LogLevel _level;      // 日志等级
            pid_t _pid;           // 进程pid
            std::string _srcName; // 输出日志的文件名
            int _lineNum;         // 输出日志的行号
            std::string _logInfo; // 完整日志内容
            Logger &_logger;      // 方便使用策略进行刷新
        };
        // 使用宏进行替换之后调用的形式如下
        // logger(level, __FILE__, __LINE__) << "hello world" << 3.14;
        // 这里使用仿函数的形式,调用LogMessage的构造函数,构造一个匿名的LogMessage对象
        // 返回的LogMessage对象是一个临时对象,它的生命周期从创建开始到包含它的完整表达式结束(可以简单理解为包含
        // 这个对象的该行代码)
        // 代码调用结束的时候,如果没有LogMessage对象进行临时对象的接收,则会调用析构函数,
        // 如果有LogMessage对象进行临时对象的接收,会调用拷贝构造或者移动构造构造一个对象,并析构临时对象
        // 所以通过临时变量调用析构函数进行日志的打印
        LogMessage operator()(LogLevel level, std::string name, int line)
        {
            return LogMessage(level, name, line, *this);
        }

    private:
        std::unique_ptr<LogStrategy> _fflushStrategy;
    };

     //  定义一个全局的Logger对象
    Logger logger;
 
    // 使用宏定义,简化用户操作并且获取文件名和行号
    #define LOG(level) logger(level, __FILE__, __LINE__) // 使用仿函数的方式进行调用
    #define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
    #define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}

 显示器打印如下:

文件打印如下:


二.线程池设计

线程池:

一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。

线程池的种类

  1. 创建固定数量线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务接口
  2. 浮动线程池,其他同上

此处,我们选择固定线程个数的线程池。

这里模拟创建一个固定线程数量的线程池,循环从任务队列中获取任务对象,获取到任务对象后,执行任务对象中的任务。下列线程,日志,条件变量,互斥量等的封装参考前面的博客

//ThreadPool.hpp

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"

namespace ThreadPoolModule
{
    using namespace ThreadModule;
    using namespace LogModule;
    using namespace CondModule;
    using namespace MutexModule;

    static const int gnum = 5;
    template <typename T>
    class ThreadPool
    {
    private:
        void WakeUpAllThread()
        {
            if (_sleep_num)
                _cond.Broadcast();

            LOG(LogLevel::DEBUG) << "唤醒所有休眠线程";
        }

        void WakeOne()
        {
            _cond.Signal();
            LOG(LogLevel::INFO) << "唤醒一个休眠的线程";
        }

    public:
        ThreadPool(int num = gnum)
            : _num(num), _isrunning(false), _sleep_num(0)
        {
            for (int i = 0; i < _num; i++)
            {
                _threads.emplace_back([this]()
                                      { HandlerTask(); }); // 调用线程的构造函数,线程的构造函数形参是一个回调函数
            }
        }
        ~ThreadPool() {}

        void HandlerTask()
        {
            char name[64];
            pthread_getname_np(pthread_self(), name, sizeof(name));
            while (true)
            {
                T t;
                // 处理任务
                {
                    LockGuard LockGuard(_mutex);
                    // 1. 队列为空,线程池没有退出,进行休眠
                    while (_taskq.empty() && _isrunning)
                    {
                        _sleep_num++;
                        LOG(LogLevel::INFO) << name << " 进入休眠";
                        _cond.Wait(_mutex);
                        _sleep_num--;
                    }
                    // 2. 任务为空,线程池退出,则该线程退出
                    if (!_isrunning && _taskq.empty())
                    {
                        LOG(LogLevel::INFO) << name << " 退出, 因为线程池退出&&任务队列为空";
                        break;
                    }

                    // 3. 获取任务
                    t = _taskq.front();
                    _taskq.pop();
                }
                // 4. 处理任务
                t();
            }
        }

        bool Enqueue(const T &in)
        {
            if (_isrunning) // 如果线程池停止,则停止入任务
            {
                LockGuard lockGuard(_mutex);
                _taskq.push(in);
                if (_threads.size() == _sleep_num)
                {
                    WakeOne(); // 如果全部线程都在休眠,则唤醒一个线程
                    return true;
                }
            }
            return false;
        }

        void Start()
        {
            if (_isrunning)
                return;
            _isrunning = true;
            for (auto &thread : _threads)
            {
                thread.Start();
            }
        }

        void Stop()
        {
            // 1. 将运行标志位置为false
            LockGuard lockGuard(_mutex);
            if (!_isrunning)
                return;
            _isrunning = false;

            // 2. 唤醒休眠的线程,然后再HandlerTask中进行退出
            WakeUpAllThread();
        }

        void Join()
        {
            for (auto &thread : _threads)
            {
                thread.Join();
                LOG(LogLevel::INFO) << thread.GetName() << " 被Join";
            }
        }

    private:
        std::vector<Thread> _threads;
        int _num;
        std::queue<T> _taskq;
        Cond _cond;
        Mutex _mutex;
        bool _isrunning;
        int _sleep_num;
    };
}
//Task.hpp
 
#pragma once
 
#include <iostream>
#include <functional>
#include <unistd.h>
#include "Log.hpp"
 
using namespace LogModule;
using task_t = std::function<void()>;
 
void Download()
{
    LOG(LogLevel::DEBUG) << "I am a download task...";
}
// Main.cc
#include "ThreadPool.hpp"
#include "Task.hpp"
 
using namespace LogModule;
using namespace ThreadPoolModule;
int main()
{
    Enable_Console_Log_Strategy();
    ThreadPool<task_t> *tp = new ThreadPool<task_t>();
    tp->Start();
    sleep(2);
    int count = 10;
    while (count)
    {
        tp->Enqueue(Download);
        sleep(1);
        count--;
    }
    tp->Stop();
    tp->Join();
    return 0;
}

在主程序中,主线程循环向任务队列中放入下载任务,然后唤醒线程池中休眠的线程进行任务处理。在处理完之后停止线程池,然后对线程池中的线程进行回收。

这里因为任务处理的过程时间很短,所以在下一个任务进入的时候,上一个处理任务的线程处理完任务就进行了休眠,导致打印出来的日志是唤醒一个线程,处理一个任务。当处理任务的时间消耗较长时,线程进入休眠的概率就会下降。 


三.线程安全的单例模式

一).单例模式概念

单例模式确保一个类只有一个实例存在,同时提供一个全局访问点,让其他代码可以方便地获取到这个唯一实例。在很多情况下,我们只需要一个实例来协调系统中的各项操作,比如配置文件管理、数据库连接池、日志记录器等,使用单例模式可以避免多个实例带来的资源浪费和数据不一致问题例如:一个男人只能有一个媳妇。

二).设计原理

  1. 私有构造函数:单例类的构造函数被设置为私有,防止了外部代码通过 new 关键字来创建该类的实例。
  2. 静态实例变量:在单例类内部定义一个静态的实例变量,用于保存该类的唯一实例。
  3. 静态访问方法:提供一个静态的公共方法,用于获取该类的唯一实例。在这个方法中,会检查实例是否已经创建,如果未创建则创建一个新实例,若已创建则直接返回该实例。

三).饿汉实方式和懒汉实现方式

洗碗的子:

吃完饭, 立刻洗碗, 这种就是饿方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭.。

吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式

1.饿汉方式实现单例模式

饿汉式单例在类加载时就创建了实例,因此它是线程安全的。但是如果这个实例在程序运行中没有进行使用,就会造成资源的浪费

template <typename T>
class Singleton
{
    static T data;

public:
    static T *GetInstance()
    {
        return &data;
    }
};

只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例。

2.懒汉方式实现单例模式

懒汉式单例在第一次调用静态访问方法时才创建实例,避免了不必要的资源浪费。不过在多线程环境下,这种实现方式不是线程安全的,可能会创建多个实例。需要通过使用互斥量确保在多线程环境下只会创建一个实例懒汉方式最核心的思想是“延时加载”,从而能够优化服务器的启动速度。

template <typename T>
class Singleton
{
    static T *inst;

public:
    static T *GetInstance()
    {
        if (inst == NULL)
        {
            inst = new T();
        }       
            return inst;
    }
};

但存在一个严重的问题, 线程不安全。

第一次调用GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例. 但是后续再次调用, 就没有问题了。

// 懒汉模式, 线程安全
template <typename T>
class Singleton
{
    volatile static T *inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
    static std::mutex lock;

public:
    static T *GetInstance()
    {
        if (inst == NULL)
        {                // 双重判定空指针, 降低锁冲突的概率, 提⾼性能.
            lock.lock(); // 使⽤互斥锁, 保证多线程情况下也只调⽤⼀次 new.
            if (inst == NULL)
            {
                inst = new T();
            }          
                lock.unlock();
        }     
            return inst;
    }
};

注意事项:

  1. 加锁解锁的位置
  2. 双重 if 判定, 避免不必要的锁竞争
  3. volatile关键字防止过度优化

3.单例模式线程池

下列的实现增加了一个 _stop_flag 标记位,在调用 Stop() 时置为1,防止后续在调用其他函数获取单例的时候,调用 Start() 函数重新将 _isrunning 标记位置为 true。在 Start() 中对_stop_flag 标记位进行判断,如果为真,则再次获取单例的时候,不在将 _isrunning 标记位置为true.

// 饿汉式单例模式线程池
#pragma once
 
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"
 
namespace ThreadPoolModule
{
    using namespace ThreadModule;
    using namespace LogModule;
    using namespace CondModule;
    using namespace MutexModule;
 
    static const int gnum = 5;  // 使用全局变量来表示一个线程池默认的线程数量
 
    template <typename T>   // 使用模版的方式使线程池支持多类型的任务
    class ThreadPool
    {
    private:
        void WakeUpAllThread()
        {
            if (_sleep_num)
                _cond.Broadcast();
 
            LOG(LogLevel::DEBUG) << "唤醒所有休眠线程";
        }
 
        void WakeOne()
        {
            _cond.Signal();
            LOG(LogLevel::INFO) << "唤醒一个休眠的线程";
        }
 
        // 私有化构造函数
        ThreadPool(int num = gnum) 
            : _num(num),
              _isrunning(false),
              _sleep_num(0),
              _stop_flag(false)
        {
            for (int i = 0; i < _num; i++)
            {
                _threads.emplace_back([this]()
                                      { HandlerTask(); }); // 调用线程的构造函数,线程的构造函数形参是一个回调函数
            }
        }
 
        void Start()
        {
            if (_isrunning)
                return;
            if (_stop_flag)
                return;
            _isrunning = true;
            for (auto &thread : _threads)
            {
                thread.Start();
            }
        }
 
        // 禁用拷贝构造和赋值运算符
        ThreadPool(const ThreadPool<T> &) = delete;
        ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
 
    public:
        static ThreadPool<T> *GetInstance()
        {
            inc.Start();
            return &inc;
        }
 
        void HandlerTask()
        {
            char name[64];
            pthread_getname_np(pthread_self(), name, sizeof(name));
            while (true)
            {
                T t;
                // 处理任务
                {
                    LockGuard lockGuard(_mutex);
                    // 1. 队列为空,线程池没有退出,进行休眠
                    while (_taskq.empty() && _isrunning)
                    {
                        _sleep_num++;
                        LOG(LogLevel::INFO) << name << " 进入休眠";
                        _cond.Wait(_mutex);
                        _sleep_num--;
                    }
                    // 2. 任务为空,线程池退出,则该线程退出
                    if (!_isrunning && _taskq.empty())
                    {
                        LOG(LogLevel::INFO) << name << " 退出, 因为线程池退出&&任务队列为空";
                        break;
                    }
 
                    // 3. 获取任务
                    t = _taskq.front();
                    _taskq.pop();
                }
                t(); // 4. 处理任务
                // LOG(LogLevel::DEBUG) << name << " is running";
            }
        }
 
        bool Enqueue(const T &in)
        {
            if (_isrunning) // 如果线程池停止,则停止入任务
            {
                LockGuard lockGuard(_mutex);
                _taskq.push(in);
                if (_threads.size() == _sleep_num)  // 如果全部线程都在休眠,则唤醒一个线程
                    WakeOne();
                return true;
            }
            return false;
        }
 
        void Stop()
        {
            // 1. 将运行标志位置为false
            LockGuard lockGuard(_mutex);
            if (!_isrunning)
                return;
            _isrunning = false;
            _stop_flag = true;
 
            // 2. 唤醒休眠的线程,然后再HandlerTask中进行退出
            WakeUpAllThread();
        }
 
        void Join()
        {
            for (auto &thread : _threads)
            {
                thread.Join();
                LOG(LogLevel::INFO) << thread.GetName() << " 被Join";
            }
        }
 
        ~ThreadPool()
        {
        }
 
    private:
        std::vector<Thread> _threads;
        int _num; // 线程数量
        std::queue<T> _taskq;   // 任务队列
        Cond _cond;
        Mutex _mutex;
        bool _isrunning;
        int _sleep_num;
        int _stop_flag;
 
        static ThreadPool<T> inc;  // 单例
    };
 
    template<typename T>
    ThreadPool<T> ThreadPool<T>::inc;    // 静态成员变量需要在类外进行初始化
}

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐