一、线程基础知识

1. 进程和线程的概念

进程是系统资源分配和调度的基本单位,也是程序的一次执行过程。线程是进程的一个执行单元,是操作系统调度的基本单位。

2. 多线程的优点和使用场景

多线程程序在提高计算机系统的并发性和响应性方面有着极其重要的作用。它可以更好地利用计算机的多核和多处理器资源,在提高系统吞吐量的同时缩短响应时间。常见的使用场景包括:

  • 程序需要用户交互并保持响应性
  • 后台任务需要异步完成
  • 大量计算密集型任务需要加速

3. 线程的生命周期

多线程程序的生命周期包括:

  • 创建线程
  • 运行线程
  • 线程流程控制
  • 等待其他线程完成
  • 销毁线程

4. 线程的状态

线程可以处于以下状态之一:

  • 新创建状态:线程被创建但还未开始运行。
  • 就绪状态:线程已经准备就绪,等待CPU调度执行。
  • 运行状态:线程正在运行中。
  • 阻塞状态:线程因等待某个事件而暂停执行。
  • 死亡状态:线程退出或被终止。

5. 线程的同步和互斥

5.1 线程同步

线程同步是指多个线程按照一定规律协调工作,使得这些线程在空间、时间上按照既定规律有序地执行工作。实现方式一般有:

  • 互斥量(Mutex)
  • 信号量(Semaphore)
  • 事件(Event)
  • 条件变量(Condition Variable)

5.2 线程互斥

线程互斥是指在多线程环境下,所有线程都要访问共享资源,但同一时刻只能有一个线程访问。实现方式一般有:

  • 互斥量(Mutex)
  • 信号量(Semaphore)

6. 代码示例

下面是一个简单的C++实现多线程和线程同步的示例代码:

代码中创建了3个线程thread_A和1个线程thread_B。在thread_B中,先进行初始化操作,等初始化完成后通过全局变量ready通知其它线程可以开始执行了。在thread_A中,当线程准备就绪时等待主线程的通知。主线程等待所有线程执行结束后再退出。

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
using namespace std;

mutex mtx;
condition_variable cv;
bool ready = false;

void thread_A(int id)
{
    unique_lock<mutex> lck(mtx);
    while (!ready) cv.wait(lck); // 等待条件变量
    cout << "Thread " << id << " is running\n";
}

void thread_B()
{
    cout << "Initializing...\n";
    this_thread::sleep_for(chrono::milliseconds(2000)); // 模拟初始化过程
    {
        lock_guard<mutex> lck(mtx); // 自动加锁互斥量
        ready = true; // 设置条件变量
        cout << "Initialization complete\n";
    }
    cv.notify_all();  // 唤醒所有等待的线程
}

int main()
{
    thread t1(thread_A, 1);
    thread t2(thread_A, 2);
    thread t3(thread_A, 3);
    thread t4(thread_B);

    t1.join();
    t2.join();
    t3.join();
    t4.join();

    return 0;
}

二、C++11线程库

在多线程编程中为了能够方便地创建、控制和同步线程,C++11引入了一个新的特性——C++11线程库。该库提供了一种可移植的、易于使用的线程编程模型,能够让程序员方便地进行线程编程。在本文中,我们将介绍C++11线程库的概念、特性、关键类和函数,以及使用线程库创建和控制线程、完成线程之间的同步和互斥、以及使用原子变量和无锁数据结构的方法。

1 C++11线程库的概念和特性

C++11线程库是C++11标准中的一个新特性,它提供了一种可移植的、易于使用的线程编程模型,能够让程序员方便地创建、控制和同步线程。C++11线程库的主要特性包括:

  • 提供了面向对象的线程编程模型。
  • 提供了可移植的、与平台无关的线程库。
  • 提供了对线程同步、互斥和原子操作的支持。
  • 线程库中的类和函数都采用了RAII(Resource Acquisition Is Initialization)技术,能够在不必手动管理线程生命周期的同时确保资源的正确释放。

2 线程库中的关键类和函数

在C++11线程库中,常用的关键类和函数包括:

  • std::thread:表示一个线程对象。
  • std::mutex:表示互斥量,用于保护共享资源。
  • std::condition_variable:表示条件变量,用于线程之间的通信。
  • std::atomic:表示原子类型,用于实现无锁数据结构。
  • std::async:表示异步操作,可以在另一线程中异步地执行函数。

3 使用线程库创建和控制线程

