QWaitCondition:条件变量

基本概念与用法
条件变量是一种同步机制,允许线程在某个条件不满足时阻塞,直到该条件由其他线程通知满足。Qt提供了QWaitCondition类,用于实现条件变量的功能。通常,QWaitCondition与QMutex结合使用,以确保在等待和通知过程中对共享资源的访问是安全的。

典型用法:

1创建一个QWaitCondition对象和一个QMutex对象。
2在线程中,当特定条件不满足时,使用wait()方法等待条件变量,同时锁定互斥量。
3当条件满足时,其他线程通过wakeOne()或wakeAll()方法通知等待的线程。
4等待线程在被通知后重新获取互斥量,并继续执行。

关键方法:

●wait(QMutex *mutex):使调用线程等待,直到被唤醒。调用该方法前必须锁定互斥量,调用后会自动解锁,并在被唤醒后重新锁定。
●wakeOne():唤醒一个等待的线程。
●wakeAll():唤醒所有等待的线程。
应用场景
条件变量经常用于控制多线程访问资源,进行条件判断的场景。比如一个线程A向队列中放入数据,一个线程B从队列中取出数据。
线程A我们叫做生产者线程,线程B我们叫做消费者线程。
如果消费者线程取出数据比生产者线程放入数据的速度快,那么队列为空时,消费者线程就无法取出数据,我们常用的做法就是判断队列为空,就跳过本次循环继续循环,比如下面的逻辑

//模拟消费者线程做的事情
void consumerWork(){
    //消费者无限轮询工作
    while(true){
        //判断队列是否为空
        if(queue.isEmpty()){
            continue;
        }
        //取出数据
        queue.pop()
    }   
    
}

对于上面的代码,如果队列为空,则直接跳过,进入下一次循环,如果短时间内队列仍旧为空,会再次continue,要是一直为空这个循环就一直在空跑,循环空跑是对cpu资源最大的浪费!

有的同学可能会说,那就让消费者线程睡眠1s呗,消费者线程会等1s睡完之后才能处理,比如下面这样

//模拟消费者线程做的事情
void consumerWork(){
    //消费者无限轮询工作
    while(true){
        //判断队列是否为空
        if(queue.isEmpty()){
            //睡眠1s
            QThread::sleep(1);
            continue;
        }
        //取出数据
        queue.pop()
    }   
    
}

那如果在这1s之内生产者线程往队列里放入数据了呢?这样就不及时了,对于很多零延迟系统(游戏,即时交易,高并发场景等)是不允许的。

如果生产数据的线程放入数据的速度比消费者线程的速度快,那么很容易把队列撑爆,实际公司的场景会对队列设置最大大小,如果队列超过最大值,那么就不让生产者将数据放入,之前谈过消费者可以跳过,那生产者是不能跳过的,因为跳过放入队列的操作,就意味着数据丢失,这次数据没放入就丢掉了。这是无法容忍的。(比如同学们玩元神,氪金648后台将你的重置信息放入队列,但是因为今天活动太火爆了,充值队列撑爆了,你的消费信息无法放入队列被丢弃了,出现的后果就是钱花了原石被到账,坑爹吧)

所以遇到这些情况就不能跳过和重试,要让这个操作阻塞,直到队列满足条件后,操作继续,这样不会丢失数据,处理也足够及时。

生产者消费者问题

单一生产者和消费者

实现如下案例, 启动生产者和消费者线程,生产者线程向队列放入数据,消费者线程从队列中取出数据,并且两个线程将数据信息发送给MainWindow显示。

全局文件

我们需要实现全局文件global.h和global.cpp,用来声明和定义要使用的全局变量

#ifndef GLOBAL_H
#define GLOBAL_H
#include <QQueue>
#include <atomic>
#include <QMutex>
#include <QWaitCondition>
#include <QSet>


//全局队列,生产者和消费者共享
extern QQueue<int> _global_queue;
//生产者共享的计数
extern std::atomic<int> _global_num;
//互斥量,生产者和消费者共享
extern QMutex _global_mtx;
//生产者条件变量
extern QWaitCondition _global_producer_wc;
//消费者条件变量
extern QWaitCondition _global_consumer_wc;
//队列最大值
const int MAX_SIZE = 10;


