C++11多线程与MySQL连接池详解
·
一、线程基础
1.1 thread默认构造函数
std::thread::thread() _NOEXCEPT {
_Thr_set_null(_Thr);
}
默认构造函数创建一个空线程对象,不关联任何执行线程。
1.2 thread带参数构造函数
explicit thread(Fn &&, Args &&...);
可变参数模板,可以传入不同的参数组合:
std::thread t1(func, arg1, arg2);
std::thread t2(&Class::memberFunc, instance, arg);
1.3 拷贝构造函数禁用
thread(const thread&) = delete; // 拷贝构造函数被禁用
不允许 thread t2 = t1;,线程对象不可拷贝,只能移动。
二、线程操作详解
2.1 基本操作函数
| 函数 | 说明 |
|---|---|
| get_id() | 获取线程ID |
| joinable() | 判断线程是否可join |
| join() | 等待线程结束并回收资源 |
| detach() | 将线程与主线程分离 |
2.2 join与detach的区别
join()特点:
- 调用join()后,线程不会再被调用
- 主线程会阻塞等待该线程结束
- join()完成后,线程资源被回收
std::thread t(func);
t.join(); // 主线程等待t结束
detach()特点:
- 分离后的线程称为后台线程
- 主线程退出时,其他线程可能未运行完就被迫退出
- detach后无法再管理该线程
- detach后不能调用join
std::thread t(func);
t.detach(); // 分离线程
// 之后无法再管理t
2.3 joinable()与move操作
std::thread t1(func);
std::thread t2 = std::move(t1); // t1的资源转移到t2
// 此时 t1.joinable() == false
// 对 t1 调用 join 会抛出异常
注意: 对线程调用move后,原线程对象变为空壳,不能再join。
三、线程封装与继承
3.1 子类继承模式
通过封装thread,实现业务逻辑与线程管理的分离:
class ZERO_Thread {
public:
void start() {
thread_ = std::thread(&ZERO_Thread::threadEntry, this);
}
protected:
virtual void run() = 0; // 子类实现具体业务
private:
void threadEntry() {
running_ = true;
try {
run(); // 调用子类的run函数
} catch (std::exception &ex) {
running_ = false;
throw ex;
} catch (...) {
running_ = false;
throw;
}
running_ = false;
}
std::thread thread_;
bool running_ = false;
};
流程图:
start()
|
v
thread_ = thread(&threadEntry)
|
v
threadEntry() [新线程]
|
v
running_ = true
|
v
try { run(); }
|
v
catch (...) [如有异常]
|
v
running_ = false
四、参数传递注意事项
4.1 传入引用需要ref()
void func(int &x) { x += 10; }
int a = 0;
std::thread t(func, std::ref(a)); // 必须用ref包装
t.join();
4.2 传入类成员函数
class A {
public:
void setName(const string &name) { name_ = name; }
void func4(int a) { /* ... */ }
};
A *a4_ptr = new A();
a4_ptr->setName("darren");
std::thread t4(&A::func4, a4_ptr, 10);
t4.join();
delete a4_ptr;
4.3 类成员函数重载的处理
// 指向void func4(int)的指针
std::thread t41((void(A::*)(int))A::func4, a4_ptr, 100);
// 指向int func4(string)的指针
std::thread t43((int(A::*)(string))A::func4, a4_ptr, "hello");
bind也支持重载:
std::bind((void(A::*)(int))A::func4, a4_ptr, 100);
五、互斥量与锁
5.1 mutex系列
| 类型 | 说明 |
|---|---|
| mutex | 基本互斥量 |
| timed_mutex | 带超时功能的互斥量 |
| recursive_mutex | 递归互斥量(可重入) |
| recursive_timed_mutex | 递归+超时 |
基本用法:
std::mutex mtx;
mtx.lock();
// 临界区代码
mtx.unlock();
if (mtx.try_lock()) { // 尝试获取锁
// ...
mtx.unlock();
}
5.2 lock_guard与unique_lock
lock_guard特点:
- 构造时自动加锁
- 析构时自动解锁
- 不能手动解锁
std::lock_guard<std::mutex> lck(mtx);
// 进入临界区
// 离开作用域自动解锁
unique_lock特点:
- 更灵活,可手动解锁
- 支持延迟加锁
- 支持条件变量
std::unique_lock<std::mutex> lck(mtx);
lck.unlock(); // 手动解锁
// ...
lck.lock(); // 再次加锁
// 支持 scoped_lock 等高级用法
对比表:
| 特性 | lock_guard | unique_lock |
|---|---|---|
| 自动加锁 | 是 | 是 |
| 自动解锁 | 是 | 是 |
| 手动解锁 | 否 | 是 |
| 条件变量 | 否 | 是 |
| 开销 | 较小 | 较大 |
六、条件变量
6.1 wait机制
死等(wait):
std::mutex mtx;
std::condition_variable cv;
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck); // 一直等待,直到被唤醒
超时等待(wait_for):
auto status = cv.wait_for(lck, std::chrono::seconds(5));
if (status == std::cv_status::timeout) {
// 超时退出
} else {
// 被唤醒
}
6.2 唤醒操作
| 函数 | 说明 |
|---|---|
| notify_one | 唤醒一个等待的线程 |
| notify_all | 唤醒所有等待的线程 |
示例:
// 生产者
{
std::lock_guard<std::mutex> lck(mtx);
queue.push(data);
}
cv.notify_one(); // 唤醒一个消费者
// 消费者
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [] { return !queue.empty(); }); // 虚假唤醒保护
auto data = queue.front();
queue.pop();
七、原子变量atomic
7.1 基本用法
#include <atomic>
std::atomic<int> count(0); // 初始化为0
count++; // 原子操作
count.fetch_add(1); // 显示原子加
count.store(10); // 原子写
int val = count.load(); // 原子读
特点:
- 无锁操作,保证原子性
- 适用于计数器、标志位等简单变量
- 比mutex效率更高
八、异步与future
8.1 std::async异步运行
std::future<int> f = std::async(std::launch::async, func, arg);
int result = f.get(); // 阻塞等待结果
8.2 std::packaged_task
将任务与future绑定:
std::packaged_task<int(int, int)> task(add);
std::future<int> f = task.get_future();
std::thread t(std::move(task), 1, 2);
t.join();
int result = f.get();
8.3 std::promise设置值
std::promise<int> p;
std::future<int> f = p.get_future();
p.set_value(42); // 设置值
int result = f.get(); // 获取
三者关系:
| 组件 | 作用 |
|---|---|
| std::async | 异步启动任务 |
| std::packaged_task | 绑定任务和future |
| std::promise | 主动设置值 |
九、function、bind与lambda
9.1 std::function保存函数
#include <functional>
std::function<int(int, int)> func1_;
func1_ = add; // 保存普通函数
func1_ = [](int a, int b) { return a + b; }; // 保存lambda
9.2 std::bind绑定函数
auto f = std::bind(add, 1, std::placeholders::_1);
int result = f(2); // 相当于add(1, 2)
注意:
- 占位符从
_1开始,不是_0 - 可以预先绑定部分参数
9.3 lambda表达式
auto f = [](int a, int b) -> int { return a + b; };
int result = f(1, 2);
十、可变模板参数
template<class... T>
void f(T... args) {
// sizeof...(args) 获取参数个数
}
十一、C++11线程池实现
11.1 核心结构
+------------------+
| ThreadPool |
+------------------+
| threads_: 4个 |
| taskQueue_ |
| mutex_ |
| condition_ |
+------------------+
|
+-------------------+-------------------+
| | |
v v v
+--------+ +--------+ +--------+
|thread1 | |thread2 | |thread3 |
+--------+ +--------+ +--------+
| | |
v v v
从taskQueue_取任务执行
11.2 简单实现
class ThreadPool {
public:
ThreadPool(int numThreads) {
for (int i = 0; i < numThreads; ++i) {
threads_.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lck(mtx_);
condition_.wait(lck, [this] { return !tasks_.empty(); });
task = std::move(tasks_.front());
tasks_.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto exec(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::lock_guard<std::mutex> lck(mtx_);
tasks_.push([task]() { (*task)(); });
}
condition_.notify_one();
return res;
}
~ThreadPool() {
for (auto &t : threads_) {
t.join();
}
}
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex mtx_;
std::condition_variable condition_;
};
11.3 使用示例
ThreadPool pool(4);
// 执行普通函数
auto f1 = pool.exec(add, 1, 2);
// 执行类成员函数
A obj;
auto f2 = pool.exec(&A::func, &obj, 10);
int result = f1.get(); // get是阻塞的
十二、线程睡眠与ID
12.1 睡眠函数
| 函数 | 说明 |
|---|---|
| std::this_thread::sleep_for(duration) | 睡眠指定时长 |
| std::this_thread::sleep_until(time_point) | 睡眠到指定时间 |
| std::this_thread::yield() | 让出CPU时间片 |
12.2 获取线程ID
std::this_thread::get_id(); // 获取当前线程ID
thread.get_id(); // 获取thread对象的线程ID
十三、MySQL连接池
13.1 为什么需要连接池?
直接创建连接的问题:
每次执行SQL都新建连接:
+---------------------------------------------------------------+
| TCP三次握手 -> ~1-3ms |
| MySQL握手初始化 -> ~1-2ms |
| 用户认证 -> ~1-2ms |
| 执行SQL -> ~0.1ms (假设) |
| 四次挥手断开 -> ~1-2ms |
+---------------------------------------------------------------+
| 总计:连接开销5-10ms,实际SQL执行只需要0.1ms |
| 99%的时间浪费在建立连接上! |
+---------------------------------------------------------------+
如果每秒执行1000次SQL:
无连接池:1000 x 10ms = 10秒(实际只能执行100次)
有连接池:1000 x 0.1ms = 100ms(轻松执行1000次)
连接池的核心思想:
- 预先建立一批连接,复用而不是每次新建
- 跳过握手认证,直接执行SQL
- 将串行的"建连-执行-断连"变成"执行-归还"
13.2 连接池整体架构
+-------------------------------------+
| MySQLConnPool |
| (连接池管理器) |
| |
| +-----------------------------+ |
| | 单例模式,按数据库隔离 | |
| | instances_["db_name"] | |
| +-----------------------------+ |
| |
| +-----------------------------+ |
| | 共享任务队列 | |
| | BlockingQueue<Task*> | |
| +-----------------------------+ |
| ^ | |
| | | |
| Push(Task) Pop(Task) |
+-------------------------------------+
|
+---------------------------------+---------------------------------+
| | |
v v v
+-------------+ +-------------+ +-------------+
| MySQLConn1 | | MySQLConn2 | | MySQLConn3 |
| +--------+ | | +--------+ | | +--------+ |
| | Conn | | | | Conn | | | | Conn | |
| | (TCP) | | | | (TCP) | | | | (TCP) | |
| +--------+ | | +--------+ | | +--------+ |
| +--------+ | | +--------+ | | +--------+ |
| | Worker | | | | Worker | | | | Worker | |
| | thread | | | | thread | | | | thread | |
| +--------+ | | +--------+ | | +--------+ |
+-------------+ +-------------+ +-------------+
组件关系:
- 连接池管理器(单例)管理所有连接
- 所有连接共享同一个任务队列
- 每个连接有自己的Worker线程
- Worker从共享队列取任务,用自己的连接执行SQL
13.3 核心组件与职责
| 组件 | 职责 | 关键设计 |
|---|---|---|
| MySQLConnPool | 连接池管理器 | 单例模式,按数据库隔离 |
| MySQLConn | 单个连接封装 | Open/Close/Query |
| MySQLWorker | 工作线程 | 从队列取任务,执行SQL |
| SQLOperation | SQL任务封装 | 包含SQL + Promise |
| BlockingQueue | 任务队列 | 线程安全,阻塞等待 |
| QueryCallback | 回调封装 | 封装Future和回调函数 |
数据流向:
用户调用 内部处理
| |
| Query(sql, callback) |
+------------------------------------>|
| |
| 创建 SQLOperation(sql) |
| 生成 Promise + Future |
| task_queue.Push(op) |
| |
| 返回 QueryCallback(future, cb) |
v v
Worker线程循环:
while (true) {
op = task_queue.Pop() // 阻塞等待
op->Execute(conn) // 执行SQL
promise.set_value(res) // 设置结果
}
主循环检查:
InvokeIfReady() -> cb(res) // 结果就绪调用回调
13.4 为什么用共享队列?
方案A:每连接独立队列
连接1 <-> 队列1
连接2 <-> 队列2
连接3 <-> 队列3
问题:
- 用户需要知道往哪个队列投任务
- 负载不均衡(有的队列任务多,有的少)
- 实现复杂
方案B:共享队列(我们用的)
共享队列 --> 连接1的Worker
--> 连接2的Worker
--> 连接3的Worker
优点:
- 用户只管投任务,不用关心哪个连接执行
- 自动负载均衡(谁空闲谁取任务)
- 实现简单
关键:队列是生产者-消费者之间的桥梁
13.5 Future/Promise异步机制
为什么需要这个?
同步方式(阻塞):
result = conn->Query(sql); // 主线程在这里卡住等待结果
// 不能做其他事
异步方式(非阻塞):
future = conn->AsyncQuery(sql); // 立即返回
// 可以做其他事
// 之后通过 future.get() 获取结果
或者用回调:
conn->Query(sql, [](result) { ... }); // 结果出来自动调用回调
Future/Promise的工作流程:
请求线程 Worker线程
| |
| 创建 Promise |
| 获取 Future |
| |
| 把任务丢到队列 |
| 立即返回 QueryCallback |
| |
| | 从队列取出任务
| | 执行SQL
| | promise.set_value(result)
| |
| InvokeIfReady() 检查 |
| v 是 |
| 调用回调 cb(result) |
| |
为什么要封装成QueryCallback?
- 用户不想阻塞轮询
- 用
InvokeIfReady()检查,结果就绪自动调用回调 - 主循环定期调用
InvokeIfReady()处理所有完成的任务
13.6 连接生命周期管理
初始化阶段:
MySQLConnPool::InitPool(url, pool_size)
|
v
创建 pool_size 个 MySQLConn
|
v
for each conn:
conn->Open() // 建立TCP连接 + MySQL握手认证
conn->Start() // 启动Worker线程
使用阶段:
用户调用 Query(sql, cb)
|
v
创建 SQLOperation
|
v
task_queue.Push(op) // 任务进入共享队列
|
v
Worker从队列取出任务
|
v
执行 op->Execute(conn)
|
v
promise.set_value(result) // 结果通过Future返回
关闭阶段:
MySQLConnPool::~MySQLConnPool()
|
v
task_queue.Cancel() // 唤醒所有Worker,让它们退出
|
v
for each conn:
conn->Stop() // 停止Worker线程
conn->Close() // 关闭MySQL连接
13.7 心跳维持连接
问题:
- MySQL服务器有
wait_timeout超时时间 - 空闲连接超过这个时间会被MySQL服务端断开
解决方案:心跳检测
定期(如每30秒)执行:
SELECT 1
如果失败:
重新建立连接
恢复连接状态
注意:实际生产环境需要实现心跳机制
13.8 连接池与线程池对比
| 对比项 | 线程池 | MySQL连接池 |
|---|---|---|
| 资源类型 | 工作线程 | 数据库连接 |
| 任务来源 | 主线程提交 | 主线程提交 |
| 执行者 | 池内线程 | 连接对应的Worker线程 |
| 阻塞点 | Pop()队列空 | Pop()队列空 |
| 生命周期 | 随进程/池销毁 | 连接可能断,需维护 |
| 核心问题 | 任务分配 | 连接复用 |
两者关系:
线程池处理业务逻辑
|
| 需要数据库
v
调用连接池执行SQL
|
| 从池中借出连接
v
Worker用连接执行SQL
|
| 返回结果
v
释放连接到池中
13.9 单例模式在连接池中的应用
为什么用单例?
场景:一个项目连接多个数据库
错误做法:每个地方都 new MySQLConnPool
|
+-- 地方A: pool_a = new ... -> 连接DB1
+-- 地方B: pool_b = new ... -> 连接DB1(重复)
+-- 地方C: pool_c = new ... -> 连接DB2(重复)
正确做法:单例模式
|
+-- MySQLConnPool::GetInstance("db1") -> 返回同一个实例
+-- MySQLConnPool::GetInstance("db2") -> 返回另一个实例
单例实现思路:
static std::unordered_map<std::string, MySQLConnPool*> instances_;
MySQLConnPool* GetInstance(const std::string& db) {
if (instances_.find(db) == instances_.end()) {
instances_[db] = new MySQLConnPool(db);
}
return instances_[db];
}
好处:
- 按数据库名隔离,一个DB一个池
- 全局唯一访问点
- 避免重复创建
13.10 MySQL C/C++驱动选择
| 驱动 | 语言 | 性能 | 错误处理 | 适用场景 |
|---|---|---|---|---|
| libmysqlclient | 纯C | 最高 | errno | 性能优先、拒绝异常 |
| libmysqlcppconn | C++ | 略低 | try-catch | 开发效率、快速原型 |
选型建议:
- 性能优先 -> C库
- 拒绝异常机制 -> C库
- 开发效率、快速原型 -> C++库
13.11 实际应用场景
场景1:高并发Web服务
请求 -> 线程池处理 -> 连接池执行SQL -> 返回结果
| |
| +-- 复用连接,高并发支撑
|
+-- 线程池管理工作线程,不阻塞主线程
场景2:批量处理任务
批量任务 -> 线程池分发 -> 连接池执行SQL -> 收集结果
| |
| +-- 每个任务复用连接,开销小
|
+-- 任务并行处理,线程池管理
场景3:异步数据库操作
用户请求 -> 立即返回 -> 后台Worker执行SQL -> 回调通知
|
+-- 连接池提供连接
13.12 面试追问FAQ
| 问题 | 答案 |
|---|---|
| 连接池大小怎么定? | 根据并发量、MySQL配置、服务器资源综合考虑,通常几十到几百 |
| 连接泄露怎么办? | 记录连接状态、设置超时检测、析构时强制回收 |
| 为什么要用共享队列? | 负载均衡、用户不需要关心哪个连接执行、实现简单 |
| 同步和异步连接池哪个好? | 初始化用同步(简单快速),业务处理用异步(吞吐高) |
| 连接断了怎么办? | 重连机制、心跳检测、异常状态标记 |
| MySQL驱动选哪个? | 性能优先->C库,拒绝异常->C库,开发效率->C++库 |
| 为什么不直接用连接池? | 连接创建开销大,复用可以节省5-10ms的握手时间 |
总结
C++11多线程核心要点
| 类别 | 要点 |
|---|---|
| thread | 不可拷贝,可移动;join/detach二选一 |
| mutex | lock/unlock/try_lock;RAII用lock_guard/unique_lock |
| 条件变量 | wait/wait_for + notify_one/notify_all |
| atomic | 无锁原子操作,适用于简单类型 |
| async/future | 异步任务与结果获取 |
| packaged_task | 任务与future绑定 |
| promise | 主动设置异步结果 |
| function/bind | 函数包装与绑定 |
| lambda | 匿名函数对象 |
| 线程池 | 生产者-消费者模式,任务队列 |
| MySQL连接池 | 复用连接,共享队列,Future/Promise异步返回 |
MySQL连接池核心要点
| 组件 | 作用 |
|---|---|
| MySQLConnPool | 单例模式,按数据库隔离,管理连接生命周期 |
| MySQLConn | 封装单个连接,Open/Close/Query |
| MySQLWorker | 从共享队列取任务,用连接执行SQL |
| SQLOperation | 封装SQL任务和Promise,用于异步返回结果 |
| BlockingQueue | 线程安全的阻塞队列,生产者-消费者解耦 |
| QueryCallback | 封装Future和回调函数,InvokeIfReady轮询检查 |
根据零声教育教学写作https://github.com/0voice
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)