前面几篇,我们把这套开源客户端的骨架已经基本讲清楚了:

  • MQTT 在哪一层
  • CONNECT / CONNACK 怎么走
  • PUBLISH / QoS1 / QoS2 为什么会难
  • SUBSCRIBE / UNSUBSCRIBE 怎么校验
  • 主状态机为什么是核心
  • 现场出了问题该怎么查

这一篇我们不再偏协议,直接回工程落地:

这套开源 MQTT 客户端,怎么真正接到 PLC 工程里跑起来?

先给结论:

真正把 MQTT 用起来,不是“拖个功能块进来”就完了。


一、先看最小接入思路

把这套客户端放进工程里,可以先粗暴理解成下面这条链:

你真正关心的就是 FB_MqttClient 这一层。

它帮你把下面这些事包起来了:

  • TCP 长连接
  • MQTT 报文构造
  • MQTT 报文解析
  • QoS1 / QoS2 ACK 链
  • 超时与重发
  • 订阅和退订

二、上电后最基本的启动顺序

建议按这个顺序来:

  1. bEnable := TRUE
  2. 再给 Broker 地址、端口、Client ID、协议版本
  3. 再拉 bConnect
  4. bMqttConnected = TRUE
  5. 再去做 Publish / Subscribe / Unsubscribe

顺序不要乱。

特别是不要一边连一边立刻乱触发一堆发布订阅。
先让连接态站稳,后续状态机会清楚很多。


三、最关键的参数先看表

下面这些参数,是接入时最值得优先确认的。

参数 作用 建议
sBrokerIP Broker 地址 先用固定 IP 验证
uiBrokerPort Broker 端口 常见 1883
sClientID 客户端标识 保证唯一
eVersion MQTT 版本 先和 Broker 对齐
uiKeepAlive 心跳周期 先用中等值,如 30~60 秒
bCleanSession / 相关会话参数 会话策略 测试期先明确,不要模糊
ePubQoS / eSubQoS 发布订阅 QoS 先从 QoS0 验证,再逐步升 QoS

你会发现,这些参数里最容易出大问题的,不是端口,而是:

  • eVersion
  • 会话语义
  • QoS 选择

四、先用什么 Broker 测最稳

按你已经验证过的环境,当前最典型有三类:

Broker 类型 特点 适合干什么
PLC 内置 Broker 环境最贴近 PLC 本体 先做基础联通与 3.1.1 验证
EMQX 5.0 能力强,调试方便 测 5.0、能力协商、多客户端
Mosquitto 轻量、经典 做基础兼容性验证

我的建议是:

第一阶段

先用 PLC 内置 BrokerMosquitto

目标:

  • 确认 TCP 通
  • 确认 CONNECT / CONNACK 正常
  • 确认 QoS0 收发正常

第二阶段

再切 EMQX

目标:

  • 验证 MQTT 5.0
  • 验证多客户端
  • 验证共享订阅 / 订阅标识符 / Topic Alias 等边界

五、为什么我强烈建议按 QoS0 -> QoS1 -> QoS2 逐级上

不要一上来就高频 QoS2。
那是最容易把自己调试节奏打乱的。

推荐顺序:

第一步:QoS0

先验证:

  • 连得上
  • 发得出去
  • 收得到
  • Topic 没配错

第二步:QoS1

再验证:

  • PUBACK 链是否正常
  • 高频时会不会超时

第三步:QoS2

最后验证:

  • PUBREC/PUBREL/PUBCOMP 完整链
  • 高频和多客户端下的稳定性

这个顺序背后只有一句话:

先验证“能通信”,再验证“能可靠通信”,最后再验证“能长期稳定可靠通信”。


六、最推荐的测试环境组合

如果你要自己复用这套流程,我最推荐的测试组合是:

这个组合的好处是:

  1. PLC 端能真实跑你的 ST 客户端
  2. PC 端有两个不同客户端做交叉验证
  3. IDE 能直接在线盯状态机和变量

这比“只拿一个客户端单机自测”有价值得多。


七、如果 PLC 能 ping 通 Broker,但就是连不上 MQTT,先别急着怀疑代码

这个坑你已经实际踩过了,非常典型。

很多时候问题不在 MQTT 报文,而在:

  • Windows 网卡网络类型
  • 防火墙策略
  • Broker 监听地址
  • PLC 到 PC 的 TCP 端口可达性

所以看到:

  • PLC 能 ping 通
  • IDE 也能在线监控
  • MQTTBox 和通信猫都能连
  • 但 PLC 程序 TcpConnect timeout

