Abstract

这一篇先不讲报文细节,只解决 3 个基础问题:为什么要做这套开源客户端、MQTT 到底处于哪一层、以及当前项目为什么是 NBS 上的 TCP 再往上跑 MQTT。适合刚准备从 PLC 进入 MQTT 的自动化工程师先建立整体坐标。

很多自动化工程师刚接触 MQTT 时,最容易卡住的地方其实不是代码,而是两个更前面的事:

  1. 库怎么选
  2. 协议到底跑在哪一层

第一个问题很现实。
很多人准备在 CODESYS 里接 MQTT,结果一看官方 MQTT Client SL,需要额外成本,第一反应通常都是:

这玩意儿还要花钱?

第二个问题更普遍。
很多自动化工程师一听到协议,就容易把:

  • 以太网
  • IP
  • TCP
  • MQTT
  • Modbus TCP
  • HTTP

全混到一起。

最后脑子里只剩一句话:

反正都是通信协议,但它们到底谁在上面、谁在下面,完全理不清。

所以这一篇我不打算先讲报文字节,也不先讲 QoS。
这一篇只做三件事:

  1. 说清楚为什么我要自己开源一套 CODESYS MQTT 客户端
  2. 用最短路径把协议分层讲清楚
  3. 明确这套项目到底是什么架构:NBS 上的 TCP,再往上跑 MQTT

如果你把这一篇看明白了,后面看 CONNECT、PUBLISH、QoS、状态机和 ST 代码,都会顺很多。


一、为什么我要自己写一套开源 MQTT 客户端

结论先说:

不是为了炫技,是为了让 MQTT 在 PLC 里更便宜、更透明、更好查问题。

1. 最现实的问题:成本

很多场景并不是大项目,只是:

  • 做个验证
  • 做个预研
  • 做个 demo
  • 给设备补一个 MQTT 功能

这种时候,如果一上来就被库授权卡住,心里肯定会犯嘀咕。

所以最直接的动机就是:

做一套完全开源、真正能落地的 MQTT 客户端。

2. 比收费更痛的是黑盒

自动化现场最难受的,不是完全不能跑,而是:

  • 能连,但偶尔掉线
  • 能收,但时不时报 timeout
  • QoS0 正常,一到 QoS1 / QoS2 就开始抽风
  • 重连以后订阅丢了

这时候如果底层是黑盒,你基本只能靠猜。

但如果源码在你手里,你至少能顺着代码一路往里看:

  • CONNECT 怎么发
  • ACK 怎么等
  • 超时在哪里判
  • 状态机为什么跳到 iTcpDisconnect

这就是开源方案最核心的价值之一:

不是只让你用,而是让你敢查、敢改、敢接手。

3. 自动化工程师真正缺的,不是概念,而是“从协议到代码”的桥

很多资料会讲 MQTT 很轻量、很流行、很适合 IoT。
这些都没错。

但对搞 PLC 的人来说,真正痛点是:

你别光告诉我它适合,你得告诉我它到底怎么落到代码里。

这也是我做这套项目和知识库最看重的一点:

  • 不只讲概念
  • 不只讲工具
  • 重点讲:标准 -> 报文 -> 状态机 -> ST 代码

二、先把最容易混的事讲清:协议分层到底怎么看

这一段非常重要。

因为自动化工程师一旦把分层理顺了,后面很多协议都不会再乱。


三、先看一个最粗暴的结论

1. 五层模型里,你可以先这样理解

层级 你可以先把它理解成什么 典型例子
应用层 业务规则 MQTT、HTTP、Modbus TCP 的应用数据格式
传输层 负责端到端传输 TCP、UDP
网络层 负责找路由、找 IP IP
数据链路层 负责同一链路上的帧传输 以太网 MAC、交换机这一层
物理层 线缆、电口、光口、电信号 网线、光纤、RJ45

2. 七层模型本质上只是拆得更细

很多教材喜欢讲 OSI 七层模型,但对工程入门来说,先不用搞那么复杂。

你可以简单记成:

  • 七层是“更细的理论讲法”
  • 五层是“更适合干活时理解的讲法”

对我们当前这个项目来说,用五层模型已经完全够了。


四、MQTT 在哪一层

直接给答案:

MQTT 是应用层协议。

什么意思?

