59、QWaitCondition:条件变量---------多线程、竟态条件和同步
QWaitCondition:条件变量
基本概念与用法
条件变量是一种同步机制,允许线程在某个条件不满足时阻塞,直到该条件由其他线程通知满足。Qt提供了QWaitCondition类,用于实现条件变量的功能。通常,QWaitCondition与QMutex结合使用,以确保在等待和通知过程中对共享资源的访问是安全的。
典型用法:
1创建一个QWaitCondition对象和一个QMutex对象。
2在线程中,当特定条件不满足时,使用wait()方法等待条件变量,同时锁定互斥量。
3当条件满足时,其他线程通过wakeOne()或wakeAll()方法通知等待的线程。
4等待线程在被通知后重新获取互斥量,并继续执行。
关键方法:
●wait(QMutex *mutex):使调用线程等待,直到被唤醒。调用该方法前必须锁定互斥量,调用后会自动解锁,并在被唤醒后重新锁定。
●wakeOne():唤醒一个等待的线程。
●wakeAll():唤醒所有等待的线程。
应用场景
条件变量经常用于控制多线程访问资源,进行条件判断的场景。比如一个线程A向队列中放入数据,一个线程B从队列中取出数据。
线程A我们叫做生产者线程,线程B我们叫做消费者线程。
如果消费者线程取出数据比生产者线程放入数据的速度快,那么队列为空时,消费者线程就无法取出数据,我们常用的做法就是判断队列为空,就跳过本次循环继续循环,比如下面的逻辑
//模拟消费者线程做的事情
void consumerWork(){
//消费者无限轮询工作
while(true){
//判断队列是否为空
if(queue.isEmpty()){
continue;
}
//取出数据
queue.pop()
}
}
对于上面的代码,如果队列为空,则直接跳过,进入下一次循环,如果短时间内队列仍旧为空,会再次continue,要是一直为空这个循环就一直在空跑,循环空跑是对cpu资源最大的浪费!
有的同学可能会说,那就让消费者线程睡眠1s呗,消费者线程会等1s睡完之后才能处理,比如下面这样
//模拟消费者线程做的事情
void consumerWork(){
//消费者无限轮询工作
while(true){
//判断队列是否为空
if(queue.isEmpty()){
//睡眠1s
QThread::sleep(1);
continue;
}
//取出数据
queue.pop()
}
}
那如果在这1s之内生产者线程往队列里放入数据了呢?这样就不及时了,对于很多零延迟系统(游戏,即时交易,高并发场景等)是不允许的。
如果生产数据的线程放入数据的速度比消费者线程的速度快,那么很容易把队列撑爆,实际公司的场景会对队列设置最大大小,如果队列超过最大值,那么就不让生产者将数据放入,之前谈过消费者可以跳过,那生产者是不能跳过的,因为跳过放入队列的操作,就意味着数据丢失,这次数据没放入就丢掉了。这是无法容忍的。(比如同学们玩元神,氪金648后台将你的重置信息放入队列,但是因为今天活动太火爆了,充值队列撑爆了,你的消费信息无法放入队列被丢弃了,出现的后果就是钱花了原石被到账,坑爹吧)
所以遇到这些情况就不能跳过和重试,要让这个操作阻塞,直到队列满足条件后,操作继续,这样不会丢失数据,处理也足够及时。
生产者消费者问题
单一生产者和消费者
实现如下案例, 启动生产者和消费者线程,生产者线程向队列放入数据,消费者线程从队列中取出数据,并且两个线程将数据信息发送给MainWindow显示。