extern void printQtThreadId(QString str);
#endif // GLOBAL_H

定义全局变量

#include "global.h"
#include <QThread>
#include <QDebug>

//全局队列,生产者和消费者共享
QQueue<int> _global_queue = {};
//生产者共享的计数
std::atomic<int> _global_num = {0};
//互斥量,生产者和消费者共享
QMutex _global_mtx;
//生产者条件变量
QWaitCondition _global_producer_wc;
//消费者条件变量
QWaitCondition _global_consumer_wc;

void printQtThreadId(QString str)
{
    qDebug() << str;
    Qt::HANDLE tid = QThread::currentThreadId();

    // 将 Qt::HANDLE 转换为 uintptr_t(无符号整数类型)
    quintptr numericTid = reinterpret_cast<quintptr>(tid);

    // 使用十六进制格式打印更具辨识度
    qDebug() << "Qt Thread ID:" << QString("0x%1").arg(numericTid, 0, 16);
    qDebug() << str;
}

消费者任务类

声明消费者信号和槽函数等

#ifndef CONSUMER_H
#define CONSUMER_H

#include <QObject>

class Consumer : public QObject
{
    Q_OBJECT
public:
    explicit Consumer(int id, QObject *parent = nullptr);
    ~Consumer();
signals:
    //更新主界面的信号
    void sig_consume_up(int con_id, int num, bool b_empty = false);
    //任务完成信号
    void sig_finished(int id);
public slots:
    //开始消费任务函数
    void consumeWork();
    //停止执行消费任务
    void stopWork(int id);
private:
    int _id;
    bool _b_run;
};

#endif // CONSUMER_H

实现具体的消费者类

#include "consumer.h"
#include "global.h"
#include <QThread>
#include <QDebug>

Consumer::Consumer(int id, QObject *parent) : QObject(parent),_id(id),_b_run(false)
{

}

Consumer::~Consumer()
{
    qDebug() << QString(" consumer [%1] destruct").arg(_id);
}

void Consumer::consumeWork()
{
    _b_run = true;
    bool flag = false;
    while(_b_run){
        //判断队列是否满了,需要先加锁
        _global_mtx.lock();
        //队列满了,需要用while循环判断,防止虚假唤醒
        while(_global_queue.size() <= 0 && _b_run){
            //通知主界面消费者队列为空
            emit  sig_consume_up(_id, 0, true);
            //生产者线程被挂起,直到消费者线程唤醒它
            _global_consumer_wc.wait(&_global_mtx);
        }

        //队列未满,或者被消费者唤醒,或者是主界面退出唤醒
        // 二次判断是否因为主界面退出唤醒
        if(!_b_run){
            //解锁
            _global_mtx.unlock();
            //发送结束信号
            emit sig_finished(_id);
            return;
        }

        auto temp = _global_queue.front();
        _global_queue.pop_front();
        //发送信号通知主界面刷新信息
        emit sig_consume_up(_id, temp);

        flag = false;
        if(_global_queue.size() == MAX_SIZE-1){
            flag = true;
        }
        //解锁
        _global_mtx.unlock();

        //发送通知唤醒生产者
        if(flag){
            _global_producer_wc.notify_one();
        }

        //为防止循环过快,小睡一会
        QThread::msleep(500);
    }

    //发送结束信号
    emit sig_finished(_id);
    return;
}

void Consumer::stopWork(int id)
{
    //界面停止信号会连接多个消费者,需要判断id是否为自己
    if(_id != id){
        return;
    }

    _global_mtx.lock();
    //是自己则退出
    _b_run = false;
    _global_consumer_wc.notify_all();
    _global_mtx.unlock();
}

生产者任务类

实现生产者任务类的声明

#ifndef PRODUCER_H
#define PRODUCER_H

#include <QObject>

class Producer : public QObject
{
    Q_OBJECT
public:
    //id为生产者id
    explicit Producer(int id, QObject *parent = nullptr);
    ~Producer();

public slots:
    //槽函数响应线程启动信号,模拟生产工作
    void produceWork();
    //槽函数响应主界面停止信号,结束任务
    void stopWork(int id);
private:
    //是否运行
   std::atomic<bool> _b_run;
    //生产者id
    int _id;
signals:
    //任务完成信号
    void sig_finished(int id);
    //发送信号通知主界面显示
    void sig_up_text(int id, int num, bool full = false);

};