不要直接改 ST 代码。
先查 PC 网络环境和 Broker 监听。

这是很典型的“环境问题伪装成协议问题”。


八、最推荐的基础测试项

建议把测试拆成 4 组。

A 组:连接层

测试项 目标
TCP 建连 验证端口可达
CONNECT / CONNACK 验证 MQTT 握手
断开重连 验证异常恢复

B 组:基础业务层

测试项 目标
QoS0 发布 最小业务闭环
QoS0 订阅 最小接收闭环
Retain 接收 验证标志位与接收面

C 组:高可靠层

测试项 目标
QoS1 发布订阅 验证 PUBACK
QoS2 发布订阅 验证完整四段链
高频多客户端 压 ACK 和状态机

D 组:边界与恢复层

测试项 目标
取消订阅 验证 UNSUBACK
重连后订阅行为 看会话恢复逻辑
MQTT 5.0 特性 看能力协商与边界

九、联调时最该盯的变量

真正联调时,不要什么都看。
优先盯这几个点最有效:

变量 / 现象 用途
eState 看当前卡在哪个状态
bMqttConnected 看 MQTT 层是否真正建连
xWaitingForAck 看是否一直在等某个确认
byExpectedMsgType 看当前到底在等谁
uiSubscriptionCount 看订阅列表是否被清空或恢复
sDiagMsg 看最后的错误说明

这几个点基本能覆盖大多数联调判断。


十、上线前至少该通过哪些标准

如果站在“这套功能真能用”的角度,我建议至少满足下面这些最低标准:

  1. TCP 建连稳定,不反复超时
  2. CONNECT / CONNACK 在目标 Broker 下稳定通过
  3. QoS0 长时间发布订阅正常
  4. QoS1 高频多主题场景稳定
  5. QoS2 高频多主题场景稳定
  6. 退订不再卡状态
  7. 重连后行为符合你的会话预期
  8. MQTT 5.0 环境下能正确受 Broker 能力约束

注意第 4 和第 5 条。
如果这两条过不了,不要急着说“Phase1 能用了”。


十一、这套开源客户端适合什么场景

我觉得它最适合下面这几类用户:

用户 场景
自动化工程师 设备接云、边缘采集、上位协同
CODESYS 用户 不想被官方收费库卡住
想学协议实现的人 想把标准、报文、状态机、代码串起来看
想二开的人 要自己改功能、加策略、做适配

它不是一个“点一下就无脑全自动”的黑盒库。
但恰恰因为不是黑盒,才更适合工程掌控和长期维护。


十二、这一篇你最该记住的 5 句话

  1. 接入 MQTT,不是拖个功能块进来就完了,环境、参数、Broker、测试项都要配合。
  2. 调试顺序一定要从 QoS0 开始,逐级上到 QoS1 和 QoS2。
  3. PLC 能 ping 通 Broker,不等于 PLC 的 MQTT 一定能连通。
  4. 联调时优先盯状态机、等待标志、期待报文类型和诊断信息。
  5. 真正可用的标准,不是“偶尔能收发”,而是高频 QoS1 / QoS2 也能长期稳定。

十三、下篇预告

下一篇我们专门聊一个很容易引战、但工程上必须说清楚的话题:

MQTT 5.0 到底值不值得上?

会重点讲:

  • 5.0 的价值到底在哪
  • 工业现场到底该不该一上来就开 5.0
  • “支持 5.0” 和 “跑稳 5.0” 到底差在哪里

完整 ST 代码

复制使用说明

  • 这部分给出的是与本篇主题直接对应的完整 ST 代码,不是零碎片段。
  • 如果你只是想先跑通,优先整段复制,不要只摘几行变量或几条赋值语句。
  • 如果是 METHOD,请确认它仍然属于 FB_MqttClient;如果是 PROGRAM,请确认相关 DUT、GVL、FB 已一并导入。

代码阅读重点

  • 先按 报文结构 -> 状态机入口 -> 关键变量 -> 返回结果 的顺序看。
  • 再把正文里的十六进制拆解和这里的字节写入、字节解析语句一行行对上。
  • 最后回到在线调试,重点盯 uiTxLengthuiRxLengtheStatexWaitingForAck 这类状态量。

完整代码 1:PRG_Test

  • 对应源码路径:10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/PRG_Test.st
  • 复制使用说明:这一篇重实践,所以最适合直接给出完整测试程序。复制后改 IP、ClientID、Topic 就能起跑。
  • 阅读重点:先看连接参数,再看发布/订阅命令输入,最后在线监控输出引脚,就能快速把整套功能跑通。
