一、固定式线程池的概念

固定式线程池是指在创建时就确定好线程数量的线程池实现。池内维护一组预先创建好的工作线程,所有提交的任务不会立刻执行,而是放入一个任务队列中,由这些固定数量的线程依次取出并执行。

特点:

  • 线程数量固定
  • 任务队列通常是有界的,防止内存无线膨胀
  • 线程复用:任务执行完后,线程不会销毁,而是继续等待下一个任务
  • 生产者-消费者模式:调用方(生产者)提交任务,工作线程(消费者)执行任务

与“动态线程池”(CachedThreadPool)不同,固定式线程池不会根据任务量动态增减线程,更适合 CPU 密集型场景或需要严格控制并发度的系统。

二、为什么会设计出固定式线程池

直接在每次任务到来时 new std::thread 的做法有严重缺陷:

  • 创建/销毁线程开销大:每次创建线程都需要内核分配栈空间、上下文切换,销毁同样耗时。在高并发场景下,频繁创建线程会导致“线程爆炸”,系统瞬间生成成百上千个线程池,CPU、内存被耗尽。
  • 资源无法控制:操作系统对线程总数有限制(Linux默认几千个),超出后程序直接崩溃。
  • 性能不可预测:线程越多,上下文切换越频繁,整体吞吐量反而下降。

固定式线程池的解决方案是:

  • 提前创建固定数量的线程(通常设为CPU核心数),彻底消除创建/销毁开销。
  • 通过有界任务队列缓冲多余的任务。实现“生产者阻塞”或“队列满等待”,从而天然限流。
  • 资源可预测、可控:程序启动时就能知道最多占用多少线程和内存,适合服务器、游戏服务器、后台任务处理等生产环境。

三、代码设计

同步队列(SyncQueue):

#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>
using namespace std;
const int MaxTaskCount = 200;
template<class T>
class SyncQueue
{
private:
	std::list<T> m_queue;
	mutable std::mutex m_mutex;
	std::condition_variable m_notEmpty;
	std::condition_variable m_notFull;
	int m_maxSize;
	bool m_needStop;
	bool IsFull()const
	{
		bool full = m_queue.size() >= m_maxSize;
		if (full)
		{
			cout << "m_queue 已经满了,需要等待..." << endl;
		}
		return full;
	}
	bool IsEmpty()const
	{
		bool empty = m_queue.empty();
		if (empty)
		{
			cout << "m_queue 已经空了,需要等待..." << endl;
		}
		return empty;
	}
	template<class F>
	void Add(F&& task)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || !IsFull();});
		if (m_needStop)
		{
			return;
		}
		m_queue.push_back(std::forward<F>(task));
		m_notEmpty.notify_one();
	}
public:
	SyncQueue(int maxsize) :m_maxSize(maxsize), m_needStop(false) {}
	~SyncQueue() {}
	void Put(const T& task)
	{
		Add(task);
	}
	void Put(T&& task)
	{
		Add(std::forward<T>(task));
	}
	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || !IsEmpty();});
		if (m_needStop)
		{
			return;
		}
		list = std::move(m_queue);
		m_notFull.notify_one();
	}
	void Take(T& task)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || !IsEmpty();});
		if (m_needStop)
		{
			return;
		}
		task = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}
	void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}
	bool Empty() const
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}
	bool Full() const
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() >= m_maxSize;
	}
	size_t Size() const
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}
	size_t Count() const
	{
		return m_queue.size();
	}
};

同步队列框架讲解:

SyncQueue 是整个线程池的“心脏”,它实现了线程安全的生产者-消费者有界队列。

关键成员:

std::list<T> m_queue;                    // 实际存储任务(list 便于整体移动)
mutable std::mutex m_mutex;              // 保护队列和所有状态
std::condition_variable m_notEmpty;      // 消费者等待“非空”
std::condition_variable m_notFull;       // 生产者等待“非满”
int m_maxSize;                           // 队列上限
bool m_needStop;                         // 优雅停止标志