#endif // PRODUCER_H

实现具体的定义

#include "producer.h"
#include "global.h"
#include <QThread>
#include <QDebug>

Producer::Producer(int id, QObject *parent): QObject(parent),_id(id)
{

}

void Producer::produceWork()
{
    _b_run = true;
    bool flag = false;
    while(_b_run){
        //判断队列是否满了,需要先加锁
        _global_mtx.lock();
         _global_num++;
         auto temp = _global_num.load();
        //队列满了,需要用while循环判断,防止虚假唤醒
        while(_global_queue.size()>=MAX_SIZE && _b_run ){
            //通知主界面生产者满了
            emit  sig_up_text(_id, temp, true);
            //生产者线程被挂起,直到消费者线程唤醒它
            _global_producer_wc.wait(&_global_mtx);
        }

        //队列未满,或者被消费者唤醒,或者是主界面退出唤醒
        // 二次判断是否因为主界面退出唤醒
        if(!_b_run){
            //解锁
            _global_mtx.unlock();
            //发送结束信号
            emit sig_finished(_id);
            return;
        }


        _global_queue.push_back(temp);
        //发送信号通知主界面刷新信息
        emit sig_up_text(_id, temp);

        flag = false;

        if(_global_queue.size() == 1){
            flag = true;
        }
        //解锁
        _global_mtx.unlock();

        //通知消费者唤醒
        if(flag){
           _global_consumer_wc.notify_one();
        }

        //为防止循环过快,小睡一会
        QThread::msleep(500);
    }

    //发送结束信号
    emit sig_finished(_id);
    return;
}

Producer::~Producer()
{
    qDebug() << QString(" producer [%1] destruct").arg(_id);
}

void Producer::stopWork(int id)
{
    //界面停止信号会连接多个生产者,需要判断id是否为自己
    if(_id != id){
        return;
    }

    _global_mtx.lock();
    //是自己则退出
    _b_run = false;
    _global_producer_wc.notify_all();
    _global_mtx.unlock();
}

MainWindow启动生产者和消费者

声明

#ifndef MAINWINDOW_H
#define MAINWINDOW_H

#include <QMainWindow>

namespace Ui {
class MainWindow;
}

class MainWindow : public QMainWindow
{
    Q_OBJECT

public:
    explicit MainWindow(QWidget *parent = nullptr);
    ~MainWindow();

private slots:
    //点击启动消费者按钮响应的槽函数
    void on_consumerBtn_clicked();
    //点击停止消费者按钮响应的槽函数
    void on_constopBtn_clicked();
    //点击启动生产者按钮响应的槽函数
    void on_producerBtn_clicked();
    //点击停止生产者按钮响应的槽函数
    void on_prostopBtn_clicked();

    //响应生产者线程发过来的消息显示到界面上
    void slot_up_text(int id, int num, bool full);
    //响应消费者线程发过来的消息显示到界面上
    void slot_consume_text(int id, int num, bool b_empty);

private:
    Ui::MainWindow *ui;

signals:
    //停止某个生产者信号
    void sig_stop_produce(int id);
    //停止某个消费者信号
    void sig_stop_consumer(int id);
};

#endif // MAINWINDOW_H

具体实现

#include "mainwindow.h"
#include "ui_mainwindow.h"
#include <QThread>
#include "global.h"
#include "producer.h"
#include <QMessageBox>
#include <QDebug>
#include "consumer.h"

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),_worker_id(0)
{
    ui->setupUi(this);
}

MainWindow::~MainWindow()
{
    //通知生产者结束工作
    for(auto iter = _map_producers.begin();
        iter != _map_producers.end(); iter++){
        iter.value()->stopWork(iter.key());
    }

    //等待线程退出
    for(auto& thread: _map_producer_thread){
        thread->wait();
    }

    //通知消费者结束工作
    for(auto iter = _map_consumers.begin();
        iter != _map_consumers.end(); iter++){
        iter.value()->stopWork(iter.key());
    }

    //等待线程退出
    for(auto& thread: _map_consumer_thread){
        thread->wait();
    }


    delete ui;
}

