第8篇_怎么把这套开源 MQTT 客户端真正用起来
前面几篇,我们把这套开源客户端的骨架已经基本讲清楚了:
- MQTT 在哪一层
- CONNECT / CONNACK 怎么走
- PUBLISH / QoS1 / QoS2 为什么会难
- SUBSCRIBE / UNSUBSCRIBE 怎么校验
- 主状态机为什么是核心
- 现场出了问题该怎么查
这一篇我们不再偏协议,直接回工程落地:
这套开源 MQTT 客户端,怎么真正接到 PLC 工程里跑起来?
先给结论:
真正把 MQTT 用起来,不是“拖个功能块进来”就完了。
一、先看最小接入思路
把这套客户端放进工程里,可以先粗暴理解成下面这条链:

你真正关心的就是 FB_MqttClient 这一层。
它帮你把下面这些事包起来了:
- TCP 长连接
- MQTT 报文构造
- MQTT 报文解析
- QoS1 / QoS2 ACK 链
- 超时与重发
- 订阅和退订
二、上电后最基本的启动顺序
建议按这个顺序来:
- 先
bEnable := TRUE - 再给 Broker 地址、端口、Client ID、协议版本
- 再拉
bConnect - 等
bMqttConnected = TRUE - 再去做 Publish / Subscribe / Unsubscribe
顺序不要乱。
特别是不要一边连一边立刻乱触发一堆发布订阅。
先让连接态站稳,后续状态机会清楚很多。
三、最关键的参数先看表
下面这些参数,是接入时最值得优先确认的。
| 参数 | 作用 | 建议 |
|---|---|---|
sBrokerIP |
Broker 地址 | 先用固定 IP 验证 |
uiBrokerPort |
Broker 端口 | 常见 1883 |
sClientID |
客户端标识 | 保证唯一 |
eVersion |
MQTT 版本 | 先和 Broker 对齐 |
uiKeepAlive |
心跳周期 | 先用中等值,如 30~60 秒 |
bCleanSession / 相关会话参数 |
会话策略 | 测试期先明确,不要模糊 |
ePubQoS / eSubQoS |
发布订阅 QoS | 先从 QoS0 验证,再逐步升 QoS |
你会发现,这些参数里最容易出大问题的,不是端口,而是:
eVersion- 会话语义
- QoS 选择
四、先用什么 Broker 测最稳
按你已经验证过的环境,当前最典型有三类:
| Broker 类型 | 特点 | 适合干什么 |
|---|---|---|
| PLC 内置 Broker | 环境最贴近 PLC 本体 | 先做基础联通与 3.1.1 验证 |
| EMQX | 5.0 能力强,调试方便 | 测 5.0、能力协商、多客户端 |
| Mosquitto | 轻量、经典 | 做基础兼容性验证 |
我的建议是:
第一阶段
先用 PLC 内置 Broker 或 Mosquitto
目标:
- 确认 TCP 通
- 确认 CONNECT / CONNACK 正常
- 确认 QoS0 收发正常
第二阶段
再切 EMQX
目标:
- 验证 MQTT 5.0
- 验证多客户端
- 验证共享订阅 / 订阅标识符 / Topic Alias 等边界
五、为什么我强烈建议按 QoS0 -> QoS1 -> QoS2 逐级上
不要一上来就高频 QoS2。
那是最容易把自己调试节奏打乱的。
推荐顺序:
第一步:QoS0
先验证:
- 连得上
- 发得出去
- 收得到
- Topic 没配错
第二步:QoS1
再验证:
PUBACK链是否正常- 高频时会不会超时
第三步:QoS2
最后验证:
PUBREC/PUBREL/PUBCOMP完整链- 高频和多客户端下的稳定性
这个顺序背后只有一句话:
先验证“能通信”,再验证“能可靠通信”,最后再验证“能长期稳定可靠通信”。
六、最推荐的测试环境组合
如果你要自己复用这套流程,我最推荐的测试组合是:

这个组合的好处是:
- PLC 端能真实跑你的 ST 客户端
- PC 端有两个不同客户端做交叉验证
- IDE 能直接在线盯状态机和变量
这比“只拿一个客户端单机自测”有价值得多。
七、如果 PLC 能 ping 通 Broker,但就是连不上 MQTT,先别急着怀疑代码
这个坑你已经实际踩过了,非常典型。
很多时候问题不在 MQTT 报文,而在:
- Windows 网卡网络类型
- 防火墙策略
- Broker 监听地址
- PLC 到 PC 的 TCP 端口可达性
所以看到:
- PLC 能 ping 通
- IDE 也能在线监控
- MQTTBox 和通信猫都能连
- 但 PLC 程序
TcpConnect timeout
不要直接改 ST 代码。
先查 PC 网络环境和 Broker 监听。
这是很典型的“环境问题伪装成协议问题”。
八、最推荐的基础测试项
建议把测试拆成 4 组。
A 组:连接层
| 测试项 | 目标 |
|---|---|
| TCP 建连 | 验证端口可达 |
| CONNECT / CONNACK | 验证 MQTT 握手 |
| 断开重连 | 验证异常恢复 |
B 组:基础业务层
| 测试项 | 目标 |
|---|---|
| QoS0 发布 | 最小业务闭环 |
| QoS0 订阅 | 最小接收闭环 |
| Retain 接收 | 验证标志位与接收面 |
C 组:高可靠层
| 测试项 | 目标 |
|---|---|
| QoS1 发布订阅 | 验证 PUBACK 链 |
| QoS2 发布订阅 | 验证完整四段链 |
| 高频多客户端 | 压 ACK 和状态机 |
D 组:边界与恢复层
| 测试项 | 目标 |
|---|---|
| 取消订阅 | 验证 UNSUBACK 链 |
| 重连后订阅行为 | 看会话恢复逻辑 |
| MQTT 5.0 特性 | 看能力协商与边界 |
九、联调时最该盯的变量
真正联调时,不要什么都看。
优先盯这几个点最有效:
| 变量 / 现象 | 用途 |
|---|---|
eState |
看当前卡在哪个状态 |
bMqttConnected |
看 MQTT 层是否真正建连 |
xWaitingForAck |
看是否一直在等某个确认 |
byExpectedMsgType |
看当前到底在等谁 |
uiSubscriptionCount |
看订阅列表是否被清空或恢复 |
sDiagMsg |
看最后的错误说明 |
这几个点基本能覆盖大多数联调判断。
十、上线前至少该通过哪些标准
如果站在“这套功能真能用”的角度,我建议至少满足下面这些最低标准:
- TCP 建连稳定,不反复超时
- CONNECT / CONNACK 在目标 Broker 下稳定通过
- QoS0 长时间发布订阅正常
- QoS1 高频多主题场景稳定
- QoS2 高频多主题场景稳定
- 退订不再卡状态
- 重连后行为符合你的会话预期
- MQTT 5.0 环境下能正确受 Broker 能力约束
注意第 4 和第 5 条。
如果这两条过不了,不要急着说“Phase1 能用了”。
十一、这套开源客户端适合什么场景
我觉得它最适合下面这几类用户:
| 用户 | 场景 |
|---|---|
| 自动化工程师 | 设备接云、边缘采集、上位协同 |
| CODESYS 用户 | 不想被官方收费库卡住 |
| 想学协议实现的人 | 想把标准、报文、状态机、代码串起来看 |
| 想二开的人 | 要自己改功能、加策略、做适配 |
它不是一个“点一下就无脑全自动”的黑盒库。
但恰恰因为不是黑盒,才更适合工程掌控和长期维护。
十二、这一篇你最该记住的 5 句话
- 接入 MQTT,不是拖个功能块进来就完了,环境、参数、Broker、测试项都要配合。
- 调试顺序一定要从 QoS0 开始,逐级上到 QoS1 和 QoS2。
- PLC 能 ping 通 Broker,不等于 PLC 的 MQTT 一定能连通。
- 联调时优先盯状态机、等待标志、期待报文类型和诊断信息。
- 真正可用的标准,不是“偶尔能收发”,而是高频 QoS1 / QoS2 也能长期稳定。
十三、下篇预告
下一篇我们专门聊一个很容易引战、但工程上必须说清楚的话题:
MQTT 5.0 到底值不值得上?
会重点讲:
- 5.0 的价值到底在哪
- 工业现场到底该不该一上来就开 5.0
- “支持 5.0” 和 “跑稳 5.0” 到底差在哪里
完整 ST 代码
复制使用说明
- 这部分给出的是与本篇主题直接对应的完整 ST 代码,不是零碎片段。
- 如果你只是想先跑通,优先整段复制,不要只摘几行变量或几条赋值语句。
- 如果是
METHOD,请确认它仍然属于FB_MqttClient;如果是PROGRAM,请确认相关 DUT、GVL、FB 已一并导入。
代码阅读重点
- 先按
报文结构 -> 状态机入口 -> 关键变量 -> 返回结果的顺序看。 - 再把正文里的十六进制拆解和这里的字节写入、字节解析语句一行行对上。
- 最后回到在线调试,重点盯
uiTxLength、uiRxLength、eState、xWaitingForAck这类状态量。
完整代码 1:PRG_Test
- 对应源码路径:
10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/PRG_Test.st - 复制使用说明:这一篇重实践,所以最适合直接给出完整测试程序。复制后改 IP、ClientID、Topic 就能起跑。
- 阅读重点:先看连接参数,再看发布/订阅命令输入,最后在线监控输出引脚,就能快速把整套功能跑通。
/// =======================================================================
/// 名称 : PRG_Test
/// 功能 : MQTT 客户端测试程序
/// 说明 : 用于驱动 FB_MqttClient 进行连接、发布、订阅与接收测试
/// 编程人员 : ControlRookie
/// 时间 : 2026-05-05
/// 版本 : V1.0
/// =======================================================================
PROGRAM PRG_Test
VAR
bInit : BOOL := TRUE; // 初始化执行标志
fbMqttClient : FB_MqttClient; // MQTT 客户端实例
bEnable : BOOL := TRUE; // 使能客户端
bConnect : BOOL; // 连接命令
sBrokerIP : STRING := '192.168.20.100'; // Broker IP 地址
uiPort : UINT := 1883; // Broker 端口号
sClientID : STRING := 'CodeSys_PLC'; // 客户端标识符
sUsername : STRING := ''; // 用户名
sPassword : STRING := ''; // 密码
eVersion : E_MqttVersion := E_MqttVersion.byMqttVersion311; // MQTT 协议版本
bCleanSession : BOOL := TRUE; // 清理会话标志
uiKeepAlive : UINT := 60; // 心跳周期(秒)
bUseSSL : BOOL := FALSE; // 是否启用 SSL
bAutoReconnect : BOOL := TRUE; // 是否自动重连
uiReconnectDelay : UINT := 5000; // 重连延时(毫秒)
udiSessionExpiry : UDINT := 0; // 会话过期间隔
uiReceiveMax : UINT := 65535; // 最大接收数量
udMaxPacketSize : UDINT := 4096; // 最大报文长度
bWillFlag : BOOL := TRUE; // 是否启用遗嘱消息
sWillTopic : STRING := 'CodeSys'; // 遗嘱主题
sWillMessage : STRING := 'CodeSys Offline'; // 遗嘱消息内容
eWillQoS : E_MqttQoS := E_MqttQoS.byQoS1; // 遗嘱消息 QoS
bWillRetain : BOOL := FALSE; // 遗嘱消息保留标志
bPublish : BOOL; // 发布命令
sPubTopic : STRING := 'CodeSys'; // 发布主题
sPubPayload : STRING := 'CodeSys'; // 发布载荷
ePubQoS : E_MqttQoS := E_MqttQoS.byQoS1; // 发布 QoS
bPubRetain : BOOL := FALSE; // 发布保留标志
bSubscribe : BOOL; // 订阅命令
bUnsubscribe : BOOL; // 取消订阅命令
sSubTopic : STRING := 'CodeSys'; // 订阅主题
sUnsubTopic : STRING := 'CodeSys'; // 取消订阅主题
eSubQoS : E_MqttQoS := E_MqttQoS.byQoS1; // 订阅请求 QoS
udiSubscriptionId : UDINT := 0; // MQTT 5.0 订阅标识符
eMqttState : E_MqttState; // 客户端当前状态
bIsConnected : BOOL; // TCP 连接状态
bMqttConnected : BOOL; // MQTT 连接状态
bError : BOOL; // 错误标志
eErrorID : NBS.ERROR; // 错误码
sDiagMsg : STRING; // 诊断信息
aSubscriptions : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription; // 订阅列表
uiSubscriptionCount : UINT; // 订阅数量
sRecTopic : STRING; // 最新接收主题
sRecPayload : STRING; // 最新接收载荷
aRecTopicList : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING; // 接收主题历史
aRecPayloadList : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING; // 接收载荷历史
byReceivedQoS : BYTE; // 最新接收 QoS
bReceivedRetain : BOOL; // 最新接收保留标志
END_VAR
// === IMPLEMENTATION ===
IF bInit THEN
bInit := FALSE;
END_IF
/// MQTT客户端
fbMqttClient(
bEnable := bEnable,
bConnect := bConnect,
sBrokerIP := sBrokerIP,
uiPort := uiPort,
sClientID := sClientID,
sUsername := sUsername,
sPassword := sPassword,
eVersion := eVersion,
bCleanSession := bCleanSession,
uiKeepAlive := uiKeepAlive,
bUseSSL := bUseSSL,
bAutoReconnect := bAutoReconnect,
uiReconnectDelay := uiReconnectDelay,
udiSessionExpiry := udiSessionExpiry,
uiReceiveMax := uiReceiveMax,
udMaxPacketSize := udMaxPacketSize,
bWillFlag := bWillFlag,
sWillTopic := sWillTopic,
sWillMessage := sWillMessage,
eWillQoS := eWillQoS,
bWillRetain := bWillRetain,
bPublish := bPublish,
sPubTopic := sPubTopic,
sPubPayload := sPubPayload,
ePubQoS := ePubQoS,
bPubRetain := bPubRetain,
bSubscribe := bSubscribe,
sSubTopic := sSubTopic,
eSubQoS := eSubQoS,
udiSubscriptionId := udiSubscriptionId,
bUnsubscribe := bUnsubscribe,
sUnsubTopic := sUnsubTopic,
eState => eMqttState,
bIsConnected => bIsConnected,
bMqttConnected => bMqttConnected,
bError => bError,
eErrorID => eErrorID,
sDiagMsg => sDiagMsg,
aSubscriptions => aSubscriptions,
uiSubscriptionCount => uiSubscriptionCount,
sRecTopic => sRecTopic,
sRecPayload => sRecPayload,
aRecTopicList => aRecTopicList,
aRecPayloadList => aRecPayloadList,
byReceivedQoS => byReceivedQoS,
bReceivedRetain => bReceivedRetain);
完整代码 2:PLC_PRG
- 对应源码路径:
10 MQTT/MqttClient_V1_0/Device/Application/PLC_PRG.st - 复制使用说明:这是示例工程主入口,适合做“最小可运行工程”模板。
- 阅读重点:重点看工程入口怎么挂接测试程序,这一步很简单,但很多人第一次导入例程就是卡在这里。
PROGRAM PLC_PRG
VAR
bInit : BOOL := TRUE;
fbMqttClient : FB_MqttClient;
bEnable : BOOL;
sBrokerIP : STRING := '192.168.20.100';
//sBrokerIP : STRING := '127.0.0.1';
uiPort : UINT := 1883;
sUsername : STRING := 'PLC';
sPassword : STRING := '123456';
sWillTopic : STRING := 'CodeSys';
sWillMessage : STRING := 'CodeSys is Offline';
sWillRetain : BOOL;
bCleanSession : BOOL := FALSE;
uiQoS : UINT := 0;
wKeepAlive : WORD := 0;
bAutoReconnect : BOOL := TRUE;
sPayload : STRING(255) := 'This is CodeSys';
sTopicPublish : STRING := 'CodeSys';
sTopicSubscribe : STRING := 'CodeSys';
bRetain : BOOL := TRUE;
bPublish : BOOL;
bSubscribe : BOOL;
sClientID : STRING := 'CodeSys PLC';
sLastRecMsg : STRING;
sLastRecMsgTopic : STRING;
aLastRecMsgs : ARRAY [0..24] OF STRING;
aLastMsgTopics : ARRAY [0..24] OF STRING;
bMsgArrivedIn : BOOL;
sDiagMsg : STRING;
eState : E_MqttState;
bIsConnected : BOOL;
bError : BOOL;
eError : NBS.ERROR;
END_VAR
// === IMPLEMENTATION ===
PRG_Test();
系列导航
- 系列定位:第 8 篇
- 上一篇:第7篇 MQTT 现场排障
- 下一篇:加更1 MQTT 5.0 到底值不值得上
项目与资料
- 开源项目名称:MqttClient_V1_0
- GitHub 仓库关键字:ControlRookie MqttClient_V1_0
- 配套资料说明:我已经同步整理 MQTT 标准、控制报文、十六进制拆解、状态机图和 ST 代码映射知识库,后续会持续补全和公开入口
适合谁收藏
- 正在做 CODESYS / PLC / MQTT 项目的人
- 想把 MQTT 从报文真正看到 ST 代码的人
- 正在排查 QoS1 / QoS2 超时、掉线、重连问题的人
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)