全局文件
我们需要实现全局文件global.h和global.cpp,用来声明和定义要使用的全局变量
#ifndef GLOBAL_H
#define GLOBAL_H
#include <QQueue>
#include <atomic>
#include <QMutex>
#include <QWaitCondition>
#include <QSet>
//全局队列,生产者和消费者共享
extern QQueue<int> _global_queue;
//生产者共享的计数
extern std::atomic<int> _global_num;
//互斥量,生产者和消费者共享
extern QMutex _global_mtx;
//生产者条件变量
extern QWaitCondition _global_producer_wc;
//消费者条件变量
extern QWaitCondition _global_consumer_wc;
//队列最大值
const int MAX_SIZE = 10;
extern void printQtThreadId(QString str);
#endif // GLOBAL_H
定义全局变量
#include "global.h"
#include <QThread>
#include <QDebug>
//全局队列,生产者和消费者共享
QQueue<int> _global_queue = {};
//生产者共享的计数
std::atomic<int> _global_num = {0};
//互斥量,生产者和消费者共享
QMutex _global_mtx;
//生产者条件变量
QWaitCondition _global_producer_wc;
//消费者条件变量
QWaitCondition _global_consumer_wc;
void printQtThreadId(QString str)
{
qDebug() << str;
Qt::HANDLE tid = QThread::currentThreadId();
// 将 Qt::HANDLE 转换为 uintptr_t(无符号整数类型)
quintptr numericTid = reinterpret_cast<quintptr>(tid);
// 使用十六进制格式打印更具辨识度
qDebug() << "Qt Thread ID:" << QString("0x%1").arg(numericTid, 0, 16);
qDebug() << str;
}
消费者任务类
声明消费者信号和槽函数等
#ifndef CONSUMER_H
#define CONSUMER_H
#include <QObject>
class Consumer : public QObject
{
Q_OBJECT
public:
explicit Consumer(int id, QObject *parent = nullptr);
~Consumer();
signals:
//更新主界面的信号
void sig_consume_up(int con_id, int num, bool b_empty = false);
//任务完成信号
void sig_finished(int id);
public slots:
//开始消费任务函数
void consumeWork();
//停止执行消费任务
void stopWork(int id);
private:
int _id;
bool _b_run;
};
#endif // CONSUMER_H
实现具体的消费者类
#include "consumer.h"
#include "global.h"
#include <QThread>
#include <QDebug>
Consumer::Consumer(int id, QObject *parent) : QObject(parent),_id(id),_b_run(false)
{
}
Consumer::~Consumer()
{
qDebug() << QString(" consumer [%1] destruct").arg(_id);
}
void Consumer::consumeWork()
{
_b_run = true;
bool flag = false;
while(_b_run){
//判断队列是否满了,需要先加锁
_global_mtx.lock();
//队列满了,需要用while循环判断,防止虚假唤醒
while(_global_queue.size() <= 0 && _b_run){
//通知主界面消费者队列为空
emit sig_consume_up(_id, 0, true);
//生产者线程被挂起,直到消费者线程唤醒它
_global_consumer_wc.wait(&_global_mtx);
}
//队列未满,或者被消费者唤醒,或者是主界面退出唤醒
// 二次判断是否因为主界面退出唤醒
if(!_b_run){
//解锁
_global_mtx.unlock();
//发送结束信号
emit sig_finished(_id);
return;
}
auto temp = _global_queue.front();
_global_queue.pop_front();
//发送信号通知主界面刷新信息
emit sig_consume_up(_id, temp);
flag = false;
if(_global_queue.size() == MAX_SIZE-1){
flag = true;
}
//解锁
_global_mtx.unlock();
//发送通知唤醒生产者
if(flag){
_global_producer_wc.notify_one();
}
//为防止循环过快,小睡一会
QThread::msleep(500);
}
//发送结束信号
emit sig_finished(_id);
return;
}
void Consumer::stopWork(int id)
{
//界面停止信号会连接多个消费者,需要判断id是否为自己
if(_id != id){
return;
}
_global_mtx.lock();
//是自己则退出
_b_run = false;
_global_consumer_wc.notify_all();
_global_mtx.unlock();
}
生产者任务类
实现生产者任务类的声明
#ifndef PRODUCER_H
#define PRODUCER_H
#include <QObject>
class Producer : public QObject
{
Q_OBJECT
public:
//id为生产者id
explicit Producer(int id, QObject *parent = nullptr);
~Producer();
public slots:
//槽函数响应线程启动信号,模拟生产工作
void produceWork();
//槽函数响应主界面停止信号,结束任务
void stopWork(int id);
private:
//是否运行
std::atomic<bool> _b_run;
//生产者id
int _id;
signals:
//任务完成信号
void sig_finished(int id);
//发送信号通知主界面显示
void sig_up_text(int id, int num, bool full = false);
};
#endif // PRODUCER_H
实现具体的定义
#include "producer.h"
#include "global.h"
#include <QThread>
#include <QDebug>
Producer::Producer(int id, QObject *parent): QObject(parent),_id(id)
{
}
void Producer::produceWork()
{
_b_run = true;
bool flag = false;
while(_b_run){
//判断队列是否满了,需要先加锁
_global_mtx.lock();
_global_num++;
auto temp = _global_num.load();
//队列满了,需要用while循环判断,防止虚假唤醒
while(_global_queue.size()>=MAX_SIZE && _b_run ){
//通知主界面生产者满了
emit sig_up_text(_id, temp, true);
//生产者线程被挂起,直到消费者线程唤醒它
_global_producer_wc.wait(&_global_mtx);
}
//队列未满,或者被消费者唤醒,或者是主界面退出唤醒
// 二次判断是否因为主界面退出唤醒
if(!_b_run){
//解锁
_global_mtx.unlock();
//发送结束信号
emit sig_finished(_id);
return;
}
_global_queue.push_back(temp);
//发送信号通知主界面刷新信息
emit sig_up_text(_id, temp);
flag = false;
if(_global_queue.size() == 1){
flag = true;
}
//解锁
_global_mtx.unlock();
//通知消费者唤醒
if(flag){
_global_consumer_wc.notify_one();
}
//为防止循环过快,小睡一会
QThread::msleep(500);
}
//发送结束信号
emit sig_finished(_id);
return;
}
Producer::~Producer()
{
qDebug() << QString(" producer [%1] destruct").arg(_id);
}
void Producer::stopWork(int id)
{
//界面停止信号会连接多个生产者,需要判断id是否为自己
if(_id != id){
return;
}
_global_mtx.lock();
//是自己则退出
_b_run = false;
_global_producer_wc.notify_all();
_global_mtx.unlock();
}
MainWindow启动生产者和消费者
声明
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QMainWindow>
namespace Ui {
class MainWindow;
}
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = nullptr);
~MainWindow();
private slots:
//点击启动消费者按钮响应的槽函数
void on_consumerBtn_clicked();
//点击停止消费者按钮响应的槽函数
void on_constopBtn_clicked();
//点击启动生产者按钮响应的槽函数
void on_producerBtn_clicked();
//点击停止生产者按钮响应的槽函数
void on_prostopBtn_clicked();
//响应生产者线程发过来的消息显示到界面上
void slot_up_text(int id, int num, bool full);
//响应消费者线程发过来的消息显示到界面上
void slot_consume_text(int id, int num, bool b_empty);
private:
Ui::MainWindow *ui;
signals:
//停止某个生产者信号
void sig_stop_produce(int id);
//停止某个消费者信号
void sig_stop_consumer(int id);
};
#endif // MAINWINDOW_H
具体实现
#include "mainwindow.h"
#include "ui_mainwindow.h"
#include <QThread>
#include "global.h"
#include "producer.h"
#include <QMessageBox>
#include <QDebug>
#include "consumer.h"
MainWindow::MainWindow(QWidget *parent) :
QMainWindow(parent),
ui(new Ui::MainWindow),_worker_id(0)
{
ui->setupUi(this);
}
MainWindow::~MainWindow()
{
//通知生产者结束工作
for(auto iter = _map_producers.begin();
iter != _map_producers.end(); iter++){
iter.value()->stopWork(iter.key());
}
//等待线程退出
for(auto& thread: _map_producer_thread){
thread->wait();
}
//通知消费者结束工作
for(auto iter = _map_consumers.begin();
iter != _map_consumers.end(); iter++){
iter.value()->stopWork(iter.key());
}
//等待线程退出
for(auto& thread: _map_consumer_thread){
thread->wait();
}
delete ui;
}
void MainWindow::on_addProducerBtn_clicked()
{
//创建线程
auto thread = std::make_shared<QThread>();
//增加id
_worker_id++;
//创建生产者
auto producer = new Producer(_worker_id);
//放入线程
producer->moveToThread(thread.get());
//连接线程启动信号
connect(thread.get(),&QThread::started, producer, &Producer::produceWork);
//连接线程结束信号, 这里为了延长thread声明周期
connect(thread.get(), &QThread::finished, this, [=]() {
thread->wait();
});
//连接停止信号,注意下面的方式是错误的
//因为producer线程任务produceWork死循环,导致无法响应其他信号
//connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
//改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
connect(this, &MainWindow::sig_stop_produce,this,[producer](int id){
producer->stopWork(id);
});
//连接刷新信息信号
connect(producer,&Producer::sig_up_text,this, &MainWindow::slot_up_text);
//连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
//而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
connect(producer,&Producer::sig_finished, [=](int id){
qDebug()<<QString("收到任务[%1]退出信号").arg(id);
producer->deleteLater();
thread->quit();
});
//线程放入容器管理
_producer_ids.push_back(_worker_id);
//生产者map
_map_producer_thread.insert(_worker_id,thread);
//生产者放入map
_map_producers.insert(_worker_id,producer);
//更新生产者数量
ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));
//更新提示信息
ui->textEdit->append(QString("创建生产者线程:[%1]").arg(_worker_id));
thread->start();
}
void MainWindow::on_delProducerBtn_clicked()
{
if(_producer_ids.isEmpty()){
QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
return;
}
//获取首部元素
auto id = _producer_ids.front();
//弹出
_producer_ids.pop_front();
//从map中移除thread
_map_producer_thread.remove(id);
//从map中移除生产者
_map_producers.remove(id);
//发送信号停止
emit sig_stop_produce(id);
//更新生产者数量
ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));
//更新文本信息
ui->textEdit->append(QString("销毁生产者线程[%1]").arg(id));
}
void MainWindow::slot_up_text(int id, int num, bool full)
{
if (full) {
this->ui->textEdit->append(QString("队列已满!!! 线程:[%1] 无法将数据[%2]放入队列").arg(id).arg(num));
}
else {
this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]放入队列").arg(id).arg(num));
}
}
void MainWindow::slot_consume_text(int id, int num, bool b_empty)
{
if (b_empty) {
this->ui->textEdit->append(QString("队列为空!!! 线程:[%1] 无法取出数据").arg(id));
}
else {
this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]取出队列").arg(id).arg(num));
}
}
void MainWindow::on_addConsumerBtn_clicked()
{
//创建线程
auto thread = std::make_shared<QThread>();
//增加id
_worker_id++;
//创建生产者
auto consumer = new Consumer(_worker_id);
//放入线程
consumer->moveToThread(thread.get());
//连接线程启动信号
connect(thread.get(),&QThread::started, consumer, &Consumer::consumeWork);
//连接线程结束信号, 这里为了延长thread声明周期
connect(thread.get(), &QThread::finished, this, [=]() {
thread->wait();
});
//连接停止信号,注意下面的方式是错误的
//因为producer线程任务produceWork死循环,导致无法响应其他信号
//connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
//改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
connect(this, &MainWindow::sig_stop_consumer,this,[consumer](int id){
consumer->stopWork(id);
});
//连接刷新信息信号
connect(consumer,&Consumer::sig_consume_up,this, &MainWindow::slot_consume_text);
//连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
//而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
connect(consumer,&Consumer::sig_finished, [=](int id){
qDebug()<<QString("收到任务[%1]退出信号").arg(id);
consumer->deleteLater();
thread->quit();
});
//线程放入容器管理
_consumer_ids.push_back(_worker_id);
//生产者map
_map_consumer_thread.insert(_worker_id,thread);
//生产者放入map
_map_consumers.insert(_worker_id,consumer);
//更新生产者数量
ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));
//更新提示信息
ui->textEdit->append(QString("创建消费者线程:[%1]").arg(_worker_id));
thread->start();
}
void MainWindow::on_delConsumerBtn_clicked()
{
if(_consumer_ids.isEmpty()){
QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
return;
}
//获取首部元素
auto id = _consumer_ids.front();
//弹出
_consumer_ids.pop_front();
//从map中移除thread
_map_consumer_thread.remove(id);
//从map中移除生产者
_map_consumers.remove(id);
//发送信号停止
emit sig_stop_consumer(id);
//更新生产者数量
ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));
//更新文本信息
ui->textEdit->append(QString("销毁消费者线程[%1]").arg(id));
}
主函数启动
主函数启动展示MainWindow
#include "mainwindow.h"
#include <QApplication>
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
MainWindow w;
w.show();
return a.exec();
}
多生产者多消费者(课外拓展)
我们采用的方法就是QWaitCondition, 我们可以制作如下界面,通过增减生产者,消费者达到控制速度的目的,当队列达到200时我们认为队列已满,当队列为空时我们认为不能消费。