void MainWindow::on_addProducerBtn_clicked()
{
    //创建线程
    auto thread = std::make_shared<QThread>();
    //增加id
    _worker_id++;
    //创建生产者
    auto producer = new Producer(_worker_id);
    //放入线程
    producer->moveToThread(thread.get());
    //连接线程启动信号
    connect(thread.get(),&QThread::started, producer, &Producer::produceWork);
    //连接线程结束信号, 这里为了延长thread声明周期
    connect(thread.get(), &QThread::finished, this, [=]() {
            thread->wait();
        });


    //连接停止信号,注意下面的方式是错误的
    //因为producer线程任务produceWork死循环,导致无法响应其他信号
    //connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
    //改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
    connect(this, &MainWindow::sig_stop_produce,this,[producer](int id){
        producer->stopWork(id);
    });

    //连接刷新信息信号
    connect(producer,&Producer::sig_up_text,this, &MainWindow::slot_up_text);

    //连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
    //而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
    connect(producer,&Producer::sig_finished, [=](int id){
        qDebug()<<QString("收到任务[%1]退出信号").arg(id);
        producer->deleteLater();
        thread->quit();
    });

    //线程放入容器管理
    _producer_ids.push_back(_worker_id);
    //生产者map
    _map_producer_thread.insert(_worker_id,thread);
    //生产者放入map
    _map_producers.insert(_worker_id,producer);

    //更新生产者数量
    ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));

    //更新提示信息
    ui->textEdit->append(QString("创建生产者线程:[%1]").arg(_worker_id));

    thread->start();
}

void MainWindow::on_delProducerBtn_clicked()
{
    if(_producer_ids.isEmpty()){
        QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
        return;
    }
    //获取首部元素
    auto id = _producer_ids.front();
    //弹出
    _producer_ids.pop_front();
    //从map中移除thread
    _map_producer_thread.remove(id);
    //从map中移除生产者
    _map_producers.remove(id);
    //发送信号停止
    emit sig_stop_produce(id);

    //更新生产者数量
    ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));
    //更新文本信息
    ui->textEdit->append(QString("销毁生产者线程[%1]").arg(id));
}

void MainWindow::slot_up_text(int id, int num, bool full)
{
    if (full) {
        this->ui->textEdit->append(QString("队列已满!!! 线程:[%1] 无法将数据[%2]放入队列").arg(id).arg(num));
    }
    else {
        this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]放入队列").arg(id).arg(num));
    }

}

void MainWindow::slot_consume_text(int id, int num, bool b_empty)
{
    if (b_empty) {
        this->ui->textEdit->append(QString("队列为空!!! 线程:[%1] 无法取出数据").arg(id));
    }
    else {
        this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]取出队列").arg(id).arg(num));
    }
}

void MainWindow::on_addConsumerBtn_clicked()
{
    //创建线程
    auto thread = std::make_shared<QThread>();
    //增加id
    _worker_id++;
    //创建生产者
    auto consumer = new Consumer(_worker_id);
    //放入线程
    consumer->moveToThread(thread.get());
    //连接线程启动信号
    connect(thread.get(),&QThread::started, consumer, &Consumer::consumeWork);
    //连接线程结束信号, 这里为了延长thread声明周期
    connect(thread.get(), &QThread::finished, this, [=]() {
            thread->wait();
        });


    //连接停止信号,注意下面的方式是错误的
    //因为producer线程任务produceWork死循环,导致无法响应其他信号
    //connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
    //改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
    connect(this, &MainWindow::sig_stop_consumer,this,[consumer](int id){
        consumer->stopWork(id);
    });

    //连接刷新信息信号
    connect(consumer,&Consumer::sig_consume_up,this, &MainWindow::slot_consume_text);

    //连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
    //而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
    connect(consumer,&Consumer::sig_finished, [=](int id){
        qDebug()<<QString("收到任务[%1]退出信号").arg(id);
        consumer->deleteLater();
        thread->quit();
    });

    //线程放入容器管理
    _consumer_ids.push_back(_worker_id);
    //生产者map
    _map_consumer_thread.insert(_worker_id,thread);
    //生产者放入map
    _map_consumers.insert(_worker_id,consumer);

    //更新生产者数量
    ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));

    //更新提示信息
    ui->textEdit->append(QString("创建消费者线程:[%1]").arg(_worker_id));

    thread->start();
}

