MQTT 超完整全套笔记(原理+服务端搭建+C++客户端全流程+回调+心跳+完整测试)
·
一、MQTT 是什么?核心架构
MQTT 是发布/订阅模式物联网通信协议
架构只有 3 部分:
- Broker MQTT 服务端(中转站)
mosquitto 程序,只负责转发消息,不处理业务 - MQTT 客户端(你的 C++ 程序)
subscribe(主题):订阅 → 接收别人发给这个主题的消息publish(主题,内容):发布 → 向这个主题广播消息
- 通信方式:长连接 TCP
通信流程
任意客户端发布消息 → Broker 收到 → 转发给所有订阅该主题的客户端
客户端互相不知道对方IP,只靠主题Topic通信
二、Linux 搭建本地 MQTT 服务端 Mosquitto
1. 安装服务端 + 命令行测试工具
sudo apt update
sudo apt install mosquitto mosquitto-clients
2. 启动服务 + 开机自启
# 启动MQTT服务
sudo systemctl start mosquitto
# 设置开机自动运行
sudo systemctl enable mosquitto
3. 查看服务运行状态
sudo systemctl status mosquitto
出现 active (running) 代表服务正常
- 默认地址:
localhost - 默认端口:
1883 - 默认无账号密码,直接连接
4. 命令行原生测试(验证服务是否正常)
终端1:订阅主题(等待收消息)
mosquitto_sub -t test/topic
终端2:发布消息(发送消息)
mosquitto_pub -t test/topic -m "Hello MQTT"
终端1立刻收到消息 → 服务端完全正常
三、安装 C++ MQTT 客户端库 libmosquitto
sudo apt install libmosquitto-dev
四、C++ MqttClient 完整流程逐行详解
1. 构造函数做了什么
MqttClient::MqttClient(...)
{
// 1. 初始化mosquitto库
mosquitto_lib_init();
// 2. 创建客户端实例
// 参数1:客户端ID
// 参数2:是否清理旧会话
// 参数3:this → 当前client对象地址,回调要用
mosq_ = mosquitto_new(client_id.c_str(), true, this);
}
这里传入的 this = main 里 MqttClient client(...) 这个对象本身的地址
2. 注册三个核心回调(重中之重)
// 1. 连接结果回调:连接成功/失败后自动调用
mosquitto_connect_callback_set(mosq_, on_connect_callback);
// 2. 消息接收回调:订阅主题收到消息自动调用
mosquitto_message_callback_set(mosq_, on_message_callback);
// 3. 断开连接回调:掉线/主动断开自动调用
mosquitto_disconnect_callback_set(mosq_, on_disconnect_callback);
3. 回调函数参数完整解释
① 连接回调 on_connect_callback
void on_connect_callback(struct mosquitto*, void* obj, int result)
- 第1参数:库内部客户端对象,不用管
- 第2参数
obj:就是你传的this,强转得到C++对象 - 第3参数
result:连接状态0= 连接成功- 非0 = 连接失败
执行 mosquitto_connect() 连接完成后,立刻触发此回调
② 消息回调 on_message_callback
void on_message_callback(struct mosquitto*, void* obj, const struct mosquitto_message* msg)
- 第1参数:库内部对象
- 第2参数
obj:C++对象指针 - 第3参数
msg:消息结构体msg->topic:消息主题msg->payload:消息数据指针(地址)msg->payloadlen:消息长度
判断逻辑必须:
if(payload != nullptr && payloadlen > 0)
先判断指针不为空,防止空指针崩溃,再判断长度
③ 断开回调 on_disconnect_callback
void on_disconnect_callback(struct mosquitto*, void* obj, int rc)
- rc=0:主动正常断开
- rc≠0:网络异常、掉线、服务端断开
4. 连接 Broker
int rc = mosquitto_connect(mosq_, 主机, 端口, keepalive_);
keepalive 心跳详解(60秒)
- 不是连接超时!不是连不上就退出!
- 含义:60秒内没有任何业务消息收发
- libmosquitto 自动发送PING心跳包,你不用写任何代码
- TCP静默断线检测不到,靠心跳证明客户端存活
- 超过2倍时长无心跳,Broker判定设备离线,断开连接
- 你不publish、不收消息,完全不影响心跳正常运行
5. 订阅主题
client.subscribe("test/topic", 1);
告诉Broker:这个主题的所有消息,都转发给我
6. 发布消息
client.publish("test/topic", "内容", 0, false);
向主题广播消息,所有订阅该主题客户端都会收到
7. 消息循环 loop()
void MqttClient::loop()
{
while(true)
{
mosquitto_loop(mosq_, -1, 1);
// 处理收发消息、心跳、重连
}
}
- 无限循环阻塞,持续监听网络事件
- 物联网设备本来就需要一直运行不退出
- 主线程用线程跑loop,不卡住其他业务
- 按
Ctrl+C优雅退出
8. 析构函数释放资源
~MqttClient()
{
mosquitto_disconnect(mosq_); // 断开连接
mosquitto_destroy(mosq_); // 销毁客户端
mosquitto_lib_cleanup(); // 清理库资源
}
五、C++程序编译 + 运行完整命令
1. 编译
g++ mqtt_client.cpp -o mqtt_client -lmosquitto -pthread
2. 运行连接本地MQTT服务端
./mqtt_client localhost 1883
六、完整双向测试流程(必做)
方式1:C++ ↔ 命令行互相通信
- 运行你的C++程序,订阅
test/topic - 新开终端发布消息
mosquitto_pub -t test/topic -m "终端发来消息"
- C++程序立刻回调打印收到内容
方式2:两个C++客户端互相聊天
- 终端A运行C++,订阅test/topic
- 终端B运行C++,发布消息
- 终端A自动收到
七、常见问题汇总
-
Connection refused
mosquitto 服务没启动,执行sudo systemctl start mosquitto -
程序一直不退出
loop是无限死循环,物联网常驻程序本来就这样正常 -
收不到消息
主题名字大小写、符号必须完全一模一样 -
长时间没发消息会不会掉线?
不会,库自动心跳保活
八、极简整体流程总结
- 安装启动Mosquitto服务端
- C++创建MqttClient对象
- 注册连接、消息、断开三个回调
- 设置60秒keepalive心跳
- 连接Broker
- 订阅主题等待收消息
- 发布消息发送数据
- loop循环持续监听,Ctrl+C正常退出
- 程序退出自动释放所有MQTT资源
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)