核心方法:

1.IsFull() / IsEmpty()

在 condition_variable::wait 的 lambda 中被调用。
这些函数在持有 mutex 的情况下被调用(wait 的 predicate 语义保证),所以里面直接访问 m_queue.size() 是安全的。
打印调试信息方便读者观察“队列满/空时阻塞”的行为。

2,Add模板+Put (生产者接口)

template<class F> void Add(F&& task)
{
    std::unique_lock<std::mutex> locker(m_mutex);
    m_notFull.wait(locker, [this]{ return m_needStop || !IsFull(); });
    if (m_needStop) return;
    m_queue.push_back(std::forward<F>(task));
    m_notEmpty.notify_one();   // 只唤醒一个消费者(效率最高)
}

完美转发支持左值/右值,零拷贝。
wait + predicate 防止虚假唤醒和队列已满。
m_needStop 判断让 Stop 后立即返回,避免析构时死锁。

3.Take(消费者接口)

Take(T& task):单任务取出(最常用)。
Take(std::list& list):一次性取出所有任务(批量消费,可扩展)。
同样使用 m_notEmpty.wait + predicate + if(m_needStop) return;。
取出后 m_notFull.notify_one() 唤醒生产者。

4.Stop() —— 优雅停止

void Stop()
{
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        m_needStop = true;
    }
    m_notFull.notify_all();
    m_notEmpty.notify_all();
}

先在锁内设置标志,再在锁外 notify_all,避免通知丢失。
让正在 wait 的生产者和消费者立刻醒来并退出。

为什么这样设计?

双条件变量(notEmpty + notFull)是经典的有界缓冲区实现,能同时支持生产者阻塞和消费者阻塞。
m_needStop + predicate 是线程池优雅关闭的标准技巧,避免析构时线程还在 wait 导致程序 hang。
使用 std::list 而非 std::queue 是因为 Take(list) 可以 std::move 整个队列,性能极高(批量消费时优势明显)。
所有查询方法(Empty/Full/Size)都加锁。

固定式线程池(FixedThreadPool):

#include"SyncQueue.hpp"
#include<functional>
class FixedThreadPool
{
public:
	using Task = std::function<void(void)>;
private:
	std::list<std::shared_ptr<std::thread>> m_threadgroup;
	SyncQueue<Task> m_queue;
	std::atomic_bool m_running;
	std::once_flag m_flag;
	void Start(int numthreads)
	{
		m_running = true;
		for (int i = 0;i < numthreads;++i)
		{
			m_threadgroup.push_back(std::make_shared<std::thread>(&FixedThreadPool::RunInThread, this));
			//m_threadgroup.push_back(std::shared_ptr<std::thread>(new thread(&FixedThreadPool::RunInThread,this)))
		}
	}
	void RunInThread()
	{
		while (m_running)
		{
			Task task;
			m_queue.Take(task);
			if (m_running && task)
			{
				task();
			}
		}
	}
	void StopThreadGroup()
	{
		m_queue.Stop();
		m_running = false;
		for (auto& thread : m_threadgroup)
		{
			if (thread)
			{
				thread->join();
			}
		}
		m_threadgroup.clear();
	}
public:
	FixedThreadPool(int numThreads = std::thread::hardware_concurrency()) :m_queue(MaxTaskCount), m_running(false)
	{
		Start(numThreads);
	}
	~FixedThreadPool()
	{
		Stop();
	}
	void Stop()
	{
		std::call_once(m_flag, [this] {StopThreadGroup();});
	}
	void AddTask(Task&& task)
	{
		m_queue.Put(std::forward<Task>(task));
	}
	void AddTask(const Task& task)
	{
		m_queue.Put(task);
	}
};

固定式线程池框架讲解:

FixedThreadPool 完全建立在 SyncQueue 之上,职责清晰分离:

std::list<std::shared_ptr<std::thread>> m_threadgroup;  // 线程组(必须用 shared_ptr,因为 thread 不可复制)
SyncQueue<Task> m_queue;                                // 任务队列(MaxTaskCount=200)
std::atomic_bool m_running;                             // 原子运行标志
std::once_flag m_flag;                                  // 防止重复 Stop