void MainWindow::on_delConsumerBtn_clicked()
{
    if(_consumer_ids.isEmpty()){
        QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
        return;
    }
    //获取首部元素
    auto id = _consumer_ids.front();
    //弹出
    _consumer_ids.pop_front();
    //从map中移除thread
    _map_consumer_thread.remove(id);
    //从map中移除生产者
    _map_consumers.remove(id);
    //发送信号停止
    emit sig_stop_consumer(id);

    //更新生产者数量
    ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));
    //更新文本信息
    ui->textEdit->append(QString("销毁消费者线程[%1]").arg(id));
}

主函数启动

主函数启动展示MainWindow

#include "mainwindow.h"
#include <QApplication>

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    MainWindow w;
    w.show();

    return a.exec();
}

多生产者多消费者(课外拓展)

我们采用的方法就是QWaitCondition, 我们可以制作如下界面,通过增减生产者,消费者达到控制速度的目的,当队列达到200时我们认为队列已满,当队列为空时我们认为不能消费。

解决思路

创建项目ProducerConsumer, 编辑mainwindow.ui,ui界面如下

属性结构图

全局文件

我们创建一个全局文件global.h和global.cpp,在global.h中声明如下全局变量

#ifndef GLOBAL_H
#define GLOBAL_H
#include <QQueue>
#include <atomic>
#include <QMutex>
#include <QWaitCondition>
#include <QSet>


//全局队列,生产者和消费者共享
extern QQueue<int> _global_queue;
//生产者共享的计数
extern std::atomic<int> _global_num;
//互斥量,生产者和消费者共享
extern QMutex _global_mtx;
//生产者条件变量
extern QWaitCondition _global_producer_wc;
//消费者条件变量
extern QWaitCondition _global_consumer_wc;
//队列最大值
const int MAX_SIZE = 10;


extern void printQtThreadId(QString str);
#endif // GLOBAL_H

定义如下全局变量

#include "global.h"
#include <QThread>
#include <QDebug>

//全局队列,生产者和消费者共享
QQueue<int> _global_queue = {};
//生产者共享的计数
std::atomic<int> _global_num = {0};
//互斥量,生产者和消费者共享
QMutex _global_mtx;
//生产者条件变量
QWaitCondition _global_producer_wc;
//消费者条件变量
QWaitCondition _global_consumer_wc;

void printQtThreadId(QString str)
{
    qDebug() << str;
    Qt::HANDLE tid = QThread::currentThreadId();

    // 将 Qt::HANDLE 转换为 uintptr_t(无符号整数类型)
    quintptr numericTid = reinterpret_cast<quintptr>(tid);

    // 使用十六进制格式打印更具辨识度
    qDebug() << "Qt Thread ID:" << QString("0x%1").arg(numericTid, 0, 16);
    qDebug() << str;
}

生产者任务类

#ifndef PRODUCER_H
#define PRODUCER_H

#include <QObject>

class Producer : public QObject
{
    Q_OBJECT
public:
    //id为生产者id
    explicit Producer(int id, QObject *parent = nullptr);
    ~Producer();

public slots:
    //槽函数响应线程启动信号,模拟生产工作
    void produceWork();
    //槽函数响应主界面停止信号,结束任务
    void stopWork(int id);
private:
    //是否运行
   std::atomic<bool> _b_run;
    //生产者id
    int _id;
signals:
    //任务完成信号
    void sig_finished(int id);
    //发送信号通知主界面显示
    void sig_up_text(int id, int num, bool full = false);

};

#endif // PRODUCER_H

具体实现

#include "producer.h"
#include "global.h"
#include <QThread>
#include <QDebug>

Producer::Producer(int id, QObject *parent): QObject(parent),_id(id)
{

}

