【Linux系统编程】22. 线程同步与互斥(上)
文章目录
一、线程互斥
1、进程线程间的互斥相关概念
- 临界资源: 多执行流共享的资源
- 临界区: 线程中访问临界资源的代码片段
- 互斥: 同一时刻仅一个执行流进入临界区,保护共享资源
- 原子性: 操作不可被中断,要么完整执行,要么完全不执行
2、互斥量 mutex
样例:操作共享变量会有问题的售票系统代码
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg)
{
char *id = (char *)arg;
while(1)
{
if(ticket>0)
{
usleep(1000);
printf("%s selles ticket: %d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void *)"thread 1");
pthread_create(&t2, NULL, route, (void *)"thread 2");
pthread_create(&t3, NULL, route, (void *)"thread 3");
pthread_create(&t4, NULL, route, (void *)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}
运行结果:
我们发现最后抢票出现了负数,这肯定是有问题的!
1. 问题根源:共享变量的并发操作
- 局部变量归线程私有,不会有并发问题;但共享变量会被多个线程同时访问。
- 售票系统中,多个线程同时操作共享变量 ticket,导致出现了卖出 0、-1、-2 等非法票数。
2. 为什么出错?
ticket--不是原子操作,它会被拆成三条汇编指令:load:从内存把 ticket 读到寄存器update:寄存器中执行 -1store:把结果写回内存
- 线程切换可能发生在这三步之间,导致多个线程同时读到相同的
ticket值,最终结果出错。 - if 判断 + usleep 放大了问题:多个线程同时进入了临界区。
3. 解决思路:引入互斥(锁)
- 必须保证:同一时刻只有一个线程进入临界区操作共享变量。
- 本质就是需要一把锁,Linux 中提供的就是互斥量(mutex)。
- 线程在进入临界区前
lock,离开后unlock,从而实现互斥访问。
3、mutex 接⼝
1)pthread_mutex_init 函数
静态初始化: pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
2)pthread_mutex_destroy 函数
3)pthread_mutex_lock 函数
4)pthread_mutex_unlock 函数
改进上⾯的售票系统:
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
pthread_mutex_t mutex;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
pthread_mutex_lock(&mutex);
if (ticket > 0)
{
usleep(1000);
printf("%s selles ticket: %d\n", id, ticket);
ticket--;
pthread_mutex_unlock(&mutex);
}
else
{
pthread_mutex_unlock(&mutex);
break;
}
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_mutex_init(&mutex, NULL);
pthread_create(&t1, NULL, route, (void *)"thread 1");
pthread_create(&t2, NULL, route, (void *)"thread 2");
pthread_create(&t3, NULL, route, (void *)"thread 3");
pthread_create(&t4, NULL, route, (void *)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
pthread_mutex_destroy(&mutex);
return 0;
}
结果:
4、互斥量原理
i++/++i本质是读 - 改 - 写三条指令,不是原子操作,多线程下会出现数据不一致。- 互斥锁的底层依赖CPU 原子交换指令(swap/exchange),单指令保证原子性;多处理器下总线会串行执行,天然实现互斥。
互斥锁 lock /unlock 极简伪代码:
// 全局锁变量:0=未上锁,1=已上锁
int mutex = 0;
// 加锁(原子交换实现)
void lock() {
while(原子交换(mutex, 1) == 1); // 交换为1,旧值=1则自旋等待
}
// 解锁
void unlock() {
mutex = 0; // 直接置0
}
5、互斥量封装
1)Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModule
{
// 互斥锁封装类
class Mutex
{
public:
// 初始化锁
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
// 加锁
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
}
// 解锁
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
}
// 获取锁指针
pthread_mutex_t *Get()
{
return &_mutex;
}
// 销毁锁
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex; // 原生互斥锁对象
};
// RAII 自动锁
class LockGuard
{
public:
LockGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
更新抢票代码:
#include <cstdio>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;
int ticket = 100;
Mutex mutex;
void *route(void *arg)
{
char *id = (char *)arg;
while (1)
{
LockGuard lockguard(mutex);
if (ticket > 0)
{
usleep(1000);
printf("%s selles ticket: %d\n", id, ticket);
ticket--;
}
else
{
break;
}
}
return nullptr;
}
int main()
{
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void *)"thread 1");
pthread_create(&t2, NULL, route, (void *)"thread 2");
pthread_create(&t3, NULL, route, (void *)"thread 3");
pthread_create(&t4, NULL, route, (void *)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
return 0;
}
测试:
RAII 风格互斥锁(C++11)
std::mutex mtx;
{
// 构造时自动加锁 lock()
std::lock_guard<std::mutex> guard(mtx);
// 临界区代码(线程安全)
}
// 离开作用域,析构时自动解锁 unlock()
二、线程同步
1、条件变量
线程访问共享资源时,若资源不满足条件(如队列为空),无法继续操作,需条件变量让线程等待,直到其他线程唤醒。
2、同步概念与竞态条件
- 同步: 保证安全的前提下,让线程按特定顺序访问资源,避免饥饿。
- 竞态条件: 因执行时序不确定,导致程序运行异常。
3、条件变量接口
1)pthread_cond_init 函数
静态初始化:pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
2)pthread_cond_destroy 函数
3)pthread_cond_wait 函数
4)pthread_cond_broadcast 函数
5)pthread_cond_signal 函数
测试样例:
#include <iostream>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
// 静态初始化条件变量 + 互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 等待被唤醒的线程
void *active(void *arg)
{
std::string name = static_cast<const char *>(arg);
while (true)
{
pthread_mutex_lock(&mutex);
// 等待唤醒,期间自动释放锁
pthread_cond_wait(&cond, &mutex);
std::cout << name << " 活动..." << std::endl;
pthread_mutex_unlock(&mutex);
}
}
int main()
{
pthread_t t1, t2; // 出参
// 创建两个等待线程
pthread_create(&t1, NULL, active, (void *)"thread-1");
pthread_create(&t2, NULL, active, (void *)"thread-2");
sleep(3);
// 每隔1秒广播唤醒所有
while (true)
{
pthread_cond_broadcast(&cond);
// pthread_cond_signal(&cond);
sleep(1);
}
pthread_join(t1, NULL);
pthread_join(t2, NULL);
return 0;
}
结果:
4、⽣产者消费者模型
1)概念
321原则
- 3种关系: 生产者-生产者(互斥)、消费者-消费者(互斥) 、生产者-消费者(互斥+同步)
- 2类角色: 生产者、消费者
- 1个缓冲区: 共享队列 / 共享资源
2)为何要使⽤⽣产者消费者模型
用一个队列,给生产者和消费者解耦,平衡双方处理速度
- 生产者和消费者不直接通信,只通过队列交互
- 生产者生产完数据直接丢队列,不用等消费者处理
- 消费者直接从队列取数据,不用主动找生产者要
- 队列充当缓冲区,抹平双方速度差异,避免一方被另一方拖垮
3)⽣产者消费者模型三大优点
- 解耦: 生产者和消费者不直接通信,都只和中间仓库(队列)打交道,互不依赖。
- 支持并发: 生产者和消费者可以同时运行,不用互相等,效率更高。
- 平衡忙闲不均: 仓库作为缓冲区,能抹平双方速度差异,避免一方快、一方慢导致的性能瓶颈。
5、基于阻塞队列的⽣产者消费者模型
1)阻塞队列(BlockingQueue)
阻塞队列 = 自带同步逻辑的队列,专门为生产者消费者模型而生
它和普通队列的核心区别:
- 队列为空时: 消费者线程调用
take会自动阻塞,直到队列里有数据 - 队列已满时: 生产者线程调用
put会自动阻塞,直到队列有空位
2)使用 queue 模拟阻塞队列的⽣产者消费者模型
a)BlockQueue.hpp(原生版本)
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>
const int defaultcap = 5;
template <typename T>
class BlockQueue
{
private:
// 判断队列是否满/空
bool isFull() { return _q.size() >= _cap; }
bool isEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = defaultcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
// 初始化锁+条件变量
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_full_cond, nullptr);
pthread_cond_init(&_empty_cond, nullptr);
}
// 生产者: 入队
void Equeue(const T &in)
{
pthread_mutex_lock(&_mutex); // 加锁
// 队列满, 生产者休眠等待
while (isFull())
{
_psleep_num++;
std::cout << "生产者休眠, _psleep_num: " << _psleep_num << std::endl;
pthread_cond_wait(&_full_cond, &_mutex); // 阻塞并释放锁, 唤醒后重新抢锁
_psleep_num--;
}
// 放入数据
_q.push(in);
// 有等待的消费者, 唤醒
if (_csleep_num > 0)
{
pthread_cond_signal(&_empty_cond);
std::cout << "唤醒消费者..." << std::endl;
}
pthread_mutex_unlock(&_mutex); // 解锁
}
// 消费者: 出队
T Pop()
{
pthread_mutex_lock(&_mutex); // 加锁
// 队列空, 消费者休眠等待
while (isEmpty())
{
_csleep_num++;
std::cout << "消费者休眠, _csleep_num: " << _csleep_num << std::endl;
pthread_cond_wait(&_empty_cond, &_mutex); // 阻塞并释放锁,唤醒后重新抢锁
_csleep_num--;
}
// 取出数据
T data = _q.front();
_q.pop();
// 有等待的生产者, 唤醒
if (_psleep_num > 0)
{
pthread_cond_signal(&_full_cond);
std::cout << "唤醒生产者..." << std::endl;
}
pthread_mutex_unlock(&_mutex); // 加锁
return data;
}
~BlockQueue()
{
// 销毁锁+条件变量
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_full_cond);
pthread_cond_destroy(&_empty_cond);
}
private:
std::queue<T> _q; // 共享队列
int _cap; // 队列容量
int _csleep_num; // 休眠消费者数量
int _psleep_num; // 休眠生产者数量
pthread_mutex_t _mutex; // 互斥锁
pthread_cond_t _full_cond; // 队列满->生产者等待
pthread_cond_t _empty_cond; // 队列空->消费者等待
};
Task.hpp:
#pragma once
#include <iostream>
#include <unistd.h>
#include <functional>
// 无参无返回值任务类型
using task_t = std::function<void()>;
// 下载任务
void Download()
{
std::cout << "下载任务执行..." << std::endl;
sleep(3);
}
// 计算任务(两数相加)
class Task
{
public:
Task() {}
Task(int x, int y) : _x(x), _y(y) {}
// 执行任务
void Execute() { _result = _x + _y; }
int X() { return _x; }
int Y() { return _y; }
int Result() { return _result; }
private:
int _x;
int _y;
int _result;
};
测试用例:
1、单生产单消费:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
// 消费者: 从队列取任务并执行
void* consumer(void* args)
{
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while(true)
{
Task t = bq->Pop();
t.Execute();
std::cout << "消费任务: " << t.X() << "+" << t.Y() << "=" << t.Result() << std::endl;
}
}
// 生产者: 往队列投放任务
void* productor(void* args)
{
int x = 1, y = 1;
BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
while(true)
{
sleep(1);
std::cout << "生产任务: " << x << "+" << y << "=?" << std::endl;
Task t(x, y);
bq->Equeue(t);
x++, y++;
}
}
int main()
{
// 创建阻塞队列
BlockQueue<Task> *bq = new BlockQueue<Task>();
// 单生产单消费
pthread_t p,c;
pthread_create(&p, nullptr, productor, bq);
pthread_create(&c, nullptr, consumer, bq);
// 等待线程
pthread_join(p, nullptr);
pthread_join(c, nullptr);
delete bq;
return 0;
}
结果:
2、多生产多消费:
#include "BlockQueue.hpp"
#include "Task.hpp"
#include <iostream>
// 消费者: 从队列取任务并执行
void *consumer(void *args)
{
BlockQueue<task_t> *bq = (BlockQueue<task_t> *)(args);
while (true)
{
task_t t = bq->Pop();
t();
}
}
// 生产者: 往队列投放任务
void *productor(void *args)
{
BlockQueue<task_t> *bq = (BlockQueue<task_t> *)(args);
while (true)
{
std::cout << "生产任务: " << std::endl;
bq->Equeue(Download);
sleep(1);
}
}
int main()
{
// 创建阻塞队列
BlockQueue<task_t> *bq = new BlockQueue<task_t>();
// 3个生产者, 2个消费者
pthread_t p[3], c[2];
pthread_create(p, nullptr, productor, bq);
pthread_create(p + 1, nullptr, productor, bq);
pthread_create(p + 2, nullptr, productor, bq);
pthread_create(c, nullptr, consumer, bq);
pthread_create(c + 1, nullptr, consumer, bq);
// 等待线程
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete bq;
return 0;
}
结果:
b)BlockQueue.hpp(封装版本)
#pragma once
#include <iostream>
#include <queue>
#include "Mutex.hpp"
#include "Cond.hpp"
using namespace MutexModule;
using namespace CondModule;
const int defaultcap = 5;
template <typename T>
class BlockQueue
{
private:
// 判断队列是否满/空
bool isFull() { return _q.size() >= _cap; }
bool isEmpty() { return _q.empty(); }
public:
BlockQueue(int cap = defaultcap)
: _cap(cap),
_csleep_num(0),
_psleep_num(0)
{
}
// 生产者: 入队
void Equeue(const T &in)
{
LockGuard lockguard(_mutex);
// 队列满, 生产者休眠等待
while (isFull())
{
_psleep_num++;
std::cout << "生产者休眠, _psleep_num: " << _psleep_num << std::endl;
_full_cond.Wait(_mutex); // 阻塞并释放锁, 唤醒后重新抢锁
_psleep_num--;
}
// 放入数据
_q.push(in);
// 有等待的消费者, 唤醒
if (_csleep_num > 0)
{
_empty_cond.Signal();
std::cout << "唤醒消费者..." << std::endl;
}
}
// 消费者: 出队
T Pop()
{
T data;
{
LockGuard lockguard(_mutex);
// 队列空, 消费者休眠等待
while (isEmpty())
{
_csleep_num++;
std::cout << "消费者休眠, _csleep_num: " << _csleep_num << std::endl;
_empty_cond.Wait(_mutex); // 阻塞并释放锁,唤醒后重新抢锁
_csleep_num--;
}
// 取出数据
data = _q.front();
_q.pop();
// 有等待的生产者, 唤醒
if (_psleep_num > 0)
{
_full_cond.Signal();
std::cout << "唤醒生产者..." << std::endl;
}
}
return data;
}
~BlockQueue()
{
}
private:
std::queue<T> _q; // 共享队列
int _cap; // 队列容量
int _csleep_num; // 休眠消费者数量
int _psleep_num; // 休眠生产者数量
Mutex _mutex; // 互斥锁
Cond _full_cond; // 队列满->生产者等待
Cond _empty_cond; // 队列空->消费者等待
};
6、为什么 pthread_cond_wait 需要互斥量?
1. 条件本身依赖共享数据,必须用互斥锁保护
条件变量的判断、修改都离不开共享数据,这些数据必须被互斥锁保护,否则会出现数据竞争,导致条件判断出错。
2. 避免 “判断条件” 和 “等待” 之间的竞态,防止死锁
如果不持有互斥锁,就可能出现:线程刚判断完条件不成立、准备进入等待,另一个线程就修改了条件并发送信号,导致当前线程永远等不到信号,造成死锁。
3. pthread_cond_wait 会原子地释放锁并阻塞,唤醒后再重新持有锁
这一机制确保了等待期间其他线程可以安全修改共享数据,同时唤醒后线程能以安全的状态继续执行后续逻辑。
按照上⾯的说法,我们设计出如下的代码:
pthread_mutex_lock(&mutex);
while (condition_is_false)
{
pthread_mutex_unlock(&mutex);
// 解锁之后,等待之前,条件可能已经满足,信号已经发出,但是该信号可能被错过
pthread_cond_wait(&cond);
pthread_mutex_lock(&mutex);
}
pthread_mutex_unlock(&mutex);
- 代码问题: 解锁 + 等待是两个独立操作,非原子。解锁后、等待前,其他线程可能已修改条件并发送信号,当前线程会错过信号,永久阻塞。
- 正确原理:
pthread_cond_wait内部原子执行:释放互斥锁 + 阻塞等待;被唤醒后自动重新加锁。 - 要求: 解锁和等待必须是原子操作,这也是该函数强制需要互斥量的根本原因。
7、条件变量使⽤规范
• 等待方: 加锁 → while (条件假) 等待 → 改条件 → 解锁
pthread_mutex_lock(&mutex);
while (条件为假)
pthread_cond_wait(cond, mutex);
修改条件
pthread_mutex_unlock(&mutex);
- 通知方: 加锁 → 改条件 → 发信号 → 解锁
pthread_mutex_lock(&mutex);
设置条件为真
pthread_cond_signal(cond);
pthread_mutex_unlock(&mutex);
8、条件变量封装
1)Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModule
{
// 互斥锁封装类
class Mutex
{
public:
// 初始化锁
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
// 加锁
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
}
// 解锁
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
}
// 获取锁指针
pthread_mutex_t *Get()
{
return &_mutex;
}
// 销毁锁
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex; // 原生互斥锁对象
};
// RAII 自动锁
class LockGuard
{
public:
LockGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
2)Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;
namespace CondModule
{
// 条件变量封装
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond, nullptr);
}
// 等待: 阻塞并释放锁
void Wait(Mutex &mutex)
{
int n = pthread_cond_wait(&_cond, mutex.Get());
}
// 唤醒一个等待线程
void Signal()
{
int n = pthread_cond_signal(&_cond);
}
// 唤醒所有等待线程
void Broadcast()
{
int n = pthread_cond_broadcast(&_cond);
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond; // 底层条件变量
};
}
三、POSIX信号量
POSIX 信号量与 SystemV 信号量功能一致,均可同步并安全访问共享资源;POSIX 信号量还支持线程间同步,使用更简便。
1、信号量接口
1)sem_init 函数
2)sem_destroy 函数
3)sem_wait 函数
4)sem_post 函数
2、基于环形队列的⽣产者消费者模型
1、阻塞队列
-
环形队列用数组+模运算实现循环