就是说 MQTT 自己不负责:

  • 电信号怎么跑
  • 网线怎么传
  • IP 怎么路由
  • 数据包怎么重传

它负责的是:

  • 客户端怎么连接 Broker
  • 主题怎么组织
  • 消息怎么发布
  • 订阅怎么确认
  • QoS 怎么走确认链

换句话说,MQTT 关心的是“业务通信规则”,不是“底层传输细节”。


五、那 TCP 又在哪一层

TCP 在传输层。

TCP 负责的事情,简单说就是:

  • 建连接
  • 保证顺序
  • 丢了能重传
  • 对端收没收到有机制可依

所以你可以把 MQTT 和 TCP 的关系先粗暴理解成:

  • TCP 负责把字节可靠地送过去
  • MQTT 负责规定这些字节在业务上是什么意思

这句话很值钱,后面反复能用上。


六、我们这个例子到底是什么结构

这套 MqttClient_V1_0 当前走的是非常典型的一条链:

如果映射到当前 CODESYS 工程里,可以进一步理解成:

所以这套实现的核心不是“凭空发 MQTT”,而是:

在 NBS 提供的 TCP 能力之上,再去实现 MQTT 这一层协议逻辑。

这也是为什么你在代码里会同时看到两类东西:

1. TCP 层对象

  • NBS.TCP_Client
  • NBS.TCP_Read
  • NBS.TCP_Write

它们负责:

  • 连 TCP
  • 读 TCP
  • 写 TCP

2. MQTT 层逻辑

  • M_BuildConnectPacket
  • M_BuildPublishPacket
  • M_BuildSubscribePacket
  • M_ProcessReceive
  • M_HandleConnAck

它们负责:

  • 按 MQTT 标准去组织报文
  • 按 MQTT 标准去解析报文
  • 管理 MQTT 状态机

这两层千万别混。


七、MQTT 一定要跑在 TCP 上吗

这是很多人会问的问题。

先说工程结论

标准 MQTT 最常见、最主流、最正统的运行方式,就是跑在 TCP 长连接上。

所以我们这个项目,当前就是:

TCP 上的 MQTT

那能不能跑在 UDP 上?

标准 MQTT 本身,主流实现不是跑在 UDP 上。
因为 MQTT 很多关键能力天然依赖可靠传输,尤其是:

  • QoS1
  • QoS2
  • 长连接
  • KeepAlive
  • 有序确认链

这些东西跟 TCP 的契合度非常高。

如果你硬要用 UDP,那很多可靠性问题就得自己兜,非常麻烦。

那能不能跑在串口总线上?

如果你问的是“这套 MqttClient_V1_0 当前能不能直接跑在串口上”,答案是:

不能。

因为它现在的基础设施就是:

  • NBS 的 TCP 连接
  • TCP 读写

不是串口驱动,不是串口帧协议。

那 MQTT 能不能通过别的承载方式存在?

理论上你可以看到一些变种场景,比如:

  • MQTT over WebSocket

但那也是在特定网络框架下的另一层封装,不是说 MQTT 自己就脱离 TCP 乱跑了。

所以对我们这个系列、这个项目来说,你可以先牢牢记住一句:

当前项目讲的是:基于 NBS 的 TCP 上 MQTT。

这句话先钉死,后面就不容易混。


八、自动化工程师最容易混淆的几个点

这一段我专门列出来,因为现场真的很常见。

容易混淆 1:把以太网和 TCP 当成同一个东西

不是一回事。

  • 以太网更偏底层链路
  • TCP 是传输层

它们不是同级概念。

容易混淆 2:把 Modbus TCP 和 TCP 当成同一个东西

也不是一回事。

你可以先这样理解:

  • TCP 是“路”
  • Modbus TCP 和 MQTT 都是“跑在这条路上的应用协议”

容易混淆 3:把 MQTT 和 Broker 当成一回事

也不对。

  • MQTT 是协议
  • Broker 是按这个协议提供服务的中间人

容易混淆 4:把“能 ping 通”理解成“MQTT 一定能通”

这个也错得很常见。

ping 通,只能说明:

  • IP 层基本可达

但 MQTT 能不能通,还取决于:

  • TCP 端口能不能连
  • Broker 在不在线
  • CONNECT / CONNACK 能不能走通

这也是为什么后面我们一定要把:

  • TCP
  • MQTT
  • 状态机

分开看。