使用C++11线程库创建和控制线程非常简单,只需要使用std::thread类即可。在创建std::thread对象时,需要将执行的函数作为参数传递进去。例如:

#include <iostream>
#include <thread>

// 定义一个无参无返回值的函数
void func()
{
    std::cout << "Hello, C++11 thread!\n";
}

int main()
{
    // 创建线程并执行函数
    std::thread t(func);

    // 等待线程结束
    t.join();

    return 0;
}

4 使用线程库完成线程之间的同步和互斥

在多线程编程中,为了避免多个线程同时访问共享资源时发生竞争条件导致的错误,需要使用同步和互斥机制。C++11线程库中提供了多种同步和互斥机制,包括:

  • std::mutex和std::lock_guard:用于提供互斥机制。
  • std::condition_variable:用于线程之间的通信和同步。
  • std::atomic: 用于实现并发访问的原子操作。

例如下面的代码展示了如何使用std::mutex和std::lock_guard提供互斥机制:

#include <iostream>
#include <thread>
#include <mutex>

// 共享资源
int cnt = 0;

// 定义互斥量
std::mutex mtx;

// 定义一个函数,该函数对cnt进行自增操作
void func()
{
    // 创建lock_guard对象,注意lock_guard的生命周期
    std::lock_guard<std::mutex> lck(mtx);
    cnt++;
}

int main()
{
    std::thread t1(func);
    std::thread t2(func);

    t1.join();
    t2.join();

    std::cout << "cnt: " << cnt << std::endl;

    return 0;  
}

5 原子变量和无锁数据结构的使用

除了使用std::mutex和std::lock_guard提供互斥机制,C++11线程库还提供了std::atomic类来实现原子操作。这些操作能够在不使用锁的情况下完成原子操作,从而提高了程序的并发性能。

例如下面的代码展示了如何使用std::atomic类实现对共享变量的原子操作:

除了使用std::atomic类之外,C++11线程库还提供了无锁数据结构(Lock-Free Data Structure)的支持,例如std::atomic_flag、std::atomic*等。这些数据结构可以让程序在不使用锁的情况下实现高度的并发性能。

#include <iostream>
#include <thread>
#include <atomic>

// 定义原子变量
std::atomic<int> cnt(0);

// 定义一个函数,该函数对cnt进行自增操作
void func()
{
    cnt++;
}

int main()
{
    std::thread t1(func);
    std::thread t2(func);

    t1.join();
    t2.join();

    std::cout << "cnt: " << cnt << std::endl;

    return 0;
}

三、 多线程编程实践

在当今互联网时代,多线程编程已经成为了很多应用程序的必需品。C++ 作为一门多范式的编程语言,自然也提供了丰富的多线程编程接口和库。然而多线程编程也具有一些特殊的问题和挑战,例如线程安全、性能瓶颈、异常处理等等。本文将针对这些问题,通过实例演示 C++ 多线程编程的常见问题和调试技巧、多线程程序的设计和实现方法、多线程程序中的优化技巧、异常处理和资源管理、性能调优方法等方面的基础知识和实践经验。

1 多线程程序的常见问题和调试技巧

在多线程编程中,常见的问题包括竞态条件、死锁、饥饿和信号量等。这些问题可能会导致程序运行出现错误、性能下降或崩溃。为了避免这些问题,需要使用一些调试技巧,例如:

  • 使用调试器来诊断问题,并使用线程调试器查看线程堆栈;
  • 使用锁和互斥量来保护共享资源;
  • 可以使用条件变量来等待特定事件;
  • 使用信号量来控制资源的使用。

以下是使用 std::mutex 和 std::condition_variable 保护共享资源的例子,来防止 race condition 问题:

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

class Counter {
public:
    Counter() : value_(0) {}

    void Increment() {
        std::lock_guard<std::mutex> lock(mutex_);
        ++value_;
    }

    void Decrement() {
        std::lock_guard<std::mutex> lock(mutex_);
        --value_;
    }

    int value() const {
        return value_;
    }

private:
    int value_;
    mutable std::mutex mutex_;
};

int main() {
    Counter counter;
    std::vector<std::thread> threads;

    for (int i = 0; i < 5; ++i) {
        threads.push_back(std::thread([&counter](){
            for (int j = 0; j < 1000; ++j) {
                counter.Increment();
            }
        }));
    }

    // 等待所有线程执行完毕
    for (auto& thread : threads) {
        thread.join();
    }

    std::cout << counter.value() << std::endl;
    return 0;
}