解决思路
创建项目ProducerConsumer, 编辑mainwindow.ui,ui界面如下

属性结构图

全局文件
我们创建一个全局文件global.h和global.cpp,在global.h中声明如下全局变量
#ifndef GLOBAL_H
#define GLOBAL_H
#include <QQueue>
#include <atomic>
#include <QMutex>
#include <QWaitCondition>
#include <QSet>
//全局队列,生产者和消费者共享
extern QQueue<int> _global_queue;
//生产者共享的计数
extern std::atomic<int> _global_num;
//互斥量,生产者和消费者共享
extern QMutex _global_mtx;
//生产者条件变量
extern QWaitCondition _global_producer_wc;
//消费者条件变量
extern QWaitCondition _global_consumer_wc;
//队列最大值
const int MAX_SIZE = 10;
extern void printQtThreadId(QString str);
#endif // GLOBAL_H
定义如下全局变量
#include "global.h"
#include <QThread>
#include <QDebug>
//全局队列,生产者和消费者共享
QQueue<int> _global_queue = {};
//生产者共享的计数
std::atomic<int> _global_num = {0};
//互斥量,生产者和消费者共享
QMutex _global_mtx;
//生产者条件变量
QWaitCondition _global_producer_wc;
//消费者条件变量
QWaitCondition _global_consumer_wc;
void printQtThreadId(QString str)
{
qDebug() << str;
Qt::HANDLE tid = QThread::currentThreadId();
// 将 Qt::HANDLE 转换为 uintptr_t(无符号整数类型)
quintptr numericTid = reinterpret_cast<quintptr>(tid);
// 使用十六进制格式打印更具辨识度
qDebug() << "Qt Thread ID:" << QString("0x%1").arg(numericTid, 0, 16);
qDebug() << str;
}
生产者任务类
#ifndef PRODUCER_H
#define PRODUCER_H
#include <QObject>
class Producer : public QObject
{
Q_OBJECT
public:
//id为生产者id
explicit Producer(int id, QObject *parent = nullptr);
~Producer();
public slots:
//槽函数响应线程启动信号,模拟生产工作
void produceWork();
//槽函数响应主界面停止信号,结束任务
void stopWork(int id);
private:
//是否运行
std::atomic<bool> _b_run;
//生产者id
int _id;
signals:
//任务完成信号
void sig_finished(int id);
//发送信号通知主界面显示
void sig_up_text(int id, int num, bool full = false);
};
#endif // PRODUCER_H
具体实现
#include "producer.h"
#include "global.h"
#include <QThread>
#include <QDebug>
Producer::Producer(int id, QObject *parent): QObject(parent),_id(id)
{
}
void Producer::produceWork()
{
_b_run = true;
bool flag = false;
while(_b_run){
//判断队列是否满了,需要先加锁
_global_mtx.lock();
_global_num++;
auto temp = _global_num.load();
//队列满了,需要用while循环判断,防止虚假唤醒
while(_global_queue.size()>=MAX_SIZE && _b_run ){
//通知主界面生产者满了
emit sig_up_text(_id, temp, true);
//生产者线程被挂起,直到消费者线程唤醒它
_global_producer_wc.wait(&_global_mtx);
}
//队列未满,或者被消费者唤醒,或者是主界面退出唤醒
// 二次判断是否因为主界面退出唤醒
if(!_b_run){
//解锁
_global_mtx.unlock();
//发送结束信号
emit sig_finished(_id);
return;
}
_global_queue.push_back(temp);
//发送信号通知主界面刷新信息
emit sig_up_text(_id, temp);
flag = false;
if(_global_queue.size() == 1){
flag = true;
}
//解锁
_global_mtx.unlock();
//通知消费者唤醒
if(flag){
_global_consumer_wc.notify_one();
}
//为防止循环过快,小睡一会
QThread::msleep(500);
}
//发送结束信号
emit sig_finished(_id);
return;
}
Producer::~Producer()
{
qDebug() << QString(" producer [%1] destruct").arg(_id);
}
void Producer::stopWork(int id)
{
//界面停止信号会连接多个生产者,需要判断id是否为自己
if(_id != id){
return;
}
_global_mtx.lock();
//是自己则退出
_b_run = false;
_global_producer_wc.notify_all();
_global_mtx.unlock();
}
消费者任务类
创建Consumer类声明如下
#ifndef CONSUMER_H
#define CONSUMER_H
#include <QObject>
class Consumer : public QObject
{
Q_OBJECT
public:
explicit Consumer(int id, QObject *parent = nullptr);
~Consumer();
signals:
//更新主界面的信号
void sig_consume_up(int con_id, int num, bool b_empty = false);
//任务完成信号
void sig_finished(int id);
public slots:
//开始消费任务函数
void consumeWork();
//停止执行消费任务
void stopWork(int id);
private:
int _id;
bool _b_run;
};
#endif // CONSUMER_H
定义如下
#include "consumer.h"
#include "global.h"
#include <QThread>
#include <QDebug>
Consumer::Consumer(int id, QObject *parent) : QObject(parent),_id(id),_b_run(false)
{
}
Consumer::~Consumer()
{
qDebug() << QString(" consumer [%1] destruct").arg(_id);
}
void Consumer::consumeWork()
{
_b_run = true;
bool flag = false;
while(_b_run){
//判断队列是否满了,需要先加锁
_global_mtx.lock();
//队列满了,需要用while循环判断,防止虚假唤醒
while(_global_queue.size() <= 0 && _b_run){
//通知主界面消费者队列为空
emit sig_consume_up(_id, 0, true);
//生产者线程被挂起,直到消费者线程唤醒它
_global_consumer_wc.wait(&_global_mtx);
}
//队列未满,或者被消费者唤醒,或者是主界面退出唤醒
// 二次判断是否因为主界面退出唤醒
if(!_b_run){
//解锁
_global_mtx.unlock();
//发送结束信号
emit sig_finished(_id);
return;
}
auto temp = _global_queue.front();
_global_queue.pop_front();
//发送信号通知主界面刷新信息
emit sig_consume_up(_id, temp);
flag = false;
if(_global_queue.size() == MAX_SIZE-1){
flag = true;
}
//解锁
_global_mtx.unlock();
//发送通知唤醒生产者
if(flag){
_global_producer_wc.notify_one();
}
//为防止循环过快,小睡一会
QThread::msleep(500);
}
//发送结束信号
emit sig_finished(_id);
return;
}
void Consumer::stopWork(int id)
{
//界面停止信号会连接多个消费者,需要判断id是否为自己
if(_id != id){
return;
}
_global_mtx.lock();
//是自己则退出
_b_run = false;
_global_consumer_wc.notify_all();
_global_mtx.unlock();
}
实现mainwindow
声明如下
#ifndef MAINWINDOW_H
#define MAINWINDOW_H
#include <QMainWindow>
#include <QVector>
#include <QMap>
#include "producer.h"
#include <memory>
#include "consumer.h"
namespace Ui {
class MainWindow;
}
class MainWindow : public QMainWindow
{
Q_OBJECT
public:
explicit MainWindow(QWidget *parent = nullptr);
~MainWindow();
private slots:
//点击添加生产者线程按钮响应的槽函数
void on_addProducerBtn_clicked();
//点击删除生产者线程按钮响应的槽函数
void on_delProducerBtn_clicked();
//响应生产者线程发过来的消息显示到界面上
void slot_up_text(int id, int num, bool full);
//响应消费者线程发过来的消息显示到界面上
void slot_consume_text(int id, int num, bool b_empty);
//响应添加消费者线程按钮的槽函数
void on_addConsumerBtn_clicked();
//响应删除消费者线程按钮的槽函数
void on_delConsumerBtn_clicked();
private:
Ui::MainWindow *ui;
//工人id,生产者和消费者id
int _worker_id;
//生产者id列表
QVector<int> _producer_ids;
//消费者id列表
QVector<int> _consumer_ids;
//生产者id对应线程的map
QMap<int, std::shared_ptr<QThread> > _map_producer_thread;
//生产者id对应生产者数据
QMap<int, Producer*> _map_producers;
//消费者id对应线程的map
QMap<int, std::shared_ptr<QThread> > _map_consumer_thread;
//消费者id对应生产者数据
QMap<int, Consumer*> _map_consumers;
signals:
//停止某个生产者信号
void sig_stop_produce(int id);
//停止某个消费者信号
void sig_stop_consumer(int id);
};
#endif // MAINWINDOW_H
完成具体实现
#include "mainwindow.h"
#include "ui_mainwindow.h"
#include <QThread>
#include "global.h"
#include "producer.h"
#include <QMessageBox>
#include <QDebug>
#include "consumer.h"
MainWindow::MainWindow(QWidget *parent) :
QMainWindow(parent),
ui(new Ui::MainWindow),_worker_id(0)
{
ui->setupUi(this);
}
MainWindow::~MainWindow()
{
//通知生产者结束工作
for(auto iter = _map_producers.begin();
iter != _map_producers.end(); iter++){
iter.value()->stopWork(iter.key());
}
//等待线程退出
for(auto& thread: _map_producer_thread){
thread->wait();
}
//通知消费者结束工作
for(auto iter = _map_consumers.begin();
iter != _map_consumers.end(); iter++){
iter.value()->stopWork(iter.key());
}
//等待线程退出
for(auto& thread: _map_consumer_thread){
thread->wait();
}
delete ui;
}
void MainWindow::on_addProducerBtn_clicked()
{
//创建线程
auto thread = std::make_shared<QThread>();
//增加id
_worker_id++;
//创建生产者
auto producer = new Producer(_worker_id);
//放入线程
producer->moveToThread(thread.get());
//连接线程启动信号
connect(thread.get(),&QThread::started, producer, &Producer::produceWork);
//连接线程结束信号, 这里为了延长thread声明周期
connect(thread.get(), &QThread::finished, this, [=]() {
thread->wait();
});
//连接停止信号,注意下面的方式是错误的
//因为producer线程任务produceWork死循环,导致无法响应其他信号
//connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
//改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
connect(this, &MainWindow::sig_stop_produce,this,[producer](int id){
producer->stopWork(id);
});
//连接刷新信息信号
connect(producer,&Producer::sig_up_text,this, &MainWindow::slot_up_text);
//连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
//而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
connect(producer,&Producer::sig_finished, [=](int id){
qDebug()<<QString("收到任务[%1]退出信号").arg(id);
producer->deleteLater();
thread->quit();
});
//线程放入容器管理
_producer_ids.push_back(_worker_id);
//生产者map
_map_producer_thread.insert(_worker_id,thread);
//生产者放入map
_map_producers.insert(_worker_id,producer);
//更新生产者数量
ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));
//更新提示信息
ui->textEdit->append(QString("创建生产者线程:[%1]").arg(_worker_id));
thread->start();
}
void MainWindow::on_delProducerBtn_clicked()
{
if(_producer_ids.isEmpty()){
QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
return;
}
//获取首部元素
auto id = _producer_ids.front();
//弹出
_producer_ids.pop_front();
//从map中移除thread
_map_producer_thread.remove(id);
//从map中移除生产者
_map_producers.remove(id);
//发送信号停止
emit sig_stop_produce(id);
//更新生产者数量
ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));
//更新文本信息
ui->textEdit->append(QString("销毁生产者线程[%1]").arg(id));
}
void MainWindow::slot_up_text(int id, int num, bool full)
{
if (full) {
this->ui->textEdit->append(QString("队列已满!!! 线程:[%1] 无法将数据[%2]放入队列").arg(id).arg(num));
}
else {
this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]放入队列").arg(id).arg(num));
}
}
void MainWindow::slot_consume_text(int id, int num, bool b_empty)
{
if (b_empty) {
this->ui->textEdit->append(QString("队列为空!!! 线程:[%1] 无法取出数据").arg(id));
}
else {
this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]取出队列").arg(id).arg(num));
}
}
void MainWindow::on_addConsumerBtn_clicked()
{
//创建线程
auto thread = std::make_shared<QThread>();
//增加id
_worker_id++;
//创建生产者
auto consumer = new Consumer(_worker_id);
//放入线程
consumer->moveToThread(thread.get());
//连接线程启动信号
connect(thread.get(),&QThread::started, consumer, &Consumer::consumeWork);
//连接线程结束信号, 这里为了延长thread声明周期
connect(thread.get(), &QThread::finished, this, [=]() {
thread->wait();
});
//连接停止信号,注意下面的方式是错误的
//因为producer线程任务produceWork死循环,导致无法响应其他信号
//connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
//改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
connect(this, &MainWindow::sig_stop_consumer,this,[consumer](int id){
consumer->stopWork(id);
});
//连接刷新信息信号
connect(consumer,&Consumer::sig_consume_up,this, &MainWindow::slot_consume_text);
//连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
//而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
connect(consumer,&Consumer::sig_finished, [=](int id){
qDebug()<<QString("收到任务[%1]退出信号").arg(id);
consumer->deleteLater();
thread->quit();
});
//线程放入容器管理
_consumer_ids.push_back(_worker_id);
//生产者map
_map_consumer_thread.insert(_worker_id,thread);
//生产者放入map
_map_consumers.insert(_worker_id,consumer);
//更新生产者数量
ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));
//更新提示信息
ui->textEdit->append(QString("创建消费者线程:[%1]").arg(_worker_id));
thread->start();
}
void MainWindow::on_delConsumerBtn_clicked()
{
if(_consumer_ids.isEmpty()){
QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
return;
}
//获取首部元素
auto id = _consumer_ids.front();
//弹出
_consumer_ids.pop_front();
//从map中移除thread
_map_consumer_thread.remove(id);
//从map中移除生产者
_map_consumers.remove(id);
//发送信号停止
emit sig_stop_consumer(id);
//更新生产者数量
ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));
//更新文本信息
ui->textEdit->append(QString("销毁消费者线程[%1]").arg(id));
}
启动界面
主函数中启动界面展示
#include "mainwindow.h"
#include <QApplication>
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
MainWindow w;
w.show();
return a.exec();
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)