C++11多线程编程
C++ 多线程编程自 C++11 起被正式引入标准库,极大简化了跨平台并发程序的开发。要全面掌握 C++ 多线程编程,需要理解以下几个核心知识模块:线程生命周期管理、数据同步与互斥、条件变量、异步编程模型、原子操作,以及 C++20 引入的新特性。
以下是详细的知识点解析及对应的代码示例:
1. 线程的创建与管理 (<thread>)
最基础的知识点是如何启动一个线程,以及如何处理它的生命周期。
-
std::thread: 用于创建线程。可以传入函数指针、Lambda 表达式、函数对象或成员函数。 -
join(): 阻塞当前线程,直到目标线程执行完毕。保证线程安全退出。 -
detach(): 将线程与主调线程分离,使其在后台独立运行(成为守护线程)。
#include <iostream>
#include <thread>
#include <chrono>
void worker_function(int id) {
std::cout << "Thread " << id << " is running.\n";
std::this_thread::sleep_for(std::chrono::seconds(1));
}
int main() {
// 1. 使用函数指针创建线程
std::thread t1(worker_function, 1);
// 2. 使用 Lambda 表达式创建线程
std::thread t2([](int id) {
std::cout << "Thread " << id << " is running via Lambda.\n";
}, 2);
// 等待 t1 完成 (阻塞主线程)
if (t1.joinable()) {
t1.join();
}
// 分离 t2,让其在后台独立运行 (主线程结束时不等待 t2)
if (t2.joinable()) {
t2.detach();
}
std::cout << "Main thread finished.\n";
return 0;
}
2. 互斥量与锁机制 (<mutex>)
多线程同时访问共享数据会引发数据竞争(Data Race)。需要使用互斥锁来保证同一时刻只有一个线程访问共享资源。
-
std::mutex: 最基本的互斥锁。直接调用lock()和unlock()容易因异常导致死锁。 -
std::lock_guard: 基于 RAII 机制的锁,构造时加锁,析构时自动解锁,推荐在简单作用域内使用。 -
std::unique_lock: 比lock_guard更灵活的 RAII 锁,支持延迟加锁、提前解锁以及与条件变量配合使用,但性能开销略大。
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
std::mutex mtx;
int shared_counter = 0;
void increment_counter(int iterations) {
for (int i = 0; i < iterations; ++i) {
// 使用 lock_guard 自动管理锁的生命周期
std::lock_guard<std::mutex> lock(mtx);
shared_counter++; // 临界区
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.push_back(std::thread(increment_counter, 1000));
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final counter value: " << shared_counter << " (Expected: 5000)\n";
return 0;
}
3. 防范死锁 (std::scoped_lock / std::lock)
当多个线程需要同时获取多个锁时,如果获取锁的顺序不一致,极易发生死锁。
-
std::scoped_lock(C++17): 可以同时锁定多个互斥量,内部采用死锁避免算法,完全替代了 C++11 的std::lock函数结合std::unique_lock的繁琐写法。
#include <iostream>
#include <thread>
#include <mutex>
std::mutex mtx1, mtx2;
void thread_a() {
// C++17 语法:同时安全地获取两个锁,避免死锁
std::scoped_lock lock(mtx1, mtx2);
std::cout << "Thread A locked both mutexes.\n";
}
void thread_b() {
// 即使顺序不同,scoped_lock 也能保证不发生死锁
std::scoped_lock lock(mtx2, mtx1);
std::cout << "Thread B locked both mutexes.\n";
}
int main() {
std::thread t1(thread_a);
std::thread t2(thread_b);
t1.join();
t2.join();
return 0;
}
4. 条件变量 (<condition_variable>)
用于线程间的同步与通信。一个线程等待某个条件成立,另一个线程在条件满足时唤醒等待的线程。常用于生产者-消费者模型。
-
必须与
std::unique_lock<std::mutex>配合使用。 -
wait(lock, condition): 如果 condition 为 false,则释放锁并阻塞当前线程;被唤醒后重新获取锁并再次检查 condition。 -
notify_one()/notify_all(): 唤醒一个/所有等待该条件变量的线程。
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;
void producer() {
for (int i = 1; i <= 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
std::cout << "Produced: " << i << "\n";
}
cv.notify_one(); // 通知消费者有新数据
}
{
std::lock_guard<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all();
}
void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
// 等待条件:队列非空,或者生产结束
cv.wait(lock, [] { return !data_queue.empty() || finished; });
while (!data_queue.empty()) {
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumed: " << data << "\n";
}
if (finished && data_queue.empty()) {
break;
}
}
}
int main() {
std::thread t1(producer);
std::thread t2(consumer);
t1.join();
t2.join();
return 0;
}
5. 异步编程模型 (<future>)
相比于手动管理线程和锁,异步模型提供了更高层、更安全的并发获取返回结果的方式。
-
std::async: 异步启动一个任务(可能在新线程中,也可能延迟执行),返回一个std::future。 -
std::future: 代表一个异步操作的未来结果。调用get()会阻塞等待直到结果准备就绪。 -
std::promise: 允许在一个线程中设置值,在另一个线程中通过future获取该值,用于底层的线程间数据传递。
#include <iostream>
#include <future>
#include <chrono>
int complex_computation(int x) {
std::this_thread::sleep_for(std::chrono::seconds(2)); // 模拟耗时计算
return x * x;
}
int main() {
std::cout << "Starting async task...\n";
// 异步执行 complex_computation,std::launch::async 强制开启新线程
std::future<int> result_future = std::async(std::launch::async, complex_computation, 10);
std::cout << "Doing other work in main thread...\n";
// 阻塞主线程,直到异步任务完成并获取结果
int result = result_future.get();
std::cout << "Result: " << result << "\n";
return 0;
}
6. 原子操作与无锁编程 (<atomic>)
对于基础数据类型(如 int, bool),使用互斥锁太重了。原子操作保证了单个指令级别的不可分割性,通常基于硬件级的 CAS (Compare-And-Swap) 指令实现无锁并发。
-
std::atomic<T>: 模板类,用于提供线程安全的无锁基本操作。
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
// 使用 std::atomic 替代 int + std::mutex
std::atomic<int> atomic_counter(0);
void increment_atomic(int iterations) {
for (int i = 0; i < iterations; ++i) {
atomic_counter++; // 原子递增,线程安全且无锁
}
}
int main() {
std::vector<std::thread> threads;
for (int i = 0; i < 5; ++i) {
threads.push_back(std::thread(increment_atomic, 1000));
}
for (auto& t : threads) {
t.join();
}
std::cout << "Final atomic counter value: " << atomic_counter << "\n";
return 0;
}
7. C++20 新增并发特性
C++20 进一步丰富了并发库,引入了许多传统操作系统中常见的同步原语。
-
std::jthread:std::thread的升级版,析构时会自动调用request_stop()和join(),避免忘记 join 导致程序崩溃。 -
std::counting_semaphore: 信号量,控制同时访问某一资源的线程数量。 -
std::latch&std::barrier: 用于同步多个线程,等待所有线程到达同一个执行点。
// 需使用支持 C++20 的编译器编译
#include <iostream>
#include <thread>
#include <latch>
// 初始化一个需要等待 3 个线程到达的屏障 (latch 只能使用一次)
std::latch work_done(3);
void worker(int id) {
std::cout << "Worker " << id << " is doing initialization work.\n";
// ... 模拟工作 ...
// 工作完成,计数器减一
work_done.count_down();
std::cout << "Worker " << id << " finished initialization.\n";
}
int main() {
std::jthread t1(worker, 1);
std::jthread t2(worker, 2);
std::jthread t3(worker, 3);
std::cout << "Main thread waiting for all workers to finish initialization...\n";
// 主线程阻塞,直到 latch 的计数器变为 0
work_done.wait();
std::cout << "All workers initialized. Main thread proceeding.\n";
// 使用 jthread 不需要手动 join()
return 0;
}
掌握以上知识点可以应对绝大多数 C++ 业务场景中的并发需求。
线程池 (Thread Pool) 和 内存顺序 (Memory Order)。这两个概念确实是 C++ 并发编程中的“深水区”。为了让这些抽象的概念更直观,我们将结合实际的高性能计算场景——例如 GNSS 信号仿真软件的开发——来详细拆解线程池和内存顺序。
8.线程池 (Thread Pool) 的设计与实现
为什么需要线程池?
在复杂的系统中(比如需要同时对多颗卫星、多个通道的 GNSS 信号进行高频次仿真和解算),如果每处理一个历元 (Epoch) 的数据或每生成一段基带信号就新建一个 std::thread,频繁的线程创建和销毁会导致巨大的操作系统上下文切换开销。
线程池的核心思想是池化技术:预先创建一组固定数量的工作线程,让它们处于阻塞等待状态。当有新任务时,直接唤醒一个空闲线程去执行,执行完毕后线程不销毁,而是继续等待下一个任务。
核心组件:
-
任务队列 (Task Queue):存储待处理的任务,通常需要是线程安全的(结合
std::mutex和std::condition_variable)。 -
工作线程组 (Worker Threads):不断从队列中取出任务并执行。
-
管理机制:负责线程的创建、销毁(优雅退出)以及任务的提交。
应用场景举例:
在信号模拟器中,你可以将 GPS、BDS、Galileo 各个通道的载波发生、伪码生成等独立计算任务打包成函数,扔进线程池中并行处理,最后在主线程中汇总集成。
代码示例(实现一个精简的现代 C++ 线程池):
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
class ThreadPool {
public:
// 构造函数:启动指定数量的工作线程
ThreadPool(size_t threads) : stop(false) {
for(size_t i = 0; i < threads; ++i) {
workers.emplace_back([this] {
while(true) {
std::function<void()> task;
{
// 锁定队列以获取任务
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 等待条件:线程池停止 或 队列中有任务
this->condition.wait(lock, [this] {
return this->stop || !this->tasks.empty();
});
if(this->stop && this->tasks.empty()) {
return; // 线程退出
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task(); // 执行任务
}
});
}
}
// 提交任务的模板函数,返回 std::future 以获取结果
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args) -> std::future<typename std::invoke_result<F, Args...>::type> {
using return_type = typename std::invoke_result<F, Args...>::type;
// 将任务包装成 std::packaged_task
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
if(stop) throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one(); // 唤醒一个工作线程
return res;
}
// 析构函数:优雅关闭所有线程
~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for(std::thread &worker: workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
// --- 测试代码 ---
void simulate_satellite_channel(int sv_id) {
std::cout << "Simulating signal for SV: " << sv_id << " in thread " << std::this_thread::get_id() << "\n";
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 模拟计算耗时
}
int main() {
ThreadPool pool(4); // 创建包含4个工作线程的线程池
std::vector<std::future<void>> results;
// 模拟提交 8 颗卫星的信号生成任务
for(int i = 1; i <= 8; ++i) {
results.emplace_back(pool.enqueue(simulate_satellite_channel, i));
}
// 等待所有任务完成
for(auto && result: results) {
result.get();
}
std::cout << "All channels simulated.\n";
return 0;
}
9. 原子操作与内存顺序 (Memory Order)
std::atomic 不仅仅是为了提供无锁的 ++ 或 -- 操作,它的核心在于控制内存可见性和指令重排。
为什么需要内存顺序?
编译器在编译阶段、CPU 在执行阶段,为了极致的性能优化,常常会打乱代码的执行顺序(指令重排)。在单线程下这没有问题,但在多线程下,如果线程 B 依赖线程 A 的某个变量更新,指令重排会导致致命的逻辑错误。
C++ 提供了 6 种内存顺序(定义在 std::memory_order 中),按约束从弱到强主要分为三类:
1) 自由序 (Relaxed Ordering)
-
std::memory_order_relaxed -
特点:只保证当前操作本身的原子性,不保证线程间的同步和执行顺序。编译器和 CPU 可以随意重排。
-
场景:仅仅需要一个原子的计数器(比如统计总共生成了多少个数据包),不涉及依赖这个计数器去读取其他数据。
2)获取-释放序 (Acquire-Release Ordering)
-
std::memory_order_acquire(用于读操作) /std::memory_order_release(用于写操作) -
特点:极其重要的高性能无锁编程模型。
-
Release (释放):在本线程中,所有在 release 之前的内存写操作,绝对不能重排到 release 之后。
-
Acquire (获取):在本线程中,所有在 acquire 之后的内存读写操作,绝对不能重排到 acquire 之前。
-
-
场景举例:在高速信号仿真中,模块 A(生产者)生成了一段完整的载波和伪码数据存入内存,然后通过
release标记一个原子 flag;模块 B(消费者)通过acquire读取这个 flag。这就保证了模块 B 一旦看到 flag 变为 true,就绝对能读取到模块 A 写入的完整数据,不会读到脏数据,且完全不需要使用沉重的std::mutex。
3)顺序一致性 (Sequential Consistency)
-
std::memory_order_seq_cst(默认行为) -
特点:最严格的内存顺序。所有的线程看到的内存操作顺序都是全局一致的。
-
代价:会在 CPU 层面插入内存屏障 (Memory Barriers),阻止缓存优化,性能开销最大。
代码示例(使用 Acquire-Release 实现高性能的生产者-消费者同步):
#include <iostream>
#include <thread>
#include <atomic>
#include <vector>
#include <chrono>
std::vector<int> payload_data; // 共享数据(非原子)
std::atomic<bool> data_ready(false); // 原子标志位
void trajectory_producer() {
// 1. 准备数据 (普通的内存写操作)
payload_data = {101, 102, 103, 104};
std::cout << "Producer: Data generated.\n";
// 2. 使用 release 内存序标记数据准备完毕
// 保证:在这行代码之前的 payload_data 写入,绝不会被重排到这行代码之后!
data_ready.store(true, std::memory_order_release);
}
void signal_consumer() {
// 1. 使用 acquire 内存序等待数据
// 自旋锁等待,直到读取到 true
while (!data_ready.load(std::memory_order_acquire)) {
std::this_thread::yield(); // 让出 CPU
}
// 2. 安全读取数据
// 保证:由于 acquire 语义,这行代码绝不会被重排到 load 之前。
// 因此这里肯定能看到 producer 写入的最新 payload_data。
std::cout << "Consumer: Data received: ";
for (int v : payload_data) {
std::cout << v << " ";
}
std::cout << "\n";
}
int main() {
std::thread t2(signal_consumer);
std::this_thread::sleep_for(std::chrono::milliseconds(50)); // 故意让消费者先启动并等待
std::thread t1(trajectory_producer);
t1.join();
t2.join();
return 0;
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)