在调试过程中建议使用线程安全的日志输出函数,例如依赖于 ATOMIC_FLAG_INIT 宏的 std::atomic_flag 类,保证多线程环境下的线程安全输出:

#include <iostream>
#include <thread>
#include <atomic>
#include <string>

void Log(const std::string& msg) {
    static std::atomic_flag lock = ATOMIC_FLAG_INIT;
    while (lock.test_and_set(std::memory_order_acquire)) {}
    std::cout << msg << std::endl;
    lock.clear(std::memory_order_release);
}

int main() {
    std::thread t1([]{
        Log("Hello from thread 1");
    });
    std::thread t2([]{
        Log("Hello from thread 2");
    });
    t1.join();
    t2.join();
    return 0;
}

2 多线程程序的设计和实现方法

在设计和实现多线程程序时,需要考虑线程之间的通信和同步。常用的设计和实现方法包括:

  • 使用线程安全队列来传递数据;
  • 将线程之间的依赖性分析并尝试使用锁来保护共享资源;
  • 使用条件变量来阻塞线程等待事件;
  • 使用线程池来管理线程。

以下是使用 std::queue 和 std::condition_variable 进行多线程数据传输的例子:

在生产者函数中,使用 std::unique_lock 获取互斥锁,向队列中压入数据,并通过 std::condition_variable 的 notify_one() 通知消费者线程进行消费。在消费者函数中,通过 wait() 等待生产者线程通知,并获取队列中的数据进行消费。

#include <iostream>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>

std::mutex mtx;
std::queue<int> q;
std::condition_variable cv;

void producer() {
    for (int i = 0; i < 10; i++) {
        std::unique_lock<std::mutex> lck(mtx);
        q.push(i);
        lck.unlock();
        cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, []{ return !q.empty(); });
        std::cout << q.front() << std::endl;
        q.pop();
        lck.unlock();
    }
}

int main() {
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}

3 多线程程序中的优化技巧

为了提高多线程程序的性能,可以使用以下优化技巧:

  • 使用原子操作或无锁数据结构来避免锁竞争;
  • 使用线程池来避免线程创建和销毁的开销;
  • 将任务划分为更小的部分以提高并发性;
  • 通过使用分级锁来减少锁竞争的粒度。

以下是使用线程池调度多线程操作的例子:

在上面的例子中,使用了一个线程池调度多个任务,这些任务会被推入任务队列中等待执行。一个固定数量的线程会池化处理这些任务,如果一个线程已经完成了一个任务,就会从任务队列中取出下一个任务继续执行。这样不仅避免了线程创建和销毁的开销,而且能够更好地利用CPU资源,提高程序的并发性能。

#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <atomic>
#include <condition_variable>
#include <functional>

class ThreadPool {
public:
    ThreadPool(int numThreads)
        : stop_(false)
    {
        for (int i= 0; i < numThreads; ++i) {
            threads_.emplace_back(std::thread(std::bind(&ThreadPool::threadFunc, this)));
        }
    }

    ~ThreadPool() {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            stop_ = true;
        }
        condition_.notify_all();
        for (std::thread& t : threads_) {
            t.join();
        }
    }

    template <typename F, typename... Args>
    void enqueue(F&& f, Args&&... args) {
        auto task = std::make_shared<std::function<void()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
        {
            std::unique_lock<std::mutex> lock(mutex_);
            tasks_.push([task]{ (*task)(); });
        }
        condition_.notify_one();
    }

private:
    void threadFunc() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                condition_.wait(lock, [this]{
                    return stop_ || !tasks_.empty();
                });
                if (stop_ && tasks_.empty()) {
                    break;
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();
        }
    }

    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable condition_;
    std::atomic_bool stop_;
};

void fun(int param) {
    std::cout << "Task #" << param << " running on thread " << std::this_thread::get_id() << std::endl;
}

int main() {
    ThreadPool pool(4);
    for (int i = 0; i < 8; ++i) {
        pool.enqueue(fun, i);
    }
    return 0;
}

4 异常处理和资源管理

在多线程程序中,异常处理和资源管理也是非常重要的。在多线程环境中,如果线程抛出异常,可能会影响到其他线程的运行。因此,在多线程程序中,需要进行合理的异常处理来确保程序的稳定性。

如果线程中申请了一些资源(例如内存、文件句柄、网络连接等),则需要在适当的时候进行释放,以免资源占用过多导致程序崩溃。

