线程池实现

线程池源码可在采用C++11实现的线程池功能资源-CSDN下载

      目的使用C++ 11 实现简易的线程池功能。在这之前需要了解下C++ 信号量知识,信号量 std::condition_variable 是实现线程同步的方法。其相当于线程的“闹钟 + 休息室”。可避免循环浪费CPU资源,实现线程间精准等待/环形,同时可用于生产者-消费者模型:

  • 一个线程往队列放数据(生产者)
  • 一个线程从队列取数据(消费者)
  • 没数据时,消费者不能死循环疯狂检查,要睡觉等通知
  • 有数据时,生产者叫醒消费者

std::condition_variable实现通知线程,有如下方法可实现:

notify_one:实现通知一个线程

#include <mutex>
#include <vector>
#include <iostream>
#include <thread>

std::mutex m_mutex;
std::condition_variable my_cond;
std::vector<int> m_msgRecvQueue;



void outMsgRecvQueue() {
  int command = 0;
  while (true)
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    my_cond.wait(lock, [&]() {    
      if (!m_msgRecvQueue.empty())//解决虚假唤醒问题
        return true;
      return false;
      });
    command = m_msgRecvQueue.back();
    m_msgRecvQueue.pop_back();
    lock.unlock();
    std::cout << "outMsgRecvQueue()执行,取出一个元素 " << command<<"=====" << std::endl;
  }
}

void inMsgRecvQueue() {
  for (int i = 0; i < 10; i++)
  {
    std::cout << "inMsgRecvQueue()执行,插入一个元素 " << i << "=====" << std::endl;
    std::unique_lock<std::mutex> lock(m_mutex);
    m_msgRecvQueue.emplace_back(i);
    my_cond.notify_one();
  }
}

int main() {

  std::thread t1(outMsgRecvQueue);
  std::thread t2(inMsgRecvQueue);
  std::thread t3(outMsgRecvQueue);
  std::thread t4(inMsgRecvQueue);
  t1.join();
  t2.join();
  t3.join();
  t4.join();

  return 0;
}

上述代码中实现的是:

1、线程启动后outMsgRecvQueue函数执行,此时进入while循环,while循环内部先加锁,接着执行到my_cond.wait,这个根据wait有不同的参数存在不同的执行方法:

  • 如果wait第二个参数的lambda表达式返回的是true,wait直接返回;
  • 如果wait第二个参数的lambda表达式返回的是false,那么wait将解锁互斥量并堵塞这行,堵塞的截止由另外一个线程调用notify_one()通知为止;
  • 如果wait()不用第二个参数,那跟第二个参数为lambda表达式并且返回false效果一样(等待另外一个线程调用notify_one()通知结束堵塞)。

2、inMsgRecvQueue执行往m_msgRecvQueue集合添加数据,为了线程安全,添加了互斥量,在添加完了元素后, 执行 my_cond.notify_one() 唤醒沉睡的线程。

notify_all:实现通知所有等待线程

demo与notify_once一样,只是在inMsgRecvQueue函数中,把my_cond.notify_one()改成my_cond.notify_all();即可。他的执行其实与notify_once一样,每次也只是一个线程获取得到锁。

std::packaged_task:

把函数/可调用对象打包成任务,自动分配好std::future,通过返回的future得到任务的结果。

int mythread(int i){};
std::packaged_task<int(int)> mypt(mythread);
std::thread t1(std::ref(mypt),1);
t1.join();
std::future<int> result = mypt.get_future();
std::cout<<result.get()<<std::endl;

std::unique_lock

unique_lock是一个类模板,它的功能与lock_guard类似,但是比 lock_guard 更 灵 活 。他有unlock方法,意味着可以随时解锁。降低锁的颗粒度。

atomic

为原子操作,是个模板类,支持基本数据类型,实现无锁操作的同时实现线程同步。

std::enable_if<条件,返回值类型>