九、MQTT 通信主线,其实先记住三件事就够了

后面我们会讲很多细节。
但在这一篇里,你先记住这三件事就够了。

1. 先连上

客户端先去找 Broker,打招呼,握手。

这一条链后面就是:

  • CONNECT
  • CONNACK

2. 再发消息

发消息的核心报文就是:

  • PUBLISH

如果 QoS 不同,还会带出不同确认链。

3. 再订阅消息

你想收什么,就先去登记:

  • SUBSCRIBE
  • UNSUBSCRIBE

说到底,MQTT 的业务主线并不复杂。
真正复杂的,是:

  • ACK 怎么走
  • 状态机怎么推进
  • 超时怎么处理
  • 高 QoS 怎么稳

而这些,正是后面几篇要狠狠干透的东西。


十、这套开源项目后面会重点讲什么

接下来的系列,不会走“空讲概念”的路线。
我会重点围绕下面这条主线讲:

也就是说,后面每一篇的重点都不是“背定义”,而是尽量把这四个问题讲清楚:

  1. 这个报文标准里怎么规定
  2. 抓包里长什么样
  3. 状态机为什么这样走
  4. ST 代码为什么这样写

这才是我认为最适合自动化工程师的讲法。


十一、这一篇你现在最该记住的结论

最后直接收口。

核心结论 1

MQTT 是应用层协议,不是传输层协议。

核心结论 2

我们这个项目当前讲的是:基于 NBS 的 TCP 上 MQTT。

核心结论 3

TCP 负责把字节可靠送达,MQTT 负责规定这些字节在业务上是什么意思。

核心结论 4

后面真正有技术含量的部分,不只是“会发包”,而是“报文、状态机、ACK、超时和代码结构怎么全对齐”。

如果这四条你现在脑子里已经有印象了,那这一篇就值了。


十二、下篇预告

下一篇,我们正式进入第一组核心报文:

CONNECT / CONNACK

也就是:

  • 客户端到底怎么和 Broker 打招呼
  • Broker 到底怎么回
  • 为什么 MQTT 连接不是 TCP 通了就结束

从下一篇开始,我们就会第一次正式把:

  • 报文结构
  • 十六进制
  • 状态机
  • ST 方法

这几层一起对上。


系列说明

这是一个基于开源项目 MqttClient_V1_0 的 MQTT 系列教程。
整套内容会围绕:

  • MQTT 报文
  • 十六进制拆解
  • 状态机
  • ST 代码实现
  • 工程排障

来展开。

如果你也在做:

  • CODESYS
  • PLC
  • 工业通信
  • MQTT 项目

欢迎先收藏这个系列。

如果这篇对你有帮助,也欢迎:

  • 点个收藏
  • 留个评论
  • 关注后续更新

下一篇,我们正式开始拆第一条握手链。 😄

完整 ST 代码

复制使用说明

  • 这部分给出的是与本篇主题直接对应的完整 ST 代码,不是零碎片段。
  • 如果你只是想先跑通,优先整段复制,不要只摘几行变量或几条赋值语句。
  • 如果是 METHOD,请确认它仍然属于 FB_MqttClient;如果是 PROGRAM,请确认相关 DUT、GVL、FB 已一并导入。

代码阅读重点

  • 先按 报文结构 -> 状态机入口 -> 关键变量 -> 返回结果 的顺序看。
  • 再把正文里的十六进制拆解和这里的字节写入、字节解析语句一行行对上。
  • 最后回到在线调试,重点盯 uiTxLengthuiRxLengtheStatexWaitingForAck 这类状态量。

完整代码 1:PRG_Test

  • 对应源码路径:10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/PRG_Test.st
  • 复制使用说明:这是最适合先复制试跑的测试程序,直接展示了外部工程如何调用 FB_MqttClient
  • 阅读重点:先看输入参数区,再看 fbMqttClient(...) 的整段实例调用,你会立刻明白 MQTT 功能块在工程里的落地方式。
