原子类型

C++11中实现的Atomic类型是通过storeload 这两个CPU 指令进行数据存取(寄存器和内存之间)的,并且额外接收一个内存序列 (Memory Order)作为参数。C++11支持6种内存排序约束。

Rust的多线程内存模型来源于C++11, 是基于LLVM实现的,所以Rust通过LLVM原子内存排序约束来实现不同级别的原子性。

线程同步互斥

不论哪种语言,都是基于操作系统提供的4种机制:

  • 临界区: lock
  • 互斥量: mutex
  • 信号量
  • 事件: C++(condition_variable;promise),Go(chan),Rust(mpsc::sync_channel)中的条件变量

条件变量

条件变量 wait(用于挂起等待) 和notify (使用条件变量来阻塞线程,降低CPU占用率)

条件变量是多线程程序中用来实现等待和唤醒逻辑常用的方法。通常有wait和notify两个动作,wait用于阻塞挂起线程A,直到另一个线程B通过通过notify唤醒线程A,唤醒后线程A会继续运行。

条件变量在多线程中很常用,在有名的生产者和消费者问题中,消费者如何知道生产者是否生产出了可以消费的产品,通过while循环不停的去判断是否有可消费的产品?众所周知,死循环极其消耗CPU性能,所以需要使用条件变量来阻塞线程,降低CPU占用率。

用法: (FIXME:注意这段代码存在bug)

std::mutex mutex;
std::condition_variable cv;
std::vector<int> vec;

void Consume() {
  std::unique_lock<std::mutex> lock(mutex);
  cv.wait(lock);
  std::cout << "consume " << vec.size() << "\n";
}

void Produce() {
  std::unique_lock<std::mutex> lock(mutex);
  vec.push_back(1);
  cv.notify_all();
  std::cout << "produce \n";
}

int main() {
  std::thread t(Consume);
  t.detach();
  Produce();
  return 0;
}

信号丢失问题

如果先执行的Produce(),后执行的Consume(),生产者提前生产出了数据,去通知消费者,但是此时消费者线程如果还没有执行到wait语句,即线程还没有处于挂起等待状态,线程没有等待此条件变量上,那通知的信号就丢失了,后面Consume()中才执行wait处于等待状态,但此时生产者已经不会再触发notify,那消费者线程就会始终阻塞下去,出现bug。

修复信号丢失问题: FIXME:

std::mutex mutex;
std::condition_variable cv;
std::vector<int> vec;

void Consumer() {
  std::unique_lock<std::mutex> lock(mutex);
  if (vec.empty()) { //加入此判断条件
      cv.wait(lock);
  }
  std::cout << "consumer " << vec.size() << "\n";
}

void Produce() {
  std::unique_lock<std::mutex> lock(mutex);
  vec.push_back(1);
  cv.notify_all();
  std::cout << "produce \n";
}

int main() {
  std::thread t(Consumer);
  t.detach();
  Produce();
  return 0;
}

虚假唤醒问题

通过增加附加条件可以解决信号丢失的问题,但这里还有个地方需要注意,消费者线程处于wait阻塞状态时,即使没有调用notify,操作系统也会有一些概率会唤醒处于阻塞的线程,使其继续执行下去,这就是虚假唤醒问题,当出现了虚假唤醒后,消费者线程继续执行,还是没有可以消费的数据,出现了bug。

解决虚假唤醒问题:

std::mutex mutex;
std::condition_variable cv;
std::vector<int> vec;

void Consumer() {
  std::unique_lock<std::mutex> lock(mutex);
  while (vec.empty()) { // 将if改为while
      cv.wait(lock);
  }
  std::cout << "consumer " << vec.size() << "\n";
}

void Produce() {
  std::unique_lock<std::mutex> lock(mutex);
  vec.push_back(1);
  cv.notify_all();
  std::cout << "produce \n";
}

int main() {
  std::thread t(Consumer);
  t.detach();
  Produce();
  return 0;
}

使用while循环附加判断条件来解决条件变量的信号丢失和虚假唤醒问题。

使用 lambda

在C++中其实有更好的封装,只需要调用wait函数时,在参数中直接添加附加条件就好了,内部已经做好了while循环判断,直接使用即可,见代码:

std::mutex mutex;
std::condition_variable cv;
std::vector<int> vec;

void Consumer() {
  std::unique_lock<std::mutex> lock(mutex);
  cv.wait(lock, [&](){ return !vec.empty(); }); // 这里可以直接使用C++的封装
  std::cout << "consumer " << vec.size() << "\n";
}

void Produce() {
  std::unique_lock<std::mutex> lock(mutex);
  vec.push_back(1);
  cv.notify_all();
  std::cout << "produce \n";
}

int main() {
  std::thread t(Consumer);
  t.detach();
  Produce();
  return 0;
}

条件变量需要和锁配合使用

因为内部是通过判断及修改某个全局变量来决定线程的阻塞与唤醒,多线程操作同一个变量肯定需要加锁来使得线程安全。同时,一个简单的wait函数调用内部会很复杂的,有可能线程A调用了wait函数但是还没有进入到wait阻塞等待前,另一个线程B在此时却调用了notify函数,此时nofity的信号就丢失啦,如果加了锁,线程B必须等待线程A释放了锁并进入了等待状态后才可以调用notify,继而防止信号丢失。


往期精彩回顾:
基础能力系列
区块链知识系列
密码学系列
零知识证明系列
共识系列
公链调研系列
BTC系列
以太坊系列
EOS系列
Filecoin系列
联盟链系列
Fabric系列
智能合约系列
Token系列
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