std::enable_if目的是控制函数是否生效,实现模板函数重载。其原理是若条件为true,那么生成合法的类型,否则,直接让这个函数不参与编译。在实现的线程池中,使用了std::enable_if,目的是根据传入函数不同类型,让编译器选择不同的类型的函数,具体实现代码如下:

    //有参数且有std::future返回值走这个方法
    template<typename F, typename... Arg>
    typename std::enable_if<(sizeof...(Arg) > 0), std::future<decltype(declval<F>()(declval<Arg>()...))>>::type
      Commit(F&& f, Arg&&...args)
    {
      if (!m_run)
        throw runtime_error("commit on ThreadPool is stopped.");
      using RetType = decltype(f(args...));
      auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<F>(f), std::forward<Arg>(args)...)
        );

      std::future<RetType> future = task->get_future();
      std::lock_guard<std::mutex> lock(m_mutex);
      m_taskList.emplace([task]() {
        (*task)();
        });
      m_task_cv.notify_one();
      return future;
    };
    //无参且有std::future返回值走这个方法
    template<typename F, typename... Arg>
    typename std::enable_if<(sizeof...(Arg) <= 0)&&!(std::is_void<decltype(std::declval<F>()())>::value),
      std::future<decltype(declval<F>()(declval<Arg>()...))>>::type
      Commit( F&& f, Arg&&...args)
    {
      if (!m_run)
        throw runtime_error("commit on ThreadPool is stopped.");
      using RetType = decltype(f());
      auto task = std::make_shared<
        std::packaged_task<decltype(f())()>
      >(std::forward<F>(f));
      std::future<RetType> future = task->get_future();
      std::lock_guard<std::mutex> lock(m_mutex);
      m_taskList.emplace([task]() {
        (*task)();
        });
      m_task_cv.notify_one();
      return future;
    }
    ////无参且无返回值走这个方法
    template<typename F, typename... Arg>
    typename std::enable_if<
      std::is_void<decltype(std::declval<F>()())>::value
    >::type
      Commit( F&& f, Arg&&... args) {
      if (!m_run)
        throw runtime_error("commit on ThreadPool is stopped.");
      auto task = std::make_shared<packaged_task<void()>>(std::bind(std::forward<F>(f), std::forward<Arg>(args)...));
      m_taskList.emplace(task);
      m_task_cv.notify_one();
    }

有了上述基础后,我们来实现线程池就相对简单了。在这里先阐述下线程池实现原理:

首先创建ThreadPool类,在类中设置默认最大线程池大小,防止线程数量过大,导致系统资源开发过渡导致线程池效率降低问题。

#define THREADPOOL_MAX_NUM 16//设置默认最大线程池大小