以下是一个使用 std::lock_guard 和 std::shared_ptr 管理资源的例子:

在示例中使用 std::shared_ptr 来管理一个资源,多个线程共享该资源的指针。使用 std::lock_guard 来保护共享资源,避免多个线程同时访问该资源导致数据竞争。在使用 shared_ptr 的情况下,无需手动释放资源,可以避免资源泄漏。同时程序具有更好的可读性和易于维护性。

#include <iostream>
#include <memory>
#include <mutex>

class Resource {
public:
    Resource() {
        std::cout << "Resource acquired." << std::endl;
    }

    ~Resource() {
        std::cout << "Resource released." << std::endl;
    }

    void DoSomething() {
        std::cout << "Doing something with the resource." << std::endl;
    }
};
std::mutex mtx;

void ThreadFunc(std::shared_ptr<Resource> ptr) {
    std::lock_guard<std::mutex> lock(mtx);
    ptr->DoSomething();
}

int main() {
    std::shared_ptr<Resource> ptr(new Resource);
    std::thread t1(ThreadFunc, ptr);
    std::thread t2(ThreadFunc, ptr);
    t1.join();
    t2.join();
    return 0;
}

5 性能调优方法

在编写多线程程序时,需要注意一些性能问题。以下是一些常用的性能调优方法:

  • 使用高效的数据结构,例如 std::vector 和 std::map;
  • 避免过度使用锁;
  • 将多个小任务合并为一个较大的只需计算一次的任务;
  • 尽量避免繁重的内存分配;
  • 选择合适的调度算法。

在调度算法方面,例如使用 work stealing 任务窃取算法,避免某些线程的工作比其他线程更繁重,从而导致 CPU 核心的利用率不均衡。以下是一个使用 C++17 的 std::jthread 实现 work stealing 算法的例子:

在示例中使用 std::deque 存储任务队列,使用 std::mutex 保护共享数据。线程池支持将任务推入队列中,由线程池中的线程抢占式地从任务队列中获取任务并执行。如果某个线程的任务执行完毕,就会去其他线程队列中偷取任务执行,以避免某些线程工作量过大导致 CPU 资源利用率不均衡。

#include <iostream>
#include <thread>
#include <vector>
#include <deque>
#include <mutex>

class work_stealing_thread_pool {
public:
    using task_type = std::function<void()>;

    explicit work_stealing_thread_pool(std::size_t num_threads = std::thread::hardware_concurrency());
    ~work_stealing_thread_pool();

    template <typename F>
    void submit(F f) {
        tasks_.push_front(task_type(f));
    }

private:
    task_type get_task_from_local_queue();
    task_type get_task_from_other_thread_queue();
    task_type get_task_from_pool_queue();

    std::vector<std::jthread> threads_;
    std::deque<task_type> tasks_;
    std::mutex mutex_;
};

work_stealing_thread_pool::work_stealing_thread_pool(std::size_t num_threads) {
    for (std::size_t i = 0; i < num_threads; ++i) {
        threads_.emplace_back([this](){
            while (true) {
                task_type task;
                if ((task = get_task_from_local_queue()) ||
                    (task = get_task_from_other_thread_queue()) ||
                    (task = get_task_from_pool_queue())) {
                    task();
                } else {
                    break;
                }
            }
        });
    }
}

work_stealing_thread_pool::~work_stealing_thread_pool() {
    for (std::jthread& t : threads_) {
        t.join();
    }
}

work_stealing_thread_pool::task_type work_stealing_thread_pool::get_task_from_local_queue() {
    if (tasks_.empty()) {
        return task_type();
    }
    task_type task = tasks_.front();
    tasks_.pop_front();
    return task;
}

work_stealing_thread_pool::task_type work_stealing_thread_pool::get_task_from_other_thread_queue() {
    for (std::size_t i = 0; i < threads_.size(); ++i) {
        std::size_t index = (i + 1 + threads_.size()) % threads_.size();
        if (task_type task = threads_[index].get().tasks_.front()) {
            threads_[index].get().tasks_.pop_front();
            return task;
        }
    }
    return task_type();
}

work_stealing_thread_pool::task_type work_stealing_thread_pool::get_task_from_pool_queue() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (tasks_.empty()) {
        return task_type();
    }
    task_type task = tasks_.back();
    tasks_.pop_back();
    return task;
}

