一、MQTT 是什么?核心架构

MQTT 是发布/订阅模式物联网通信协议
架构只有 3 部分:

  1. Broker MQTT 服务端(中转站)
    mosquitto 程序,只负责转发消息,不处理业务
  2. MQTT 客户端(你的 C++ 程序)
    • subscribe(主题):订阅 → 接收别人发给这个主题的消息
    • publish(主题,内容):发布 → 向这个主题广播消息
  3. 通信方式:长连接 TCP

通信流程

任意客户端发布消息 → Broker 收到 → 转发给所有订阅该主题的客户端
客户端互相不知道对方IP,只靠主题Topic通信


二、Linux 搭建本地 MQTT 服务端 Mosquitto

1. 安装服务端 + 命令行测试工具

sudo apt update
sudo apt install mosquitto mosquitto-clients

2. 启动服务 + 开机自启

# 启动MQTT服务
sudo systemctl start mosquitto

# 设置开机自动运行
sudo systemctl enable mosquitto

3. 查看服务运行状态

sudo systemctl status mosquitto

出现 active (running) 代表服务正常

  • 默认地址:localhost
  • 默认端口:1883
  • 默认无账号密码,直接连接

4. 命令行原生测试(验证服务是否正常)

终端1:订阅主题(等待收消息)

mosquitto_sub -t test/topic

终端2:发布消息(发送消息)

mosquitto_pub -t test/topic -m "Hello MQTT"

终端1立刻收到消息 → 服务端完全正常


三、安装 C++ MQTT 客户端库 libmosquitto

sudo apt install libmosquitto-dev

四、C++ MqttClient 完整流程逐行详解

1. 构造函数做了什么

MqttClient::MqttClient(...)
{
    // 1. 初始化mosquitto库
    mosquitto_lib_init();

    // 2. 创建客户端实例
    // 参数1:客户端ID
    // 参数2:是否清理旧会话
    // 参数3:this → 当前client对象地址,回调要用
    mosq_ = mosquitto_new(client_id.c_str(), true, this);
}

这里传入的 this = main 里 MqttClient client(...) 这个对象本身的地址

2. 注册三个核心回调(重中之重)

// 1. 连接结果回调:连接成功/失败后自动调用
mosquitto_connect_callback_set(mosq_, on_connect_callback);

// 2. 消息接收回调:订阅主题收到消息自动调用
mosquitto_message_callback_set(mosq_, on_message_callback);

// 3. 断开连接回调:掉线/主动断开自动调用
mosquitto_disconnect_callback_set(mosq_, on_disconnect_callback);

3. 回调函数参数完整解释

① 连接回调 on_connect_callback

void on_connect_callback(struct mosquitto*, void* obj, int result)
  1. 第1参数:库内部客户端对象,不用管
  2. 第2参数 obj:就是你传的 this,强转得到C++对象
  3. 第3参数 result:连接状态
    • 0 = 连接成功
    • 非0 = 连接失败

执行 mosquitto_connect() 连接完成后,立刻触发此回调

② 消息回调 on_message_callback

void on_message_callback(struct mosquitto*, void* obj, const struct mosquitto_message* msg)
  1. 第1参数:库内部对象
  2. 第2参数 obj:C++对象指针
  3. 第3参数 msg:消息结构体
    • msg->topic:消息主题
    • msg->payload消息数据指针(地址)
    • msg->payloadlen:消息长度

判断逻辑必须:

if(payload != nullptr && payloadlen > 0)

先判断指针不为空,防止空指针崩溃,再判断长度

③ 断开回调 on_disconnect_callback

void on_disconnect_callback(struct mosquitto*, void* obj, int rc)
  1. rc=0:主动正常断开
  2. rc≠0:网络异常、掉线、服务端断开

4. 连接 Broker

int rc = mosquitto_connect(mosq_, 主机, 端口, keepalive_);

keepalive 心跳详解(60秒)

  1. 不是连接超时!不是连不上就退出!
  2. 含义:60秒内没有任何业务消息收发
  3. libmosquitto 自动发送PING心跳包,你不用写任何代码
  4. TCP静默断线检测不到,靠心跳证明客户端存活
  5. 超过2倍时长无心跳,Broker判定设备离线,断开连接
  6. 你不publish、不收消息,完全不影响心跳正常运行

5. 订阅主题

client.subscribe("test/topic", 1);

告诉Broker:这个主题的所有消息,都转发给我

6. 发布消息

client.publish("test/topic", "内容", 0, false);

向主题广播消息,所有订阅该主题客户端都会收到

7. 消息循环 loop()

void MqttClient::loop()
{
    while(true)
    {
        mosquitto_loop(mosq_, -1, 1);
        // 处理收发消息、心跳、重连
    }
}
  • 无限循环阻塞,持续监听网络事件
  • 物联网设备本来就需要一直运行不退出
  • 主线程用线程跑loop,不卡住其他业务
  • Ctrl+C 优雅退出

8. 析构函数释放资源

~MqttClient()
{
    mosquitto_disconnect(mosq_);    // 断开连接
    mosquitto_destroy(mosq_);        // 销毁客户端
    mosquitto_lib_cleanup();         // 清理库资源
}

五、C++程序编译 + 运行完整命令

1. 编译

g++ mqtt_client.cpp -o mqtt_client -lmosquitto -pthread

2. 运行连接本地MQTT服务端

./mqtt_client localhost 1883

六、完整双向测试流程(必做)

方式1:C++ ↔ 命令行互相通信

  1. 运行你的C++程序,订阅 test/topic
  2. 新开终端发布消息
mosquitto_pub -t test/topic -m "终端发来消息"
  1. C++程序立刻回调打印收到内容

方式2:两个C++客户端互相聊天

  1. 终端A运行C++,订阅test/topic
  2. 终端B运行C++,发布消息
  3. 终端A自动收到

七、常见问题汇总

  1. Connection refused
    mosquitto 服务没启动,执行 sudo systemctl start mosquitto

  2. 程序一直不退出
    loop是无限死循环,物联网常驻程序本来就这样正常

  3. 收不到消息
    主题名字大小写、符号必须完全一模一样

  4. 长时间没发消息会不会掉线?
    不会,库自动心跳保活


八、极简整体流程总结

  1. 安装启动Mosquitto服务端
  2. C++创建MqttClient对象
  3. 注册连接、消息、断开三个回调
  4. 设置60秒keepalive心跳
  5. 连接Broker
  6. 订阅主题等待收消息
  7. 发布消息发送数据
  8. loop循环持续监听,Ctrl+C正常退出
  9. 程序退出自动释放所有MQTT资源
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