接着创建构造函数,采用的是有参数构造函数,设置默认线程个数4个,同时初始化m_avilable_thread_num。接着在构造函数内部调用AddThread方法,实现线程池的初始化。接下来,详细介绍AddThread方法的实现。

  class ThreadPool {
    private:
    unsigned int m_size;
    std::mutex m_mutex;
    std::vector<std::thread> m_pool;//线程池
    using Task = std::function<void()>;
    std::queue<Task> m_taskList;//存储任务
    atomic<bool> m_run{ true };//通知线程停止
    std::condition_variable m_task_cv;//信号量
    atomic<int> m_avilable_thread_num;//可用线程数量
  public:
    int GetAvilableThreadNum() const {
      return m_avilable_thread_num;
    };
  public:
    ThreadPool(unsigned int size = 4) :m_size(size), m_avilable_thread_num(0) {
      AddThread(size);
    };
    ~ThreadPool() {
      m_run = false;
      m_task_cv.notify_all(); 
      for (thread& thread : m_pool) {
        if (thread.joinable())
          thread.join(); 
      }
    }

AddThread实现如下:

void AddThread(unsigned int size) {
      for (; size > 0&&(m_pool.size() < THREADPOOL_MAX_NUM); size--) {
        m_pool.emplace_back([&]() {
          while (true)
          {
            std::unique_lock<std::mutex> lock(m_mutex);
            Task task;
            {
              m_task_cv.wait(lock, [&] {
                return !m_run || !m_taskList.empty();
                });
              if (!m_run || m_taskList.empty())
                break;
              m_avilable_thread_num--;
               task = std::move(m_taskList.front());
              m_taskList.pop();
            }
            lock.unlock();
            task();
            m_avilable_thread_num++;
          }
          });
          m_avilable_thread_num++;//创建线程的时候,任务为空,此时可用线程数量自增
      }
    };

工作原理:其入口参数为线程数量,方法体是为了实现创建线程,同时在存在任务的情况下,从m_taskList任务列表里面提取任务,执行任务。

详解:

1、for (; size > 0&&(m_pool.size() < THREADPOOL_MAX_NUM); size--) 根据传入的线程数量循环创建线程,同时限制线程数量,保证线程数量不超过THREADPOOL_MAX_NUM。m_pool字段,其声明类型为 std::vector<std::thread>,其emplace_back为将创建的线程句柄存入m_pool内,同时创建的线程交由cpu选择时机执行。

2、while循环内部,首先通过std::unique_lock<std::mutex> lock(m_mutex);上锁,当获取到锁后执行后续代码,否则将在此等待,直至获取到锁,目的是实现后从m_taskList提取任务,防止线程冲突。

3、m_task_cv.wait有两个参数,传入一个lock,第二个参数为lambda表达式,当lambda表达式返回true时,同时获得锁后,程序立马返回,但若lambda返回false,则m_task_cv将释放锁,等待其他线程触发notify_one/notify_all。

4、 if (!m_run || m_taskList.empty()) ,这部分代码目的是为了若任务列表为空或者线程池被回收的时候,能从打断while循环,逐步释放线程。

5、 lock.unlock(); 这部分代码目的是为了解锁线程,使 task();能实现多线程并发,也就是不受线程同步影响。

Commit方法详解:

    template<typename F, typename... Arg>
    typename std::enable_if<(sizeof...(Arg) > 0), 
    std::future<decltype(declval<F>()(declval<Arg>()...))>>::type
      Commit(F&& f, Arg&&...args)//有参数且有std::future返回值走这个方法
    {
      if (!m_run)
        throw runtime_error("commit on ThreadPool is stopped.");
      using RetType = decltype(f(args...));
      auto task = std::make_shared<std::packaged_task<RetType()>>(
        std::bind(std::forward<F>(f), std::forward<Arg>(args)...)
        );

      std::future<RetType> future = task->get_future();
      std::lock_guard<std::mutex> lock(m_mutex);
      m_taskList.emplace([task]() {
        (*task)();
        });
      m_task_cv.notify_one();
      return future;
    };
    
    template<typename F, typename... Arg>
    typename std::enable_if<(sizeof...(Arg) <= 0)&&!(std::is_void<decltype(std::declval<F>()())>::value),
      std::future<decltype(declval<F>()(declval<Arg>()...))>>::type
      Commit( F&& f, Arg&&...args)//无参且有std::future返回值走这个方法
    {
      if (!m_run)
        throw runtime_error("commit on ThreadPool is stopped.");
      using RetType = decltype(f());
      auto task = std::make_shared<
        std::packaged_task<decltype(f())()>
      >(std::forward<F>(f));
      std::future<RetType> future = task->get_future();
      std::lock_guard<std::mutex> lock(m_mutex);
      m_taskList.emplace([task]() {
        (*task)();
        });
      m_task_cv.notify_one();
      return future;
    }
    
    template<typename F, typename... Arg>
    typename std::enable_if<
      std::is_void<decltype(std::declval<F>()())>::value
    >::type////无参且无返回值走这个方法
      Commit( F&& f, Arg&&... args) {
      if (!m_run)
        throw runtime_error("commit on ThreadPool is stopped.");
      auto task = std::make_shared<packaged_task<void()>>(std::bind(std::forward<F>(f), std::forward<Arg>(args)...));
      m_taskList.emplace(task);
      m_task_cv.notify_one();
    }

上述代码为实现模板重载,当传入任务存在有参数且有std::future返回值时,编译器将选择方法1;当传入任务存在无参且有std::future返回值时,编译器将选择方法2;若传入任务无参且无返回值,编译器选择方法3。上述代码大同小异,就采用方法1的代码段详细介绍:

template<typename F, typename... Arg>
    typename std::enable_if<(sizeof...(Arg) > 0), 
    std::future<decltype(declval<F>()(declval<Arg>()...))>>::type
      Commit(F&& f, Arg&&...args)

1、首先针对std::enable_if部分代码介绍。std::enable_if<条件,返回值类型>,条件是(sizeof...(Arg) > 0),其表达的意思是获取参数的数量,若大于0,则编译器选择这个方法,同时返回值类型为 std::future<decltype(declval<F>()(declval<Arg>()...))>,decltype为获取f(args…)方法的返回值类型。std::declval<T>(),用于在编译期间创建T类型对象,但实际不会真的创建。declval<F>()为创建F函数对象,declval<Arg>()...为创建参数包对象,declval<F>()(declval<Arg>()...)相当于执行了f(args…)函数,decltype(declval<F>()(declval<Arg>()...))>则为获取执行的函数的返回值类型。

2、函数体内部, auto task = std::make_shared<std::packaged_task<RetType()>>( std::bind(std::forward<F>(f), std::forward<Arg>(args)...) ); 为包装一个函数体。

接下来,在main函数中测试线程池代码:

#include "ThreadPool.h"
#include <iostream>
void fun1(int slp)
{
  printf("  hello, fun1 !  %d\n", std::this_thread::get_id());
  if (slp > 0) {
    printf(" ======= fun1 sleep %d  =========  %d\n", slp, std::this_thread::get_id());
    std::this_thread::sleep_for(std::chrono::milliseconds(slp));
    //Sleep(slp );
  }
}

struct gfun {
  int operator()(int n) {
    printf("%d  hello, gfun !  %d\n", n, std::this_thread::get_id());
    return 42;
  }
};

class A {    
public:
  static int Afun(int n = 0) {
    std::cout << n << "  hello, Afun !  " << std::this_thread::get_id() << std::endl;
    return n;
  }

  static std::string Bfun(int n, std::string str, char c) {
    std::cout << n << "  hello, Bfun !  " << str.c_str() << "  " << (int)c << "  " << std::this_thread::get_id() << std::endl;
    return str;
  }
};

int main()
try {
  std::ThreadPool executor{ 50 };
  A a;
  std::future<void> ff = executor.Commit(fun1, 0);
  std::future<int> gg = executor.Commit(a.Afun, 9999); 
  std::future<std::string> gh = executor.Commit(A::Bfun, 9998, "mult args", 123);
  std::future<std::string> fh = executor.Commit(
    []()->
    std::string { 
    std::cout << "hello, fh !  " << std::this_thread::get_id() << std::endl;
    return "hello,fh ret !"; 
    });

  std::cout << " =======  sleep ========= " << std::this_thread::get_id() << std::endl;
  std::this_thread::sleep_for(std::chrono::microseconds(900));

  for (int i = 0; i < 50; i++) {
    executor.Commit(fun1, i * 100);
  }
  std::cout << " =======  commit all ========= " << std::this_thread::get_id() << " idlsize=" << executor.GetAvilableThreadNum() << std::endl;

  std::cout << " =======  sleep ========= " << std::this_thread::get_id() << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(3));


  std::cout << " =======  sleep ========= " << std::this_thread::get_id() << std::endl;
  std::this_thread::sleep_for(std::chrono::seconds(3));

  std::cout << " =======  fun1,55 ========= " << std::this_thread::get_id() << std::endl;
  executor.Commit(fun1, 55).get();    //调用.get()获取返回值会等待线程执行完

  std::cout << "end... " << std::this_thread::get_id() << std::endl;


  std::ThreadPool pool(4);
  std::vector< std::future<int> > results;

  for (int i = 0; i < 8; ++i) {
    results.emplace_back(
      pool.Commit([i] {
        std::cout << "hello " << i << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::cout << "world " << i << std::endl;
        return i * i;
        })
    );
  }
  std::cout << " =======  commit all2 ========= " << std::this_thread::get_id() << std::endl;

  for (auto&& result : results)
    std::cout << result.get() << ' ';
  std::cout << std::endl;
  return 0;
}
catch (std::exception& e) {
  std::cout << "some unhappy happened...  " << std::this_thread::get_id() << e.what() << std::endl;
}

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