int main() {
    work_stealing_thread_pool pool;
    for (int i = 0; i < 10; ++i) {
        pool.submit([i](){
            std::cout << "Task #" << i << " running on thread " << std::this_thread::get_id() << std::endl;
        });
    }
    return 0;
}

四、 并行算法和并行计算

在现代计算机系统中,多核 CPU、GPU 和分布式计算成为了越来越主流的架构。并行计算和并行算法成为了程序员们必备的技能之一。

下面将会介绍并行计算和并行算法的基本概念和原理,以及如何在C++11中使用 std::execution 来调用并行算法。同时,我们还会深入探讨 OpenMP 编写并行程序,CUDA 编程和 GPU 加速,以及分布式计算和云计算技术。

1 并行计算和并行算法的基本概念和原理

在并行计算中,多个处理节点并发执行计算任务,从而提高计算效率。并行计算有以下核心概念:

1.1 程序并发性

程序并发性指的是一个程序中多个操作可以同时执行的程度。并发性越高,程序的计算效率就越高。

1.2 数据并行性

数据并行性指的是将数据划分为多个子集,多个处理节点并行地处理每个子集的能力。

1.3 任务并行性

任务并行性指的是可以并行执行的任务数量。

1.4 同步和通信

并行计算需要处理节点之间的同步和通信。如果处理节点之间需要共享数据,则需要进行同步和通信。

2 C++标准库中的并行算法

C++11 引入了一组并行算法可以使用多个线程并发执行算法。这些算法与串行算法的接口相同,通过在线程池中调度任务来实现并行执行。

C++ 标准库中的并行算法按照使用方式可以分为以下三类:

2.1 顺序算法

顺序算法指的是按顺序在单个线程中执行的算法。

#include <algorithm>
#include <vector>

std::vector<int> v{1, 2, 3, 4, 5};
std::for_each(v.begin(), v.end(), [](int& x){ x *= 2; });

2.2 并行算法

并行算法指的是将算法并行执行的标准库函数。例如,在多处理器系统上并行执行 std::for_each 算法。

#include <algorithm>
#include <execution>
#include <vector>

std::vector<int> v { 1, 2, 3, 4, 5 };
std::for_each(std::execution::par, v.begin(), v.end(), [](int& x){ x *= 2; });

2.3 并行算法的执行策略

执行策略指的是指定算法在哪个线程上执行的方法,可以使用 std::execution::seq、std::execution::par 和 std::execution::par_unseq 来选择不同的执行策略。

  • std::execution::seq:顺序执行
  • std::execution::par:并行执行
  • std::execution::par_unseq:即使在非数值序列上也并行处理

3 使用OpenMP编写并行程序

OpenMP 是一种基于共享内存的并行编程模型,其目的是在性能和可移植性之间找到平衡。OpenMP 使用编译指示和运行时库来实现并行计算。

以下是使用 OpenMP 编写并行程序的示例:

在示例中使用 #pragma omp parallel 指令指示编译器并行运行代码块。通过 num_threads 参数可以设置并行运行的线程数。

#include <iostream>
#include <omp.h>

int main() {
    #pragma omp parallel num_threads(4)
    {
        int tid = omp_get_thread_num();
        std::cout << "Hello World from thread #" << tid << std::endl;
    }
    return 0;
}

4 CUDA编程和GPU加速

CUDA 是 NVIDIA 公司推出的通用并行计算平台和编程模型,支持在 NVIDIA 的 GPU 上进行并行计算。CUDA 编程通过执行大量线程来实现并行计算。

以下是使用 CUDA 编写矩阵加法的示例:

在示例中使用 global 关键字声明一个 CUDA 内核函数 matrixAdd。在主函数中,通过调用 cudaMalloc 分配内存并将数据传输到 GPU 上处理,然后执行 kernel 函数,通过调用 cudaFree 释放内存。

__global__ void matrixAdd(float* A, float* B, float* C, int n) {
    int i = blockIdx.x * blockDim.x + threadIdx.x;
    int j = blockIdx.y * blockDim.y + threadIdx.y;
    if (i < n && j < n) {
        int idx = i * n + j;
        C[idx] = A[idx] + B[idx];
    }
}

int main() {
    int n = 1024;
    float *A, *B, *C;
    cudaMalloc((void**)&A, n * n * sizeof(float));
    cudaMalloc((void**)&B, n * n * sizeof(float));
    cudaMalloc((void**)&C, n * n * sizeof(float));

    dim3 threadsPerBlock(16, 16);
    dim3 numBlocks(n / threadsPerBlock.x, n / threadsPerBlock.y);
    matrixAdd<<<numBlocks, threadsPerBlock>>>(A, B, C, n);

    cudaFree(A);
    cudaFree(B);
    cudaFree(C);
    return 0;
}