/// =======================================================================
/// 名称      : PRG_Test
/// 功能      : MQTT 客户端测试程序
/// 说明      : 用于驱动 FB_MqttClient 进行连接、发布、订阅与接收测试
/// 编程人员  : ControlRookie
/// 时间      : 2026-05-05
/// 版本      : V1.0
/// =======================================================================
PROGRAM PRG_Test
VAR
	bInit                    : BOOL := TRUE;                                                    // 初始化执行标志
	fbMqttClient            : FB_MqttClient;                                                    // MQTT 客户端实例
	bEnable                  : BOOL := TRUE;                                                    // 使能客户端
	bConnect                 : BOOL;                                                            // 连接命令
	sBrokerIP                : STRING := '192.168.20.100';                                     // Broker IP 地址
	uiPort                   : UINT := 1883;                                                   // Broker 端口号
	sClientID                : STRING := 'CodeSys_PLC';                                        // 客户端标识符
	sUsername                : STRING := '';                                                   // 用户名
	sPassword                : STRING := '';                                                   // 密码
	eVersion                 : E_MqttVersion := E_MqttVersion.byMqttVersion311;                // MQTT 协议版本
	bCleanSession            : BOOL := TRUE;                                                   // 清理会话标志
	uiKeepAlive              : UINT := 60;                                                     // 心跳周期(秒)
	bUseSSL                  : BOOL := FALSE;                                                  // 是否启用 SSL
	bAutoReconnect           : BOOL := TRUE;                                                   // 是否自动重连
	uiReconnectDelay         : UINT := 5000;                                                   // 重连延时(毫秒)
	udiSessionExpiry         : UDINT := 0;                                                     // 会话过期间隔
	uiReceiveMax             : UINT := 65535;                                                  // 最大接收数量
	udMaxPacketSize          : UDINT := 4096;                                                  // 最大报文长度
	bWillFlag                : BOOL := TRUE;                                                   // 是否启用遗嘱消息
	sWillTopic               : STRING := 'CodeSys';                                            // 遗嘱主题
	sWillMessage             : STRING := 'CodeSys Offline';                                    // 遗嘱消息内容
	eWillQoS                 : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 遗嘱消息 QoS
	bWillRetain              : BOOL := FALSE;                                                  // 遗嘱消息保留标志
	bPublish                 : BOOL;                                                            // 发布命令
	sPubTopic                : STRING := 'CodeSys';                                            // 发布主题
	sPubPayload              : STRING := 'CodeSys';                                            // 发布载荷
	ePubQoS                  : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 发布 QoS
	bPubRetain               : BOOL := FALSE;                                                  // 发布保留标志
	bSubscribe               : BOOL;                                                            // 订阅命令
	bUnsubscribe             : BOOL;                                                            // 取消订阅命令
	sSubTopic                : STRING := 'CodeSys';                                            // 订阅主题
	sUnsubTopic              : STRING := 'CodeSys';                                            // 取消订阅主题
	eSubQoS                  : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 订阅请求 QoS
	udiSubscriptionId        : UDINT := 0;                                                    // MQTT 5.0 订阅标识符
	eMqttState               : E_MqttState;                                                    // 客户端当前状态
	bIsConnected             : BOOL;                                                            // TCP 连接状态
	bMqttConnected           : BOOL;                                                            // MQTT 连接状态
	bError                   : BOOL;                                                            // 错误标志
	eErrorID                 : NBS.ERROR;                                                       // 错误码
	sDiagMsg                 : STRING;                                                          // 诊断信息
	aSubscriptions           : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription;  // 订阅列表
	uiSubscriptionCount      : UINT;                                                            // 订阅数量
	sRecTopic                : STRING;                                                          // 最新接收主题
	sRecPayload              : STRING;                                                          // 最新接收载荷
	aRecTopicList            : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;                      // 接收主题历史
	aRecPayloadList          : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;                      // 接收载荷历史
	byReceivedQoS            : BYTE;                                                            // 最新接收 QoS
	bReceivedRetain          : BOOL;                                                            // 最新接收保留标志
END_VAR

// === IMPLEMENTATION ===
	IF bInit THEN
	bInit := FALSE;
END_IF