/// =======================================================================
/// 名称      : PRG_Test
/// 功能      : MQTT 客户端测试程序
/// 说明      : 用于驱动 FB_MqttClient 进行连接、发布、订阅与接收测试
/// 编程人员  : ControlRookie
/// 时间      : 2026-05-05
/// 版本      : V1.0
/// =======================================================================
PROGRAM PRG_Test
VAR
	bInit                    : BOOL := TRUE;                                                    // 初始化执行标志
	fbMqttClient            : FB_MqttClient;                                                    // MQTT 客户端实例
	bEnable                  : BOOL := TRUE;                                                    // 使能客户端
	bConnect                 : BOOL;                                                            // 连接命令
	sBrokerIP                : STRING := '192.168.20.100';                                     // Broker IP 地址
	uiPort                   : UINT := 1883;                                                   // Broker 端口号
	sClientID                : STRING := 'CodeSys_PLC';                                        // 客户端标识符
	sUsername                : STRING := '';                                                   // 用户名
	sPassword                : STRING := '';                                                   // 密码
	eVersion                 : E_MqttVersion := E_MqttVersion.byMqttVersion311;                // MQTT 协议版本
	bCleanSession            : BOOL := TRUE;                                                   // 清理会话标志
	uiKeepAlive              : UINT := 60;                                                     // 心跳周期(秒)
	bUseSSL                  : BOOL := FALSE;                                                  // 是否启用 SSL
	bAutoReconnect           : BOOL := TRUE;                                                   // 是否自动重连
	uiReconnectDelay         : UINT := 5000;                                                   // 重连延时(毫秒)
	udiSessionExpiry         : UDINT := 0;                                                     // 会话过期间隔
	uiReceiveMax             : UINT := 65535;                                                  // 最大接收数量
	udMaxPacketSize          : UDINT := 4096;                                                  // 最大报文长度
	bWillFlag                : BOOL := TRUE;                                                   // 是否启用遗嘱消息
	sWillTopic               : STRING := 'CodeSys';                                            // 遗嘱主题
	sWillMessage             : STRING := 'CodeSys Offline';                                    // 遗嘱消息内容
	eWillQoS                 : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 遗嘱消息 QoS
	bWillRetain              : BOOL := FALSE;                                                  // 遗嘱消息保留标志
	bPublish                 : BOOL;                                                            // 发布命令
	sPubTopic                : STRING := 'CodeSys';                                            // 发布主题
	sPubPayload              : STRING := 'CodeSys';                                            // 发布载荷
	ePubQoS                  : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 发布 QoS
	bPubRetain               : BOOL := FALSE;                                                  // 发布保留标志
	bSubscribe               : BOOL;                                                            // 订阅命令
	bUnsubscribe             : BOOL;                                                            // 取消订阅命令
	sSubTopic                : STRING := 'CodeSys';                                            // 订阅主题
	sUnsubTopic              : STRING := 'CodeSys';                                            // 取消订阅主题
	eSubQoS                  : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 订阅请求 QoS
	udiSubscriptionId        : UDINT := 0;                                                    // MQTT 5.0 订阅标识符
	eMqttState               : E_MqttState;                                                    // 客户端当前状态
	bIsConnected             : BOOL;                                                            // TCP 连接状态
	bMqttConnected           : BOOL;                                                            // MQTT 连接状态
	bError                   : BOOL;                                                            // 错误标志
	eErrorID                 : NBS.ERROR;                                                       // 错误码
	sDiagMsg                 : STRING;                                                          // 诊断信息
	aSubscriptions           : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription;  // 订阅列表
	uiSubscriptionCount      : UINT;                                                            // 订阅数量
	sRecTopic                : STRING;                                                          // 最新接收主题
	sRecPayload              : STRING;                                                          // 最新接收载荷
	aRecTopicList            : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;                      // 接收主题历史
	aRecPayloadList          : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;                      // 接收载荷历史
	byReceivedQoS            : BYTE;                                                            // 最新接收 QoS
	bReceivedRetain          : BOOL;                                                            // 最新接收保留标志
END_VAR

// === IMPLEMENTATION ===
	IF bInit THEN
	bInit := FALSE;
END_IF