void Producer::produceWork()
{
    _b_run = true;
    bool flag = false;
    while(_b_run){
        //判断队列是否满了,需要先加锁
        _global_mtx.lock();
         _global_num++;
         auto temp = _global_num.load();
        //队列满了,需要用while循环判断,防止虚假唤醒
        while(_global_queue.size()>=MAX_SIZE && _b_run ){
            //通知主界面生产者满了
            emit  sig_up_text(_id, temp, true);
            //生产者线程被挂起,直到消费者线程唤醒它
            _global_producer_wc.wait(&_global_mtx);
        }

        //队列未满,或者被消费者唤醒,或者是主界面退出唤醒
        // 二次判断是否因为主界面退出唤醒
        if(!_b_run){
            //解锁
            _global_mtx.unlock();
            //发送结束信号
            emit sig_finished(_id);
            return;
        }


        _global_queue.push_back(temp);
        //发送信号通知主界面刷新信息
        emit sig_up_text(_id, temp);

        flag = false;

        if(_global_queue.size() == 1){
            flag = true;
        }
        //解锁
        _global_mtx.unlock();

        //通知消费者唤醒
        if(flag){
           _global_consumer_wc.notify_one();
        }

        //为防止循环过快,小睡一会
        QThread::msleep(500);
    }

    //发送结束信号
    emit sig_finished(_id);
    return;
}

Producer::~Producer()
{
    qDebug() << QString(" producer [%1] destruct").arg(_id);
}

void Producer::stopWork(int id)
{
    //界面停止信号会连接多个生产者,需要判断id是否为自己
    if(_id != id){
        return;
    }

    _global_mtx.lock();
    //是自己则退出
    _b_run = false;
    _global_producer_wc.notify_all();
    _global_mtx.unlock();
}

消费者任务类

创建Consumer类声明如下

#ifndef CONSUMER_H
#define CONSUMER_H

#include <QObject>

class Consumer : public QObject
{
    Q_OBJECT
public:
    explicit Consumer(int id, QObject *parent = nullptr);
    ~Consumer();
signals:
    //更新主界面的信号
    void sig_consume_up(int con_id, int num, bool b_empty = false);
    //任务完成信号
    void sig_finished(int id);
public slots:
    //开始消费任务函数
    void consumeWork();
    //停止执行消费任务
    void stopWork(int id);
private:
    int _id;
    bool _b_run;
};

#endif // CONSUMER_H

定义如下

#include "consumer.h"
#include "global.h"
#include <QThread>
#include <QDebug>

Consumer::Consumer(int id, QObject *parent) : QObject(parent),_id(id),_b_run(false)
{

}

Consumer::~Consumer()
{
    qDebug() << QString(" consumer [%1] destruct").arg(_id);
}

void Consumer::consumeWork()
{
    _b_run = true;
    bool flag = false;
    while(_b_run){
        //判断队列是否满了,需要先加锁
        _global_mtx.lock();
        //队列满了,需要用while循环判断,防止虚假唤醒
        while(_global_queue.size() <= 0 && _b_run){
            //通知主界面消费者队列为空
            emit  sig_consume_up(_id, 0, true);
            //生产者线程被挂起,直到消费者线程唤醒它
            _global_consumer_wc.wait(&_global_mtx);
        }

        //队列未满,或者被消费者唤醒,或者是主界面退出唤醒
        // 二次判断是否因为主界面退出唤醒
        if(!_b_run){
            //解锁
            _global_mtx.unlock();
            //发送结束信号
            emit sig_finished(_id);
            return;
        }

        auto temp = _global_queue.front();
        _global_queue.pop_front();
        //发送信号通知主界面刷新信息
        emit sig_consume_up(_id, temp);

        flag = false;
        if(_global_queue.size() == MAX_SIZE-1){
            flag = true;
        }
        //解锁
        _global_mtx.unlock();

        //发送通知唤醒生产者
        if(flag){
            _global_producer_wc.notify_one();
        }

        //为防止循环过快,小睡一会
        QThread::msleep(500);
    }

    //发送结束信号
    emit sig_finished(_id);
    return;
}

void Consumer::stopWork(int id)
{
    //界面停止信号会连接多个消费者,需要判断id是否为自己
    if(_id != id){
        return;
    }

    _global_mtx.lock();
    //是自己则退出
    _b_run = false;
    _global_consumer_wc.notify_all();
    _global_mtx.unlock();
}

实现mainwindow

声明如下

#ifndef MAINWINDOW_H
#define MAINWINDOW_H