-
空/满状态难以直接区分,可通过:预留一个空位或者增加计数器/标记位来判断。

我们现在有信号量这个计数器,就很简单的进⾏多线程间的同步过程。
1)Mutex.hpp
#pragma once
#include <iostream>
#include <pthread.h>
namespace MutexModule
{
// 互斥锁封装类
class Mutex
{
public:
// 初始化锁
Mutex()
{
pthread_mutex_init(&_mutex, nullptr);
}
// 加锁
void Lock()
{
int n = pthread_mutex_lock(&_mutex);
}
// 解锁
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex);
}
// 获取锁指针
pthread_mutex_t *Get()
{
return &_mutex;
}
// 销毁锁
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
private:
pthread_mutex_t _mutex; // 原生互斥锁对象
};
// RAII 自动锁
class LockGuard
{
public:
LockGuard(Mutex &mutex)
: _mutex(mutex)
{
_mutex.Lock();
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;
};
}
2)Sem.hpp
#pragma once
#include <iostream>
#include <semaphore.h>
namespace SemModule
{
class Sem
{
public:
Sem(int val = 1)
{
sem_init(&_sem, 0, val);
}
// P操作: 申请资源(--)
void P()
{
int n = sem_wait(&_sem);
}
// V操作: 释放资源(++)
void V()
{
int n = sem_post(&_sem);
}
~Sem()
{
sem_destroy(&_sem);
}
private:
sem_t _sem;
};
}
3)RingQueue.hpp
#pragma once
#include <iostream>
#include <vector>
#include "Mutex.hpp"
#include "Sem.hpp"
using namespace MutexModule;
using namespace SemModule;
static const int gcap = 1;
template <typename T>
class RingQueue
{
public:
// 构造: 初始化环形队列、信号量、下标
RingQueue(int cap = gcap)
: _cap(cap),
_rq(cap),
_blank_sem(cap), // 空位数信号量
_p_step(0), // 生产者起始下标
_data_sem(0), // 数据数信号量
_c_step(0) // 消费者起始下标
{
}
// 生产者入队
void Equeue(const T &in)
{
_blank_sem.P();
{
LockGuard lockguard(_pmutex);
_rq[_p_step] = in;
++_p_step;
_p_step %= _cap; // 维持环形特性
}
_data_sem.V();
}
// 消费者出队 出参out
void Pop(T *out)
{
_data_sem.P();
{
LockGuard lockguard(_cmutex);
*out = _rq[_c_step];
++_c_step;
_c_step %= _cap;
}
_blank_sem.V();
}
private:
std::vector<T> _rq; // 环形队列底层数组
int _cap; // 队列容量
Sem _blank_sem; // 空位数信号量
int _p_step; // 生产者位置下标
Sem _data_sem; // 数据数信号量
int _c_step; // 消费者位置下标
Mutex _cmutex; // 消费者锁
Mutex _pmutex; // 生产者锁
};
测试用例:
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include "RingQueue.hpp"
// 线程参数封装
struct threaddata
{
std::string name;
RingQueue<int> *rq;
};
Mutex print_mutex; // 全局打印锁,解决cout乱序
Mutex data_mutex; // 全局数据锁,解决data重复问题
// 消费者
void *consumer(void *args)
{
threaddata *td = static_cast<threaddata *>(args);
while (true)
{
sleep(1);
int t = 0;
td->rq->Pop(&t);
LockGuard lockguard(print_mutex);
std::cout << td->name << " 消费数据: " << t << std::endl;
}
}
int data = 1;
// 生产者
void *productor(void *args)
{
threaddata *td = static_cast<threaddata *>(args);
while (true)
{
sleep(2);
// data++ 加锁
int cur;
{
LockGuard lockguard(data_mutex);
cur = data++;
}
// 打印加锁
{
LockGuard lockguard(print_mutex);
std::cout << td->name << " 生产数据: " << cur << std::endl;
}
td->rq->Equeue(cur);
}
}
int main()
{
RingQueue<int> *rq = new RingQueue<int>();
// 多生产多消费
pthread_t p[3], c[2];
threaddata td1{"pthread-1", rq};
threaddata td2{"pthread-2", rq};
threaddata td3{"pthread-3", rq};
threaddata td4{"cthread-1", rq};
threaddata td5{"cthread-2", rq};
pthread_create(p, nullptr, productor, &td1);
pthread_create(p+1, nullptr, productor, &td2);
pthread_create(p+2, nullptr, productor, &td3);
pthread_create(c, nullptr, consumer, &td4);
pthread_create(c+1, nullptr, consumer, &td5);
pthread_join(p[0], nullptr);
pthread_join(p[1], nullptr);
pthread_join(p[2], nullptr);
pthread_join(c[0], nullptr);
pthread_join(c[1], nullptr);
delete rq;
return 0;
}
结果:
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)