5 分布式计算和云计算技术

分布式计算指的是利用多个计算机协同工作来解决一个单一的问题。计算机之间可以通过网络进行通信,协同完成计算任务。

云计算是一种基于互联网的计算模型,通过远程服务器来提供共享计算资源,包括硬件、软件、网络资源和服务等。云计算可以分为公共云、私有云和混合云等几种类型。

分布式计算和云计算可以显著提高计算能力,同时也带来了一些新的问题和挑战,例如网络安全、数据传输等等。因此,在使用分布式计算和云计算技术时,需要注意安全和可靠性等问题。

五、 多线程编程实例

在现代软件开发中,多线程编程已成为必不可少的技能之一。多线程技术可以通过充分利用 CPU 和其他资源来提高程序的性能。

下面将会介绍 c++ 中使用多线程的实例,包括多线程下载器、并行映射/归约算法、任务调度器、生产者消费者模型和并行搜索算法等。

1 多线程下载器

多线程下载器可以通过多线程技术来提高文件下载速度。通过启动多个线程同时下载单个文件,可以充分利用网络带宽,加快下载速度,提高用户体验。

以下是使用 c++ 实现多线程下载器的示例:

在示例中使用 libcurl 库来处理 HTTP 请求和响应,同时使用 c++ 中的多线程技术来下载文件。我们将文件分成多个块,每个块启动一个线程来下载,最后通过合并块的方式合并文件。使用多线程下载器可以充分利用网络带宽,加快文件下载速度。

#include <iostream>
#include <fstream>
#include <thread>
#include <vector>
#include <curl/curl.h>

struct Range {
    long long start;
    long long end;
};

void download_range(const std::string& url, const Range& range, std::ofstream& out_file) {
    std::string range_str = std::to_string(range.start) + "-" + std::to_string(range.end);
    CURL* curl = curl_easy_init();
    if (curl) {
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_RANGE, range_str.c_str());
        curl_easy_setopt(curl, CURLOPT_WRITEDATA, &out_file);
        curl_easy_perform(curl);
        curl_easy_cleanup(curl);
    }
}

void download_file(const std::string& url, const std::string& filename, int num_threads) {
    CURL* curl = curl_easy_init();
    if (curl) {
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_NOBODY, 1L);
        curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L);
        curl_easy_setopt(curl, CURLOPT_MAXREDIRS, 10L);
        long long file_size = 0;
        curl_easy_getinfo(curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &file_size);
        curl_easy_cleanup(curl);

        std::vector<std::thread> threads(num_threads);
        std::vector<std::ofstream> out_files(num_threads);
        std::vector<Range> ranges(num_threads);
        long long chunk_size = file_size / num_threads;

        for (int i = 0; i < num_threads; ++i) {
            out_files[i].open(filename + "." + std::to_string(i), std::ios::out | std::ios::binary);
            ranges[i].start = i * chunk_size;
            ranges[i].end = (i == num_threads - 1) ? (file_size - 1) : (ranges[i].start + chunk_size - 1);
            threads[i] = std::thread(download_range, url, ranges[i], std::ref(out_files[i]));
        }

        for (int i = 0; i < num_threads; ++i) {
            threads[i].join();
            out_files[i].close();
        }

        std::ofstream out_file(filename, std::ios::out | std::ios::binary);
        for (int i = 0; i < num_threads; ++i) {
            std::ifstream in_file(filename + "." + std::to_string(i), std::ios::in | std::ios::binary);
            out_file << in_file.rdbuf();
            in_file.close();
            remove((filename + "." + std::to_string(i)).c_str());
        }
        out_file.close();
    }
}

int main() {
    std::string url = "https://www.example.com/example_file.zip";
    std::string filename = "example_file.zip";
    int num_threads = 4;
    download_file(url, filename, num_threads);
    return 0;
}

2 并行映射/归约算法

并行映射/归约算法是将映射和归约两个步骤并行执行的算法,可以用于大数据集的分析和处理。

以下是使用 c++ 实现并行映射/归约算法的示例:

在示例中使用 c++ 的标准库函数 std::accumulate 来实现归约操作,使用多线程技术实现映射操作。我们将数据集分成多个块,每个块启动一个线程来计算部分和,最后通过合并部分和得到总和。使用并行映射/归约算法可以大大提高数据处理的效率。