/// MQTT客户端
fbMqttClient(
	bEnable                := bEnable,
	bConnect               := bConnect,
	sBrokerIP              := sBrokerIP,
	uiPort                 := uiPort,
	sClientID              := sClientID,
	sUsername              := sUsername,
	sPassword              := sPassword,
	eVersion               := eVersion,
	bCleanSession          := bCleanSession,
	uiKeepAlive            := uiKeepAlive,
	bUseSSL                := bUseSSL,
	bAutoReconnect         := bAutoReconnect,
	uiReconnectDelay       := uiReconnectDelay,
	udiSessionExpiry       := udiSessionExpiry,
	uiReceiveMax           := uiReceiveMax,
	udMaxPacketSize        := udMaxPacketSize,
	bWillFlag              := bWillFlag,
	sWillTopic             := sWillTopic,
	sWillMessage           := sWillMessage,
	eWillQoS               := eWillQoS,
	bWillRetain            := bWillRetain,
	bPublish               := bPublish,
	sPubTopic              := sPubTopic,
	sPubPayload            := sPubPayload,
	ePubQoS                := ePubQoS,
	bPubRetain             := bPubRetain,
	bSubscribe             := bSubscribe,
	sSubTopic              := sSubTopic,
	eSubQoS                := eSubQoS,
	udiSubscriptionId      := udiSubscriptionId,
	bUnsubscribe           := bUnsubscribe,
	sUnsubTopic            := sUnsubTopic,
	eState                 => eMqttState,
	bIsConnected           => bIsConnected,
	bMqttConnected         => bMqttConnected,
	bError                 => bError,
	eErrorID               => eErrorID,
	sDiagMsg               => sDiagMsg,
	aSubscriptions         => aSubscriptions,
	uiSubscriptionCount    => uiSubscriptionCount,
	sRecTopic              => sRecTopic,
	sRecPayload            => sRecPayload,
	aRecTopicList          => aRecTopicList,
	aRecPayloadList        => aRecPayloadList,
	byReceivedQoS          => byReceivedQoS,
	bReceivedRetain        => bReceivedRetain);

完整代码 2:PLC_PRG

  • 对应源码路径:10 MQTT/MqttClient_V1_0/Device/Application/PLC_PRG.st
  • 复制使用说明:这是最小运行入口,适合和 PRG_Test 配套复制。
  • 阅读重点:重点看 PRG_Test(); 这一层封装关系,理解示例工程是怎么把测试程序挂到主循环里的。
PROGRAM PLC_PRG
VAR
	bInit				: BOOL := TRUE;
	fbMqttClient		: FB_MqttClient;

	bEnable				: BOOL;
	sBrokerIP			: STRING := '192.168.20.100';
	//sBrokerIP			: STRING := '127.0.0.1';
	uiPort				: UINT := 1883;
	sUsername			: STRING := 'PLC';
	sPassword			: STRING := '123456';
	sWillTopic			: STRING := 'CodeSys';
	sWillMessage		: STRING := 'CodeSys is Offline';
	sWillRetain			: BOOL;
	bCleanSession		: BOOL := FALSE;
	uiQoS				: UINT := 0;
	wKeepAlive			: WORD := 0;
	bAutoReconnect		: BOOL := TRUE;
	sPayload			: STRING(255) := 'This is CodeSys';
	sTopicPublish		: STRING := 'CodeSys';
	sTopicSubscribe		: STRING := 'CodeSys';
	bRetain				: BOOL := TRUE;
	bPublish			: BOOL;
	bSubscribe			: BOOL;
	sClientID			: STRING := 'CodeSys PLC';
	sLastRecMsg			: STRING;
	sLastRecMsgTopic	: STRING;
	aLastRecMsgs		: ARRAY [0..24] OF STRING;
	aLastMsgTopics		: ARRAY [0..24] OF STRING;
	bMsgArrivedIn		: BOOL;
	sDiagMsg			: STRING;
	eState				: E_MqttState;
	bIsConnected		: BOOL;
	bError				: BOOL;
	eError				: NBS.ERROR;
END_VAR

// === IMPLEMENTATION ===
PRG_Test();

系列导航

  • 系列定位:第 1 篇
  • 上一篇:无,这是系列开篇
  • 下一篇:第2篇 CONNECT / CONNACK:十六进制和 ST 代码怎么对上

项目与资料

  • 开源项目名称:MqttClient_V1_0
  • GitHub 仓库关键字:ControlRookie MqttClient_V1_0
  • 配套资料说明:我已经同步整理 MQTT 标准、控制报文、十六进制拆解、状态机图和 ST 代码映射知识库,后续会持续补全和公开入口

适合谁收藏

  • 正在做 CODESYS / PLC / MQTT 项目的人
  • 想把 MQTT 从报文真正看到 ST 代码的人
  • 正在排查 QoS1 / QoS2 超时、掉线、重连问题的人
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