#include <QMainWindow>
#include <QVector>
#include <QMap>
#include "producer.h"
#include <memory>
#include "consumer.h"

namespace Ui {
class MainWindow;
}

class MainWindow : public QMainWindow
{
    Q_OBJECT

public:
    explicit MainWindow(QWidget *parent = nullptr);
    ~MainWindow();

private slots:
    //点击添加生产者线程按钮响应的槽函数
    void on_addProducerBtn_clicked();
    //点击删除生产者线程按钮响应的槽函数
    void on_delProducerBtn_clicked();
    //响应生产者线程发过来的消息显示到界面上
    void slot_up_text(int id, int num, bool full);
    //响应消费者线程发过来的消息显示到界面上
    void slot_consume_text(int id, int num, bool b_empty);
    //响应添加消费者线程按钮的槽函数
    void on_addConsumerBtn_clicked();
    //响应删除消费者线程按钮的槽函数
    void on_delConsumerBtn_clicked();

private:
    Ui::MainWindow *ui;
    //工人id,生产者和消费者id
    int _worker_id;
    //生产者id列表
    QVector<int> _producer_ids;
    //消费者id列表
    QVector<int> _consumer_ids;

    //生产者id对应线程的map
    QMap<int, std::shared_ptr<QThread> > _map_producer_thread;
    //生产者id对应生产者数据
    QMap<int, Producer*> _map_producers;

    //消费者id对应线程的map
    QMap<int, std::shared_ptr<QThread> > _map_consumer_thread;
    //消费者id对应生产者数据
    QMap<int, Consumer*> _map_consumers;

signals:
    //停止某个生产者信号
    void sig_stop_produce(int id);
    //停止某个消费者信号
    void sig_stop_consumer(int id);
};

#endif // MAINWINDOW_H

完成具体实现

#include "mainwindow.h"
#include "ui_mainwindow.h"
#include <QThread>
#include "global.h"
#include "producer.h"
#include <QMessageBox>
#include <QDebug>
#include "consumer.h"

MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow),_worker_id(0)
{
    ui->setupUi(this);
}

MainWindow::~MainWindow()
{
    //通知生产者结束工作
    for(auto iter = _map_producers.begin();
        iter != _map_producers.end(); iter++){
        iter.value()->stopWork(iter.key());
    }

    //等待线程退出
    for(auto& thread: _map_producer_thread){
        thread->wait();
    }

    //通知消费者结束工作
    for(auto iter = _map_consumers.begin();
        iter != _map_consumers.end(); iter++){
        iter.value()->stopWork(iter.key());
    }

    //等待线程退出
    for(auto& thread: _map_consumer_thread){
        thread->wait();
    }


    delete ui;
}

void MainWindow::on_addProducerBtn_clicked()
{
    //创建线程
    auto thread = std::make_shared<QThread>();
    //增加id
    _worker_id++;
    //创建生产者
    auto producer = new Producer(_worker_id);
    //放入线程
    producer->moveToThread(thread.get());
    //连接线程启动信号
    connect(thread.get(),&QThread::started, producer, &Producer::produceWork);
    //连接线程结束信号, 这里为了延长thread声明周期
    connect(thread.get(), &QThread::finished, this, [=]() {
            thread->wait();
        });


    //连接停止信号,注意下面的方式是错误的
    //因为producer线程任务produceWork死循环,导致无法响应其他信号
    //connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
    //改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
    connect(this, &MainWindow::sig_stop_produce,this,[producer](int id){
        producer->stopWork(id);
    });

    //连接刷新信息信号
    connect(producer,&Producer::sig_up_text,this, &MainWindow::slot_up_text);

    //连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
    //而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
    connect(producer,&Producer::sig_finished, [=](int id){
        qDebug()<<QString("收到任务[%1]退出信号").arg(id);
        producer->deleteLater();
        thread->quit();
    });

    //线程放入容器管理
    _producer_ids.push_back(_worker_id);
    //生产者map
    _map_producer_thread.insert(_worker_id,thread);
    //生产者放入map
    _map_producers.insert(_worker_id,producer);

    //更新生产者数量
    ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));

    //更新提示信息
    ui->textEdit->append(QString("创建生产者线程:[%1]").arg(_worker_id));

    thread->start();
}