#include <iostream>
#include <vector>
#include <numeric>
#include <thread>
#include <algorithm>

void map_reduce(const std::vector<int>& data, int& result) {
    int num_threads = std::thread::hardware_concurrency();
    std::vector<int> partial_results(num_threads);
    int chunk_size = data.size() / num_threads;

    std::vector<std::thread> threads(num_threads);

    for (int i = 0; i < num_threads; ++i) {
        threads[i] = std::thread([&data, &partial_results, i, chunk_size]() {
            int start = i * chunk_size;
            int end = (i == num_threads - 1) ? data.size() : start + chunk_size;
            partial_results[i] = std::accumulate(data.begin() + start, data.begin() + end, 0);
        });
    }

    for (int i = 0; i < num_threads; ++i) {
        threads[i].join();
    }

    result = std::accumulate(partial_results.begin(), partial_results.end(), 0);
}

int main() {
    std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    int result = 0;
    map_reduce(data, result);
    std::cout << "The sum is: " << result << std::endl;
    return 0;
}

3 任务调度器

任务调度器是一种用于管理和分配任务的软件,可以将任务自动分配给可用的处理器或执行单元。任务调度器可以使程序更加灵活、可扩展和高效。

以下是使用 c++ 实现任务调度器的示例:

在示例中使用了 c++ 的线程、互斥锁、条件变量和 lambda 表达式等技术来实现任务调度。我们定义了一个任务队列和一个线程池,并使用条件变量来实现线程的等待和唤醒。我们使用 submit() 函数向任务队列提交任务,可以异步执行任务。我们还使用 wait() 函数来等待所有任务完成。使用任务调度器可以有效地管理任务的执行,并提高任务处理的效率和灵活性。

#include <iostream>
#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>

class task_queue {
public:
    void push(std::function<void()> f) {
        {
            std::unique_lock<std::mutex> lock(q_mutex_);
            queue_.push(f);
        }
        q_condition_.notify_one();
    }

    std::function<void()> pop() {
        std::unique_lock<std::mutex> lock(q_mutex_);
        q_condition_.wait(lock, [this] { return !queue_.empty(); });
        auto f = queue_.front();
        queue_.pop();
        return f;
    }

private:
    std::queue<std::function<void()>> queue_;
    std::mutex q_mutex_;
    std::condition_variable q_condition_;
};

class task_scheduler {
public:
    task_scheduler() {
        for (int i = 0; i < num_threads_; ++i) {
            threads_.push_back(std::thread([&] { worker_thread(); }));
        }
    }

    ~task_scheduler() {
        stop_ = true;
        cv_.notify_all();
        for (auto& t: threads_) {
            t.join();
        }
    }

    void submit(std::function<void()> f) {
        task_queue_.push(f);
    }

    void wait() {
        std::unique_lock<std::mutex> lock(cv_mutex_);
        cv_.wait(lock, [&] { return task_queue_.empty() && active_tasks_ == 0; });
    }

private:
    void worker_thread() {
        std::function<void()> f;
        while (true) {
            {
                std::unique_lock<std::mutex> lock(cv_mutex_);
                cv_.wait(lock, [&] { return !task_queue_.empty() || stop_; });
                if (stop_ && task_queue_.empty()) {
                    return;
                }
                ++active_tasks_;
            }
            if (!task_queue_.empty()) {
                f = task_queue_.pop();
                f();
            }
            {
                std::unique_lock<std::mutex> lock(cv_mutex_);
                --active_tasks_;
                cv_.notify_all();
            }
        }
    }

    int num_threads_ = std::thread::hardware_concurrency();
    std::vector<std::thread> threads_;
    task_queue task_queue_;
    std::atomic<bool> stop_ = false;
    std::condition_variable cv_;
    std::mutex cv_mutex_;
    std::atomic<int> active_tasks_{0};
};

int main() {
    task_scheduler ts;
    int sum1 = 0, sum2 = 0;
    ts.submit([&sum1] { sum1 = 0; for (int i = 0; i < 1000; ++i) sum1 += i; });
    ts.submit([&sum2] { sum2 = 0; for (int i = 1000; i < 2000; ++i) sum2 += i; });
    ts.wait();
    std::cout << "Sum: " << sum1 + sum2 << std::endl;
    return 0;
}

4 生产者消费者模型