/// MQTT客户端
fbMqttClient(
	bEnable                := bEnable,
	bConnect               := bConnect,
	sBrokerIP              := sBrokerIP,
	uiPort                 := uiPort,
	sClientID              := sClientID,
	sUsername              := sUsername,
	sPassword              := sPassword,
	eVersion               := eVersion,
	bCleanSession          := bCleanSession,
	uiKeepAlive            := uiKeepAlive,
	bUseSSL                := bUseSSL,
	bAutoReconnect         := bAutoReconnect,
	uiReconnectDelay       := uiReconnectDelay,
	udiSessionExpiry       := udiSessionExpiry,
	uiReceiveMax           := uiReceiveMax,
	udMaxPacketSize        := udMaxPacketSize,
	bWillFlag              := bWillFlag,
	sWillTopic             := sWillTopic,
	sWillMessage           := sWillMessage,
	eWillQoS               := eWillQoS,
	bWillRetain            := bWillRetain,
	bPublish               := bPublish,
	sPubTopic              := sPubTopic,
	sPubPayload            := sPubPayload,
	ePubQoS                := ePubQoS,
	bPubRetain             := bPubRetain,
	bSubscribe             := bSubscribe,
	sSubTopic              := sSubTopic,
	eSubQoS                := eSubQoS,
	udiSubscriptionId      := udiSubscriptionId,
	bUnsubscribe           := bUnsubscribe,
	sUnsubTopic            := sUnsubTopic,
	eState                 => eMqttState,
	bIsConnected           => bIsConnected,
	bMqttConnected         => bMqttConnected,
	bError                 => bError,
	eErrorID               => eErrorID,
	sDiagMsg               => sDiagMsg,
	aSubscriptions         => aSubscriptions,
	uiSubscriptionCount    => uiSubscriptionCount,
	sRecTopic              => sRecTopic,
	sRecPayload            => sRecPayload,
	aRecTopicList          => aRecTopicList,
	aRecPayloadList        => aRecPayloadList,
	byReceivedQoS          => byReceivedQoS,
	bReceivedRetain        => bReceivedRetain);

完整代码 2:PLC_PRG

  • 对应源码路径:10 MQTT/MqttClient_V1_0/Device/Application/PLC_PRG.st
  • 复制使用说明:这是示例工程主入口,适合做“最小可运行工程”模板。
  • 阅读重点:重点看工程入口怎么挂接测试程序,这一步很简单,但很多人第一次导入例程就是卡在这里。
PROGRAM PLC_PRG
VAR
	bInit				: BOOL := TRUE;
	fbMqttClient		: FB_MqttClient;

	bEnable				: BOOL;
	sBrokerIP			: STRING := '192.168.20.100';
	//sBrokerIP			: STRING := '127.0.0.1';
	uiPort				: UINT := 1883;
	sUsername			: STRING := 'PLC';
	sPassword			: STRING := '123456';
	sWillTopic			: STRING := 'CodeSys';
	sWillMessage		: STRING := 'CodeSys is Offline';
	sWillRetain			: BOOL;
	bCleanSession		: BOOL := FALSE;
	uiQoS				: UINT := 0;
	wKeepAlive			: WORD := 0;
	bAutoReconnect		: BOOL := TRUE;
	sPayload			: STRING(255) := 'This is CodeSys';
	sTopicPublish		: STRING := 'CodeSys';
	sTopicSubscribe		: STRING := 'CodeSys';
	bRetain				: BOOL := TRUE;
	bPublish			: BOOL;
	bSubscribe			: BOOL;
	sClientID			: STRING := 'CodeSys PLC';
	sLastRecMsg			: STRING;
	sLastRecMsgTopic	: STRING;
	aLastRecMsgs		: ARRAY [0..24] OF STRING;
	aLastMsgTopics		: ARRAY [0..24] OF STRING;
	bMsgArrivedIn		: BOOL;
	sDiagMsg			: STRING;
	eState				: E_MqttState;
	bIsConnected		: BOOL;
	bError				: BOOL;
	eError				: NBS.ERROR;
END_VAR

// === IMPLEMENTATION ===
PRG_Test();

系列导航

  • 系列定位:第 8 篇
  • 上一篇:第7篇 MQTT 现场排障
  • 下一篇:加更1 MQTT 5.0 到底值不值得上

项目与资料

  • 开源项目名称:MqttClient_V1_0
  • GitHub 仓库关键字:ControlRookie MqttClient_V1_0
  • 配套资料说明:我已经同步整理 MQTT 标准、控制报文、十六进制拆解、状态机图和 ST 代码映射知识库,后续会持续补全和公开入口

适合谁收藏

  • 正在做 CODESYS / PLC / MQTT 项目的人
  • 想把 MQTT 从报文真正看到 ST 代码的人
  • 正在排查 QoS1 / QoS2 超时、掉线、重连问题的人
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