void MainWindow::on_delProducerBtn_clicked()
{
    if(_producer_ids.isEmpty()){
        QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
        return;
    }
    //获取首部元素
    auto id = _producer_ids.front();
    //弹出
    _producer_ids.pop_front();
    //从map中移除thread
    _map_producer_thread.remove(id);
    //从map中移除生产者
    _map_producers.remove(id);
    //发送信号停止
    emit sig_stop_produce(id);

    //更新生产者数量
    ui->producerLb->setText(QString("%1").arg(_producer_ids.size()));
    //更新文本信息
    ui->textEdit->append(QString("销毁生产者线程[%1]").arg(id));
}

void MainWindow::slot_up_text(int id, int num, bool full)
{
    if (full) {
        this->ui->textEdit->append(QString("队列已满!!! 线程:[%1] 无法将数据[%2]放入队列").arg(id).arg(num));
    }
    else {
        this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]放入队列").arg(id).arg(num));
    }

}

void MainWindow::slot_consume_text(int id, int num, bool b_empty)
{
    if (b_empty) {
        this->ui->textEdit->append(QString("队列为空!!! 线程:[%1] 无法取出数据").arg(id));
    }
    else {
        this->ui->textEdit->append(QString("线程:[%1] 将数据[%2]取出队列").arg(id).arg(num));
    }
}

void MainWindow::on_addConsumerBtn_clicked()
{
    //创建线程
    auto thread = std::make_shared<QThread>();
    //增加id
    _worker_id++;
    //创建生产者
    auto consumer = new Consumer(_worker_id);
    //放入线程
    consumer->moveToThread(thread.get());
    //连接线程启动信号
    connect(thread.get(),&QThread::started, consumer, &Consumer::consumeWork);
    //连接线程结束信号, 这里为了延长thread声明周期
    connect(thread.get(), &QThread::finished, this, [=]() {
            thread->wait();
        });


    //连接停止信号,注意下面的方式是错误的
    //因为producer线程任务produceWork死循环,导致无法响应其他信号
    //connect(this, &MainWindow::sig_stop_produce, producer, &Producer::stopWork);
    //改为在MainWindow发送线程触发调用,内部再调用stopWork,此时是在MainWindow所属线程触发
    connect(this, &MainWindow::sig_stop_consumer,this,[consumer](int id){
        consumer->stopWork(id);
    });

    //连接刷新信息信号
    connect(consumer,&Consumer::sig_consume_up,this, &MainWindow::slot_consume_text);

    //连接任务退出信号,因为线程执行的是死循环任务,所以接收者不能是thread
    //而且接收者不能是mainwindow,此时mainwindow如果处于退出等待状态也会卡死
    connect(consumer,&Consumer::sig_finished, [=](int id){
        qDebug()<<QString("收到任务[%1]退出信号").arg(id);
        consumer->deleteLater();
        thread->quit();
    });

    //线程放入容器管理
    _consumer_ids.push_back(_worker_id);
    //生产者map
    _map_consumer_thread.insert(_worker_id,thread);
    //生产者放入map
    _map_consumers.insert(_worker_id,consumer);

    //更新生产者数量
    ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));

    //更新提示信息
    ui->textEdit->append(QString("创建消费者线程:[%1]").arg(_worker_id));

    thread->start();
}

void MainWindow::on_delConsumerBtn_clicked()
{
    if(_consumer_ids.isEmpty()){
        QMessageBox::information(nullptr, "提示信息","没有生产者线程运行");
        return;
    }
    //获取首部元素
    auto id = _consumer_ids.front();
    //弹出
    _consumer_ids.pop_front();
    //从map中移除thread
    _map_consumer_thread.remove(id);
    //从map中移除生产者
    _map_consumers.remove(id);
    //发送信号停止
    emit sig_stop_consumer(id);

    //更新生产者数量
    ui->consumerLb->setText(QString("%1").arg(_consumer_ids.size()));
    //更新文本信息
    ui->textEdit->append(QString("销毁消费者线程[%1]").arg(id));
}

启动界面

主函数中启动界面展示

#include "mainwindow.h"
#include <QApplication>

int main(int argc, char *argv[])
{
    QApplication a(argc, argv);
    MainWindow w;
    w.show();

    return a.exec();
}

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