关键方法:

Start(int numThreads):创建固定数量线程,每个线程跑 RunInThread。

RunInThread()(工作线程主循环):

while (m_running)
{
    Task task;
    m_queue.Take(task);               // 阻塞等待任务
    if (m_running && task)            // 双重检查!
        task();
}

if (m_running && task) 是防止 Stop 后还执行任务的防护。

StopThreadGroup():
m_queue.Stop() → 唤醒所有 Take/Put。
m_running = false。
join() 所有线程。
清空线程组。

Stop() 使用 std::call_once 保证只停止一次(析构和手动 Stop 都安全)。

AddTask 直接调用 m_queue.Put,支持完美转发。

为什么要这样设计:

职责分离:SyncQueue 只管“任务存储+同步”,FixedThreadPool 只管“线程生命周期”,代码清晰、可维护。
默认线程数 = hardware_concurrency():最合理的 CPU 密集型默认值。
shared_ptr + list 是 C++ 中存储动态线程组的标准做法(thread 不可复制、不可移动到 vector 后又 join)。
std::once_flag 解决“析构时重复 Stop”导致的二次 join 崩溃问题。
生产者(AddTask)可能阻塞(队列满),这正是固定线程池的“背压”机制,防止任务无限堆积。

测试代码

#include"FixedThreadPool.hpp"
#include<future>

FixedThreadPool pool(4);

std::mutex g_cout_mutex;

void Add(int a, int b, std::promise<int>& c_promise)
{
    cout << "add begin ..." << endl;
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    int c = a + b;
    c_promise.set_value(c);
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    cout << "add end ..." << endl;
}
void add_a()
{
    std::promise<int> c_promise;
    std::future<int> a_future = c_promise.get_future();
    std::function<void(void)> f = std::bind(Add, 10, 20, std::ref(c_promise));
    pool.AddTask(f);

    {
        std::unique_lock<std::mutex> locker(g_cout_mutex);
        cout << "add_a:" << a_future.get() << endl;
    }
   
}
void add_b()
{
    std::promise<int> c_promise;
    std::future<int> a_future = c_promise.get_future();
    std::function<void(void)> f = std::bind(Add, 20, 30, std::ref(c_promise));
    pool.AddTask(f);
    {
        std::unique_lock<std::mutex> locker(g_cout_mutex);
        cout << "add_b:" << a_future.get() << endl;
    }
    
}
void add_c()
{
    std::promise<int> c_promise;
    std::future<int> a_future = c_promise.get_future();
    std::function<void(void)> f = std::bind(Add, 30, 40, std::ref(c_promise));
    pool.AddTask(f);
    {
        std::unique_lock<std::mutex> locker(g_cout_mutex);
        cout << "add_c:" << a_future.get() << endl;
    }
    
}
int main()
{
    std::thread tha(add_a);
    std::thread thb(add_b);
    std::thread thc(add_c);

    tha.join();
    thb.join();
    thc.join();
    return 0;
}

测试代码的目的:

  • 验证多生产者 + 多消费者并发:3 个线程(tha/thb/thc)同时调用 AddTask,池内 4 个工作线程同时执行。
  • 验证异步执行 + 结果返回:每个任务用 std::promise + std::future 把计算结果(a+b)传回主线程。future.get() 会阻塞调用方,直到工作线程 set_value。
  • 验证任务执行的耗时与交错:Add 函数里 sleep(2000ms) + sleep(1000ms),让 cout 输出 begin/end 明显交错,肉眼可见“任务在后台并行执行”。
  • 验证线程池在析构时的正确停止:main 结束时 ~FixedThreadPool 调用 Stop(),此时所有任务已完成(因为 get() 已返回),不会丢失任务。
  • 验证互斥输出:g_cout_mutex 保护打印 add_a:30 等结果,避免多线程 cout 乱序。
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