生产者消费者模型是一种常见的并发编程模式,用于解决生产者和消费者之间的同步问题。生产者和消费者可以是线程、进程或网络节点等互相独立的实体。

以下是使用 c++ 实现生产者消费者模型的示例:

在示例中使用了 c++ 的互斥锁、条件变量和队列等技术来实现生产者消费者模型。我们定义了一个缓冲区类,其中包含向缓冲区添加和获取元素的方法。我们使用条件变量来进行等待和唤醒,以确保生产者和消费者之间的同步。我们还定义了一个生产者类和一个消费者类,并在主函数中启动两个线程来运行它们。使用生产者消费者模型可以有效地解决生产者和消费者之间的同步问题,并提高程序的并发和性能。

#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>

class buffer {
public:
    void add(int value) {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.size() >= capacity_) {
            full_condition_.wait(lock);
        }
        queue_.push(value);
        empty_condition_.notify_one();
    }

    int get() {
        std::unique_lock<std::mutex> lock(mutex_);
        while (queue_.empty()) {
            empty_condition_.wait(lock);
        }
        int value = queue_.front();
        queue_.pop();
        full_condition_.notify_one();
        return value;
    }

private:
    std::queue<int> queue_;
    int capacity_ = 10;
    std::mutex mutex_;
    std::condition_variable empty_condition_;
    std::condition_variable full_condition_;
};

class producer {
public:
    producer(buffer& buffer) : buffer_(buffer) {}

    void run() {
        for (int i = 1; i <= 100; ++i) {
            buffer_.add(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
    }

private:
    buffer& buffer_;
};

class consumer {
public:
    consumer(buffer& buffer) : buffer_(buffer) {}

    void run() {
        for (int i = 1; i <= 100; ++i) {
            int value = buffer_.get();
            std::cout << "Consumer: " << value << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    }

private:
    buffer& buffer_;
};

int main() {
    buffer buffer;
    producer p(buffer);
    consumer c(buffer);
    std::thread producer_thread(&producer::run, &p);
    std::thread consumer_thread(&consumer::run, &c);
    producer_thread.join();
    consumer_thread.join();
    return 0;
}

5 并行搜索算法

并行搜索算法是一种用于在多个处理器或计算机上并行执行搜索的算法,可以提高搜索效率和减少计算时间。

以下是使用 c++ 实现并行搜索算法的示例:

在示例中使用了 c++ 的原子变量、线程和递归等技术来实现并行搜索算法。我们定义了一个 is_solution() 函数来判断是否为解决方案,使用 search() 函数来进行搜索,并使用原子变量来确保在找到解决方案后立即停止。我们还定义了一个 parallel_search() 函数来启动多个线程来进行并行搜索,并将结果合并。使用并行搜索算法可以大大加快搜索的速度,减少计算时间。

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>

bool is_solution(const std::vector<int>& data) {
    int n = data.size();
    int sum = 0;
    for (int i = 0; i < n; ++i) {
        sum += data[i];
    }
    return sum == n * (n + 1) / 2;
}

void search(std::vector<int> data, std::vector<bool>& used, int level, std::vector<std::vector<int>>& results, std::atomic<bool>& found) {
    if (found) return;
    if (level == data.size() && is_solution(data)) {
        found = true;
        results.push_back(data);
        return;
    }
    for (int i = 0; i < data.size(); ++i) {
        if (!used[i]) {
            data[level] = i + 1;
            used[i] = true;
            if (found) return;
            std::thread t(search, data, std::ref(used), level + 1, std::ref(results), std::ref(found));
            t.join();
            used[i] = false;
        }
    }
}

std::vector<std::vector<int>> parallel_search(int n, int num_threads) {
    std::atomic<bool> found = false;
    std::vector<std::vector<int>> results;
    std::vector<int> data(n);
    std::vector<bool> used(n, false);
    std::vector<std::thread> threads(num_threads);
    for (int i = 0; i < num_threads; ++i) {
        threads[i] = std::thread(search, data, std::ref(used), 0, std::ref(results), std::ref(found));
    }
    for (int i = 0; i < num_threads; ++i) {
        threads[i].join();
    }
    return results;
}

int main() {
    std::vector<std::vector<int>> results = parallel_search(6, 4);
    std::cout << "Results found: " << results.size() << std::endl;
    for (auto& r: results) {
        std::cout << "Solution: ";
        for (auto& e: r) {
            std::cout << e << " ";
        }
        std::cout << std::endl;
    }
    return 0;
}
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