Abstract

这一篇是项目复盘视角,重点讲这套开源客户端为什么开始做、为什么后来必须重构接收链和状态机、QoS1 / QoS2 为什么是成熟度分水岭,以及这个项目真正的长期价值到底在哪里。

前面几篇,我们一直站在“读者怎么学”的角度讲这套项目。

这一篇换个视角。
我们直接讲项目本身。

也就是:

这套开源 MQTT 客户端,到底是怎么从“想做一个免费替代方案”,一步一步长成现在这个样子的?

先给结论:

这不是一个“把几个报文拼出来”就结束的小项目。


一、最开始的动机其实很简单

这个项目的起点,并不复杂:

  1. CODESYS 官方 MQTT Client SL 要额外成本
  2. 很多自动化工程场景只是想快速接入 MQTT
  3. 工程师需要的是能看、能改、能排障的源码,不只是一个黑盒库

所以一开始的目标就很明确:

做一套真正能在 CODESYS / PLC 场景里落地的开源 MQTT 客户端。

这件事的价值不在“自己写轮子”这四个字。
而在于:

  • 降低使用门槛
  • 让更多人能学会 MQTT 实现
  • 给自动化工程师一个可控、可维护、可扩展的基础版本

二、第一阶段最容易陷入的误区:以为“能发包就差不多了”

很多协议类项目一开始都会经历这个阶段:

  • CONNECT 发出去了
  • Broker 回了
  • PUBLISH 也能跑

这时候很容易产生一个错觉:

差不多做完了。

但真进到 QoS1 / QoS2、高频、多客户端、订阅退订之后,问题就会全部冒出来。

也就是说:

MQTT 客户端最早做出来的,通常只是“演示能力”;


三、真正把项目难度拉上来的,是 QoS1 / QoS2

如果只做:

  • QoS0
  • 单主题
  • 低频

那很多实现都能“看起来没问题”。

但一旦进入:

  • QoS1
  • QoS2
  • 高频
  • 多客户端同主题 / 多主题

问题会非常集中地暴露出来。

核心原因我们前面讲过了:

  • ACK 链变长
  • inflight 必须存在
  • 超时重发必须可靠
  • 接收链和发送链不能互相堵

所以这个项目真正从“能跑”走向“能用”的分水岭,就是把 QoS1 / QoS2 高频稳定性彻底打通。


四、为什么后来必须重构接收链

这一点其实非常关键。

很多 MQTT demo 风格实现,会给每种入站报文都单独写一个处理方法,然后主流程一个个跳。

这类做法在简单场景能工作。
但高频和多帧粘连场景下,问题会很快出现:

  • ACK 处理不及时
  • 同一扫描周期里多个入站帧消化不完
  • 即时响应时序不顺

所以后来项目必须收敛成以:

  • M_ProcessReceive
  • M_ProcessPendingFrames

为核心的接收面处理逻辑。

这不是为了代码好看,而是为了把:

  • 多帧连续处理
  • 即时 ACK 优先发回
  • 等待状态推进

真正做顺。


五、为什么状态机必须越做越“硬”

刚开始做这类客户端时,很多动作会倾向于:

  • 哪里需要就哪里发
  • 哪里收到就哪里改状态

这种写法在小范围内没什么问题。
但随着功能和边界越来越多,状态机必须硬起来。

也就是:

  1. 每条协议链有明确状态段
  2. 每种 ACK 有明确等待关系
  3. 失败统一收口
  4. 重连逻辑统一入口

最终才会形成现在这类比较清晰的主骨架:

连接链
发布链
订阅链
心跳链
异常收口链

这一步其实是项目成熟度真正提升的标志。


六、为什么 Broker 差异一定要进设计,不是后面再说

很多人做协议实现时,会下意识假设:

只要我按标准写了,所有 Broker 表现就该一样。

理论上当然希望如此。
工程上不能这么想。

因为真实环境里,你会遇到:

  • PLC 内置 Broker
  • EMQX
  • Mosquitto
  • 各种不同客户端配合

它们在版本支持、属性支持、共享订阅、订阅标识符、Topic Alias、默认会话策略上,行为不一定完全一致。

所以这个项目后面必须把:

  • CONNACK 能力协商
  • Broker 支持边界
  • 运行时参数约束

都纳入设计。


七、为什么这个项目后面还会配一整套知识库

代码本身当然很重要。
但只放代码,很多人还是学不会。

因为真正难的是下面这条映射:

标准 -> 报文 -> 十六进制 -> 状态机 -> ST 代码

所以后面配套做知识库,本质上不是“顺便整理点文档”,而是在解决另一个更深的问题:

怎么让自动化工程师真正看懂这套源码,而不是只能拿去复制。

这也是为什么后面才会有:

  • 控制报文字段页
  • 十六进制拆解页
  • 状态机图
  • 握手时序图
  • ST 方法逐段讲解

八、这个项目最重要的经验,我觉得有 4 条

经验 1:先保证骨架正确,再追求特性丰富

没有稳定状态机和 ACK 链,再多 5.0 特性也是虚的。

经验 2:能高频稳定跑 QoS1 / QoS2,才是真的过线

QoS0 只能证明“基本能通信”。
高频 QoS1 / QoS2 才能说明实现真正闭环。

经验 3:Broker 差异不是噪音,是设计输入

尤其是 5.0,不能只看标准书写得多漂亮。

经验 4:源码要可读,不只是可运行

开源项目如果用户看不懂,那传播力和可维护性都会打折。


九、这套开源项目真正的价值,我认为不止是“省钱”

当然,免费替代官方收费库,本身就是非常现实的价值。
但它还不止这个。

我觉得更大的价值有 3 个:

1. 可控

出了问题,你能看源码,不是只能猜。

2. 可学

它不是黑盒,非常适合把 MQTT 实现真正学透。

3. 可扩展

后面无论你要继续加:

  • 更完整的 5.0 特性
  • 更复杂的会话恢复
  • 安全增强
  • 工程定制逻辑

都能在现有骨架上继续推进。


十、如果让我给这个项目定一个阶段性定位

我会这样说:

这不是一个“最花哨”的 MQTT 客户端。

这句话我觉得比单纯说“支持 3.1.1 和 5.0”更重要。

因为开源项目真正的生命力,很多时候不在功能表,而在:

  • 能不能被理解
  • 能不能被接手
  • 能不能继续成长

十一、这一篇你最该记住的 5 句话

  1. 这个项目最开始是为了解决现实成本问题,但后来真正拉开价值差距的是工程可控性。
  2. MQTT 客户端从“能跑”到“能用”,最难跨过去的是 QoS1 / QoS2 高频稳定性。
  3. 接收链重构、状态机硬化和 Broker 差异纳入设计,是项目成熟的关键节点。
  4. 开源项目真正有传播力,不只是因为免费,还因为它能让人学会、看懂、接手。
  5. 这套客户端的最大意义,不只是替代一个库,而是给自动化工程师一套真正能掌控的 MQTT 实现骨架。

十二、系列收官

到这里,这套系列教程的主线就全部收完了。

如果你从第一篇一路看到这里,理论上你已经能把下面这条链完整接起来:

这也是我最想做到的一件事:

不只是让大家“知道 MQTT”,

如果这套内容对你有帮助,也欢迎继续关注后续版本迭代和知识库扩展。
下一步,才是真正把这套东西用在更多项目里。


完整 ST 代码

复制使用说明

  • 这部分给出的是与本篇主题直接对应的完整 ST 代码,不是零碎片段。
  • 如果你只是想先跑通,优先整段复制,不要只摘几行变量或几条赋值语句。
  • 如果是 METHOD,请确认它仍然属于 FB_MqttClient;如果是 PROGRAM,请确认相关 DUT、GVL、FB 已一并导入。

代码阅读重点

  • 先按 报文结构 -> 状态机入口 -> 关键变量 -> 返回结果 的顺序看。
  • 再把正文里的十六进制拆解和这里的字节写入、字节解析语句一行行对上。
  • 最后回到在线调试,重点盯 uiTxLengthuiRxLengtheStatexWaitingForAck 这类状态量。

完整代码 1:FB_MqttClient

  • 对应源码路径:10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/MqttClient NBS/FB_MqttClient.st
  • 复制使用说明:这篇讲的是工程抽象过程,所以最适合直接给出主功能块全貌。
  • 阅读重点:重点看状态机、TCP 封装、收发报文方法、在途队列、错误管理是怎么被收敛到一个统一功能块里的。
/// =======================================================================
/// 名称      : FB_MqttClient
/// 功能      : MQTT 客户端(支持 MQTT 3.1.1 / MQTT 5.0)
/// 说明      : 实现 MQTT 连接、发布、订阅、接收、心跳与重连状态机
/// 编程人员  : ControlRookie
/// 时间      : 2026-01-10
/// 版本      : V2.1
/// =======================================================================
{attribute 'hide_all_locals'}
FUNCTION_BLOCK FB_MqttClient
VAR_INPUT
    // 连接配置
	bEnable                : BOOL := TRUE;                               // 使能客户端
	bConnect               : BOOL;                                       // 连接命令
	sBrokerIP              : STRING := '192.168.20.222';                // Broker IP 地址
	uiPort                 : UINT := 1883;                               // Broker 端口号
	sClientID              : STRING;                                     // 客户端标识符
	sUsername              : STRING;                                     // 用户名
	sPassword              : STRING;                                     // 密码

    // 协议版本
	eVersion               : E_MqttVersion := E_MqttVersion.byMqttVersion311; // MQTT 协议版本

    // 连接参数
	bCleanSession          : BOOL := TRUE;                               // 是否清理会话
	uiKeepAlive            : UINT := 60;                                 // 心跳周期(秒)
	bUseSSL                : BOOL := FALSE;                              // 是否启用 SSL
	bAutoReconnect         : BOOL := TRUE;                               // 是否自动重连
	uiReconnectDelay       : UINT := 5000;                               // 重连延时(毫秒)

    // MQTT 5.0 连接属性
	udiSessionExpiry       : UDINT := 0;                                 // 会话过期间隔
	uiReceiveMax           : UINT := 65535;                              // 最大接收数量
	udMaxPacketSize        : UDINT := 4096;                              // 最大报文长度
	bRequestResponseInfo   : BOOL := FALSE;                              // 是否请求响应信息
	bRequestProblemInfo    : BOOL := TRUE;                               // 是否请求问题信息

    // 遗嘱消息
	bWillFlag              : BOOL := FALSE;                              // 是否启用遗嘱消息
	bWillRetain            : BOOL := FALSE;                              // 遗嘱消息保留标志
	eWillQoS               : E_MqttQoS := E_MqttQoS.byQoS0;             // 遗嘱消息 QoS
	sWillTopic             : STRING;                                     // 遗嘱主题
	sWillMessage           : STRING;                                     // 遗嘱消息内容

    // 发布参数
	bPublish               : BOOL := FALSE;                              // 发布命令
	sPubTopic              : STRING := 'CodeSys';                        // 发布主题
	sPubPayload            : STRING := 'This is CodeSys';                // 发布载荷
	ePubQoS                : E_MqttQoS;                                  // 发布 QoS
	bPubRetain             : BOOL;                                       // 发布保留标志

    // 订阅参数
	bSubscribe             : BOOL := FALSE;                              // 订阅命令
	sSubTopic              : STRING := 'CodeSys';                        // 订阅主题
	eSubQoS                : E_MqttQoS;                                  // 订阅请求 QoS
	udiSubscriptionId      : UDINT;                                      // MQTT 5.0 订阅标识符(>0 时包含)
	bUnsubscribe           : BOOL;                                       // 取消订阅命令
	sUnsubTopic            : STRING := 'CodeSys';                        // 取消订阅主题
END_VAR
VAR_OUTPUT
    // 状态信息
	eState                 : E_MqttState;                                // 客户端当前状态
	bIsConnected           : BOOL;                                        // TCP 连接状态
	bMqttConnected         : BOOL;                                        // MQTT 协议连接状态
	bError                 : BOOL;                                        // 错误标志
	eErrorID               : NBS.ERROR;                                   // 错误码
	sDiagMsg               : STRING;                                      // 诊断信息

    // 订阅列表管理
	aSubscriptions         : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription; // 订阅列表
	uiSubscriptionCount    : UINT := 0;                                  // 当前订阅数量

    // 接收消息
	sRecTopic              : STRING;                                      // 最新接收主题
	sRecPayload            : STRING;                                      // 最新接收载荷
	aRecTopicList          : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;   // 接收主题历史
	aRecPayloadList        : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;   // 接收载荷历史
	byReceivedQoS          : BYTE;                                        // 最新接收消息 QoS
	bReceivedRetain        : BOOL;                                        // 最新接收消息保留标志
END_VAR
VAR
	rtrigConnect          : R_TRIG;                                      // 连接命令上升沿检测
	ftrigConnect          : F_TRIG;                                      // 连接命令下降沿检测
	rtrigPublish          : R_TRIG;                                      // 发布命令上升沿检测
	rtrigSubscribe        : R_TRIG;                                      // 订阅命令上升沿检测
	rtrigUnsubscribe      : R_TRIG;                                      // 取消订阅命令上升沿检测

	// 超时计时
	tTimeout                : TIME;                                       // 当前超时时间设定
	tonTimer                : TON;                                        // 通用超时定时器
	tonKeepAlive            : TON;                                        // 心跳定时器
	tonReconnect            : TON;                                        // 重连延时定时器
	eLastState              : E_MqttState;                                // 上一周期状态

	// TCP连接对象
	fbTcpClient              : NBS.TCP_Client;                             // TCP 客户端实例
	fbTcpRead                : NBS.TCP_Read;                               // TCP 读取实例
	fbTcpWrite               : NBS.TCP_Write;                              // TCP 写入实例
	hConnection              : NBS.CAA.HANDLE;                             // TCP 连接句柄
	bTcpConnect              : BOOL;                                       // TCP 连接使能标志
	bTcpRead                 : BOOL;                                       // TCP 读取使能标志
	bTcpWrite                : BOOL;                                       // TCP 写入使能标志
	bWriteDoneLatched        : BOOL;                                       // TCP 写入完成锁存标志
	bHasRead                 : BOOL;                                       // 本周期已读取完成标志
	bHasWritten              : BOOL;                                       // 本周期已写入完成标志
	udiBytesRead             : UDINT;                                      // 本次 TCP 读取字节数
	aTxBuf                   : ARRAY[0..GVL_Mqtt.cnSendBufferSize - 1] OF BYTE; // 发送缓冲区
	aRxBuf                   : ARRAY[0..GVL_Mqtt.cnRecvBufferSize - 1] OF BYTE; // 接收缓冲区
	uiTxLength               : UINT;                                       // 待发送字节数
	uiRxLength               : UINT;                                       // 已接收字节数

	// 系统时间
	stTimeZone               : Util.TimeZone := (iBias := 480);            // 时区设置
	uliSysTime               : ULINT;                                      // 当前系统时间

	// MQTT协议相关
	bDup                     : BOOL;                                       // DUP 重发标志
	uiPacketId               : UINT := 1;                                 // 下一个报文标识符
	uiQoS2PacketId           : UINT;                                       // QoS2 流程报文标识符
	uiExpectedPacketId       : UINT;                                       // 当前期待确认的报文标识符
	byExpectedMsgType        : BYTE;                                       // 当前期待的报文类型
	xWaitingForAck           : BOOL;                                       // 等待通用 ACK 标志
	xWaitingForSubAck        : BOOL;                                       // 等待 SUBACK 标志
	xWaitingForUnsubAck      : BOOL;                                       // 等待 UNSUBACK 标志
	bPingPending             : BOOL;                                       // 心跳响应等待标志
	uiSendQuota              : UINT := GVL_Mqtt.cnDefaultReceiveMax;       // MQTT 5.0 发送配额
	uiInflightCount          : UINT;                                       // 在途消息数量
	uiTopicAliasCount        : UINT;                                       // 主题别名数量
	uiNextTopicAlias         : UINT := 1;                                  // 下一个发送主题别名
	uiPendingSubPacketId     : UINT;                                       // 待确认订阅报文标识符
	uiPendingUnsubPacketId   : UINT;                                       // 待确认取消订阅报文标识符
	uiRetryInflightIndex     : UINT;                                       // 待重发在途消息索引
	uiRxInFlightQosCount     : UINT;                                       // 接收侧未完成 QoS>0 消息数量
	xPendingImmediateTx      : BOOL;                                       // 接收路径即时回包待发送标志
	aInflight                : ARRAY[1..GVL_Mqtt.cnMaxInflight] OF ST_MqttInflightMessage; // 出站在途队列
	aTopicAlias              : ARRAY[1..GVL_Mqtt.cnMaxTopicAlias] OF ST_MqttTopicAlias; // 主题别名表
	aRxQoS2PacketIds         : ARRAY[1..GVL_Mqtt.cnMaxInflight] OF UINT;   // 入站 QoS2 去重表

	// 事件标志
	xConnectedEvent          : BOOL;                                       // 连接成功事件
	xDisconnectedEvent       : BOOL;                                       // 断开连接事件
	xSubscribedEvent         : BOOL;                                       // 订阅成功事件
	xUnsubscribedEvent       : BOOL;                                       // 取消订阅成功事件
	xPublishedEvent          : BOOL;                                       // 发布成功事件
	bMessageReceived         : BOOL;                                       // 收到消息事件

	// 统计信息
	uiMessagesSent           : UDINT;                                      // 已发送消息数量
	uiMessagesReceived       : UDINT;                                      // 已接收消息数量
	dtLastMessageTime        : DATE_AND_TIME;                              // 最后消息时间

	// 重连管理
	uiReconnectAttempts      : UINT := 0;                                  // 已重连次数
	uiMaxReconnectAttempts   : UINT := 10;                                 // 最大重连次数

	// MQTT 5.0 服务器属性(CONNACK解析后存储)
	uiServerReceiveMax       : UINT := 65535;                              // 服务端最大接收数量
	byServerMaxQoS           : BYTE := 2;                                  // 服务端支持的最大 QoS
	bServerRetainAvailable   : BOOL := TRUE;                               // 服务端是否支持保留消息
	udServerMaxPacketSize    : UDINT := GVL_Mqtt.cnMaxPacketSize;          // 服务端允许的最大报文长度
	uiServerTopicAliasMax    : UINT := 0;                                  // 服务端允许的最大主题别名
	bServerWildcardSubAvail  : BOOL := TRUE;                               // 服务端是否支持通配符订阅
	bServerSubIdAvail        : BOOL := TRUE;                               // 服务端是否支持订阅标识符
	bServerSharedSubAvail    : BOOL := TRUE;                               // 服务端是否支持共享订阅

END_VAR

// === IMPLEMENTATION ===
IF NOT bEnable AND (eState <> E_MqttState.iDisconnected) THEN
    IF bMqttConnected AND bIsConnected THEN
        eState := E_MqttState.iDisconnect;
    END_IF

    IF NOT bMqttConnected AND bIsConnected THEN
        eState := E_MqttState.iTcpDisconnect;
    END_IF
    tTimeout := T#0S;

    xConnectedEvent := FALSE;
    xDisconnectedEvent := FALSE;
    xSubscribedEvent := FALSE;
    xPublishedEvent := FALSE;

	uiMessagesSent := 0;
	uiMessagesReceived := 0;
	uiPacketId := 0;
	uiQoS2PacketId := 0;
	uiExpectedPacketId := 0;
	uiPendingSubPacketId := 0;
	uiPendingUnsubPacketId := 0;
	byExpectedMsgType := 0;
	xWaitingForAck := FALSE;
	xWaitingForSubAck := FALSE;
	xWaitingForUnsubAck := FALSE;
	bPingPending := FALSE;
	uiRetryInflightIndex := 0;
	M_InflightClear();
	M_TopicAliasClear();
	M_SubListClear();
	THIS^.M_ResetError();

    RETURN;
END_IF

/// 系统时间
stTimeZone.iBias := 480;
uliSysTime := GetLocalDateTime(tzTimeZone := stTimeZone);

/// 边沿检测
rtrigConnect(CLK := bConnect);
ftrigConnect(CLK := bConnect);
rtrigPublish(CLK := bPublish);
rtrigSubscribe(CLK := bSubscribe);
rtrigUnsubscribe(CLK := bUnsubscribe);

/// 清除单次事件标志
xConnectedEvent := FALSE;
xDisconnectedEvent := FALSE;
xSubscribedEvent := FALSE;
xUnsubscribedEvent := FALSE;
xPublishedEvent := FALSE;
bMessageReceived := FALSE;

/// 核心状态机
CASE eState OF
    //=======================================================================
    // 禁用状态
    //=======================================================================
	E_MqttState.iDisconnected:
		bTcpRead := FALSE;
		bTcpWrite := FALSE;

		IF rtrigConnect.Q OR (bAutoReconnect AND (uiReconnectAttempts > 0)) THEN
			IF sBrokerIP <> '' AND sClientID <> '' THEN
				bError := FALSE;
				eErrorID := TO_INT(E_ReasonCode.uiErrNoError);
				sDiagMsg := '';
				eState := E_MqttState.iTcpConnect;
            ELSE
                M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidParameter), 'Invalid IP or ClientId');
            END_IF
        END_IF

    //=======================================================================
    // TCP连接中
    //=======================================================================
    E_MqttState.iTcpConnect:
        bTcpConnect := TRUE;

        IF bIsConnected AND hConnection <> 0 THEN
            eState := E_MqttState.iConnect;
        ELSIF fbTcpClient.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpConnectFailed), CONCAT('TCP error: ', INT_TO_STRING(fbTcpClient.eError)));
            eState := E_MqttState.iTcpDisconnect;
        ELSE
            tTimeout := T#5S;
            IF tonTimer.Q THEN
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'TcpConnect timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    //=======================================================================
    // 发送MQTT CONNECT报文
    //=======================================================================
    E_MqttState.iConnect:
        IF NOT bHasWritten THEN
            IF NOT M_BuildConnectPacket() THEN
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

        IF eState = E_MqttState.iConnect THEN
            bTcpWrite := TRUE;
        END_IF
        IF (eState = E_MqttState.iConnect) AND bHasWritten THEN
            bTcpWrite := FALSE;
            xWaitingForAck := TRUE;
            byExpectedMsgType := E_MqttPacketType.byConnAck;
            eState := E_MqttState.iConnAck;
        ELSIF (eState = E_MqttState.iConnect) AND fbTcpWrite.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), CONCAT('Send error: ', INT_TO_STRING(fbTcpWrite.eError)));
            eState := E_MqttState.iTcpDisconnect;
        ELSIF eState = E_MqttState.iConnect THEN
            tTimeout := T#2S;
            IF tonTimer.Q THEN
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT connection timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    //=======================================================================
    // 等待CONNACK响应
    //=======================================================================
	E_MqttState.iConnAck:
		tTimeout := T#2S;

		IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
			tTimeout := T#0S;
			IF M_HandleConnAck() THEN
                xConnectedEvent := TRUE;
                tonKeepAlive(IN := FALSE);
                uiReconnectAttempts := 0;
                eState := E_MqttState.iConnected;
            ELSE
                M_SetError(TO_UINT(E_ReasonCode.uiErrConnAckRefused), sDiagMsg);
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        ELSIF fbTcpRead.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
            eState := E_MqttState.iTcpDisconnect;
        ELSIF tonTimer.Q THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'ConnAck timeout');
            eState := E_MqttState.iTcpDisconnect;
        END_IF

    //=======================================================================
    // 已连接状态
    //=======================================================================
	E_MqttState.iConnected:
		IF NOT bIsConnected THEN
			M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'TCP disconnected');
			eState := E_MqttState.iTcpDisconnect;
		END_IF

		IF (eState = E_MqttState.iConnected) AND xPendingImmediateTx THEN
			bTcpWrite := TRUE;
			tTimeout := GVL_Mqtt.cnResponseTimeout;
			IF bHasWritten THEN
				bTcpWrite := FALSE;
				tTimeout := T#0S;
				IF uiRxInFlightQosCount > 0 THEN
					IF (uiTxLength > 0) AND ((aTxBuf[0] AND GVL_Mqtt.cnHdrTypeMask) = E_MqttPacketType.byPubAck) THEN
						uiRxInFlightQosCount := uiRxInFlightQosCount - 1;
					END_IF
				END_IF
				uiTxLength := 0;
				xPendingImmediateTx := FALSE;
			ELSIF fbTcpWrite.xError THEN
				bTcpWrite := FALSE;
				tTimeout := T#0S;
				xPendingImmediateTx := FALSE;
				M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Immediate response send failed');
				eState := E_MqttState.iTcpDisconnect;
			ELSIF tonTimer.Q THEN
				bTcpWrite := FALSE;
				tTimeout := T#0S;
				xPendingImmediateTx := FALSE;
				M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'Immediate response timeout');
				eState := E_MqttState.iTcpDisconnect;
			END_IF
		ELSIF eState = E_MqttState.iConnected THEN
			tTimeout := T#0S;
		END_IF

		IF (eState = E_MqttState.iConnected) AND (NOT xPendingImmediateTx) THEN
			IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
				tonKeepAlive(IN := FALSE);
				M_ProcessPendingFrames();
			END_IF
		END_IF

		IF (eState = E_MqttState.iConnected) AND (NOT xPendingImmediateTx) THEN
			uiRetryInflightIndex := M_InflightCheckTimeout();
			IF uiRetryInflightIndex > 0 THEN
				CASE aInflight[uiRetryInflightIndex].eState OF
					E_MqttInflightState.iPublishSent:
						ePubQoS := aInflight[uiRetryInflightIndex].eQoS;
						eState := E_MqttState.iPublish;
					E_MqttInflightState.iPubRelSent:
						uiQoS2PacketId := aInflight[uiRetryInflightIndex].uiPacketId;
						eState := E_MqttState.iPubRel;
				ELSE
						uiRetryInflightIndex := 0;
				END_CASE
			END_IF

			IF tonKeepAlive.Q THEN
				tonKeepAlive(IN := FALSE);
				eState := E_MqttState.iPingReq;
			END_IF

			IF (eState = E_MqttState.iConnected) AND rtrigPublish.Q AND NOT xWaitingForAck AND NOT xWaitingForSubAck AND NOT xWaitingForUnsubAck AND NOT bPingPending THEN
				tonKeepAlive(IN := FALSE);
				uiRetryInflightIndex := 0;
				eState := E_MqttState.iPublish;
			END_IF

			IF (eState = E_MqttState.iConnected) AND rtrigSubscribe.Q AND NOT xWaitingForAck AND NOT xWaitingForSubAck AND NOT xWaitingForUnsubAck AND NOT bPingPending THEN
				tonKeepAlive(IN := FALSE);
				eState := E_MqttState.iSubscribe;
			END_IF

			IF (eState = E_MqttState.iConnected) AND rtrigUnsubscribe.Q AND NOT xWaitingForAck AND NOT xWaitingForSubAck AND NOT xWaitingForUnsubAck AND NOT bPingPending THEN
				tonKeepAlive(IN := FALSE);
				eState := E_MqttState.iUnsubscribe;
			END_IF

			IF (eState = E_MqttState.iConnected) AND ftrigConnect.Q THEN
				eState := E_MqttState.iDisconnect;
			END_IF
		END_IF

    //=======================================================================
    // 心跳请求
    //=======================================================================
    E_MqttState.iPingReq:
		IF NOT bHasWritten THEN
			IF NOT M_BuildPingReqPacket() THEN
				eState := E_MqttState.iTcpDisconnect;
			END_IF
		END_IF

		IF eState = E_MqttState.iPingReq THEN
			bTcpWrite := TRUE;
		END_IF
		IF (eState = E_MqttState.iPingReq) AND bHasWritten THEN
			bTcpWrite := FALSE;
			bPingPending := TRUE;
			eState := E_MqttState.iPingResp;
		ELSIF (eState = E_MqttState.iPingReq) AND fbTcpWrite.xError THEN
			bTcpWrite := FALSE;
			M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'PingReq send failed');
			eState := E_MqttState.iTcpDisconnect;
		ELSIF eState = E_MqttState.iPingReq THEN
			tTimeout := T#2S;
			IF tonTimer.Q THEN
				M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PingReq timeout');
				eState := E_MqttState.iTcpDisconnect;
			END_IF
		END_IF

	E_MqttState.iPingResp:
		tTimeout := T#2S;
		IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
			IF M_ProcessPendingFrames() AND NOT bPingPending THEN
				tTimeout := T#0S;
				eState := E_MqttState.iConnected;
			END_IF
		ELSIF fbTcpRead.xError THEN
			M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), 'PingResp receive failed');
			eState := E_MqttState.iTcpDisconnect;
		ELSIF tonTimer.Q THEN
			bPingPending := FALSE;
			M_SetError(TO_UINT(E_ReasonCode.uiErrKeepAliveTimeout), 'PingResp timeout');
			eState := E_MqttState.iTcpDisconnect;
		END_IF

    //=======================================================================
    // 发布消息
    //=======================================================================
    E_MqttState.iPublish:
        IF NOT bHasWritten THEN
            IF NOT M_BuildPublishPacket() THEN
                IF NOT bError THEN
                    M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidParameter), 'Build publish packet failed');
                END_IF
                uiRetryInflightIndex := 0;
                eState := E_MqttState.iConnected;
            END_IF
        END_IF

        IF eState = E_MqttState.iPublish THEN
            bTcpWrite := TRUE;
        END_IF
        IF (eState = E_MqttState.iPublish) AND bHasWritten THEN
            bTcpWrite := FALSE;
            uiMessagesSent := uiMessagesSent + 1;
            dtLastMessageTime := ULINT_TO_DT(uliSysTime / 1000);

            CASE ePubQoS OF
                E_MqttQoS.byQoS0:
                    xPublishedEvent := TRUE;
                    eState := E_MqttState.iConnected;
					uiRetryInflightIndex := 0;

                E_MqttQoS.byQoS1:
                    xWaitingForAck := TRUE;
                    byExpectedMsgType := E_MqttPacketType.byPubAck;
                    eState := E_MqttState.iPubAck;

                E_MqttQoS.byQoS2:
                    xWaitingForAck := TRUE;
                    byExpectedMsgType := E_MqttPacketType.byPubRec;
                    eState := E_MqttState.iPubRec;
            ELSE
                M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidParameter), 'Invalid QoS level');
                eState := E_MqttState.iConnected;
            END_CASE
        ELSIF (eState = E_MqttState.iPublish) AND fbTcpWrite.xError THEN
            bTcpWrite := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Publish send failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF eState = E_MqttState.iPublish THEN
            tTimeout := GVL_Mqtt.cnPublishTimeout;
            IF tonTimer.Q THEN
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Publish timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    //=======================================================================
    // 等待PUBACK (QoS 1)
    //=======================================================================
    E_MqttState.iPubAck:
        tTimeout := T#2S;

		IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
			tTimeout := T#0S;
			IF M_ProcessPendingFrames() AND NOT xWaitingForAck THEN
                tTimeout := T#0S;
                eState := E_MqttState.iConnected;
            END_IF
        ELSIF fbTcpRead.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
            eState := E_MqttState.iTcpDisconnect;
        ELSIF tonTimer.Q THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PubAck timeout');
            eState := E_MqttState.iTcpDisconnect;
        END_IF

    //=======================================================================
    // 发布收到 (QoS 2 - 步骤1)
    //=======================================================================
    E_MqttState.iPubRec:
        tTimeout := T#2S;
		IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
			tTimeout := T#0S;
			IF M_ProcessPendingFrames() AND NOT xWaitingForAck THEN
                tTimeout := T#0S;
                eState := E_MqttState.iPubRel;
            END_IF
        ELSIF fbTcpRead.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
            eState := E_MqttState.iTcpDisconnect;
        ELSIF tonTimer.Q Then
            M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PubRec timeout');
            eState := E_MqttState.iTcpDisconnect;
        END_IF

    //=======================================================================
    // 发布释放 (QoS 2 - 步骤2)
    //=======================================================================
    E_MqttState.iPubRel:
        IF NOT bHasWritten THEN
            IF NOT M_BuildPubRelPacket() THEN
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

        IF eState = E_MqttState.iPubRel THEN
            bTcpWrite := TRUE;
        END_IF
        IF (eState = E_MqttState.iPubRel) AND bHasWritten THEN
            bTcpWrite := FALSE;
            xWaitingForAck := TRUE;
            byExpectedMsgType := E_MqttPacketType.byPubComp;
            M_InflightUpdateState(
                uiPacketId := uiQoS2PacketId,
                eNewState := E_MqttInflightState.iPubRelSent);
            eState := E_MqttState.iPubComp;
        ELSIF (eState = E_MqttState.iPubRel) AND fbTcpWrite.xError THEN
            bTcpWrite := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'PubRel send failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF eState = E_MqttState.iPubRel THEN
            tTimeout := T#2S;
            IF tonTimer.Q THEN
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT PubRel timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    //=======================================================================
    // QoS 2 消息发布完成 (QoS 2 - 步骤3)
    //=======================================================================
    E_MqttState.iPubComp:
        tTimeout := T#2S;
		IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
			tTimeout := T#0S;
			IF M_ProcessPendingFrames() AND NOT xWaitingForAck THEN
                tTimeout := T#0S;
                eState := E_MqttState.iConnected;
            END_IF
        ELSIF fbTcpRead.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
            eState := E_MqttState.iTcpDisconnect;
        ELSIF tonTimer.Q THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PubComp timeout');
            eState := E_MqttState.iTcpDisconnect;
        END_IF

    //=======================================================================
    // 客户端订阅请求
    //=======================================================================
    E_MqttState.iSubscribe:
        IF NOT bHasWritten THEN
            IF NOT M_BuildSubscribePacket() THEN
                eState := E_MqttState.iConnected;
            END_IF
        END_IF

        IF eState = E_MqttState.iSubscribe THEN
            bTcpWrite := TRUE;
        END_IF
        IF (eState = E_MqttState.iSubscribe) AND bHasWritten THEN
            bTcpWrite := FALSE;
            xWaitingForSubAck := TRUE;
            byExpectedMsgType := E_MqttPacketType.bySubAck;
            eState := E_MqttState.iSubAck;

        ELSIF (eState = E_MqttState.iSubscribe) AND fbTcpWrite.xError THEN
            bTcpWrite := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Subscribe send failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF eState = E_MqttState.iSubscribe THEN
            tTimeout := T#2S;
            IF tonTimer.Q Then
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Subscribe timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    E_MqttState.iSubAck:
        tTimeout := T#2S;
        IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
            IF M_ProcessPendingFrames() AND NOT xWaitingForSubAck THEN
                tTimeout := T#0S;
                eState := E_MqttState.iConnected;
            END_IF
        ELSIF fbTcpRead.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), 'SubAck receive failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF tonTimer.Q THEN
            xWaitingForSubAck := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'SubAck timeout');
            eState := E_MqttState.iTcpDisconnect;
        END_IF

    //=======================================================================
    // 客户端取消订阅请求
    //=======================================================================
    E_MqttState.iUnsubscribe:
        IF NOT bHasWritten THEN
            IF NOT M_BuildUnsubscribePacket() THEN
                eState := E_MqttState.iConnected;
            END_IF
        END_IF

        IF eState = E_MqttState.iUnsubscribe THEN
            bTcpWrite := TRUE;
        END_IF
        IF (eState = E_MqttState.iUnsubscribe) AND bHasWritten Then
            bTcpWrite := FALSE;
            xWaitingForUnsubAck := TRUE;
            byExpectedMsgType := E_MqttPacketType.byUnsubAck;
            eState := E_MqttState.iUnsubAck;

        ELSIF (eState = E_MqttState.iUnsubscribe) AND fbTcpWrite.xError THEN
            bTcpWrite := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Unsubscribe send failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF eState = E_MqttState.iUnsubscribe THEN
            tTimeout := T#2S;
            IF tonTimer.Q THEN
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Unsubscribe timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    E_MqttState.iUnsubAck:
        tTimeout := T#2S;
        IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
            IF M_ProcessPendingFrames() AND NOT xWaitingForUnsubAck THEN
                tTimeout := T#0S;
                eState := E_MqttState.iConnected;
            END_IF
        ELSIF fbTcpRead.xError THEN
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), 'UnsubAck receive failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF tonTimer.Q THEN
            xWaitingForUnsubAck := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'UnsubAck timeout');
            eState := E_MqttState.iTcpDisconnect;
        END_IF

    //=======================================================================
    // 客户端断开连接
    //=======================================================================
    E_MqttState.iDisconnect:
        IF NOT bHasWritten THEN
            IF NOT M_BuildDisconnectPacket() THEN
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

        IF eState = E_MqttState.iDisconnect THEN
            bTcpWrite := TRUE;
        END_IF
        IF (eState = E_MqttState.iDisconnect) AND bHasWritten THEN
            bTcpWrite := FALSE;
            eState := E_MqttState.iTcpDisconnect;

        ELSIF (eState = E_MqttState.iDisconnect) AND fbTcpWrite.xError THEN
            bTcpWrite := FALSE;
            M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Disconnect send failed');
            eState := E_MqttState.iTcpDisconnect;
        ELSIF eState = E_MqttState.iDisconnect THEN
            tTimeout := T#2S;
            IF tonTimer.Q THEN
                M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Disconnect timeout');
                eState := E_MqttState.iTcpDisconnect;
            END_IF
        END_IF

    //=======================================================================
    // TCP断开连接
    //=======================================================================
    E_MqttState.iTcpDisconnect:
        bTcpConnect := FALSE;
        bTcpRead := FALSE;
        bTcpWrite := FALSE;
        xDisconnectedEvent := TRUE;
        xWaitingForAck := FALSE;
        xWaitingForSubAck := FALSE;
        xWaitingForUnsubAck := FALSE;
        bPingPending := FALSE;
        xPendingImmediateTx := FALSE;
        uiPendingSubPacketId := 0;
        uiPendingUnsubPacketId := 0;
        uiRetryInflightIndex := 0;

        // 自动重连逻辑
        IF bAutoReconnect AND uiReconnectAttempts < uiMaxReconnectAttempts THEN
            tonReconnect(IN := TRUE, PT := UINT_TO_TIME(uiReconnectDelay));
            IF tonReconnect.Q THEN
                tonReconnect(IN := FALSE);
                uiReconnectAttempts := uiReconnectAttempts + 1;
                eState := E_MqttState.iDisconnected;
            END_IF
        ELSE
            tonReconnect(IN := FALSE);
            eState := E_MqttState.iDisconnected;
        END_IF
ELSE
        M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidState), 'Invalid state');
        eState := E_MqttState.iDisconnected;
END_CASE

/// =======================================================================
/// 标志位
/// =======================================================================
bMqttConnected S= xConnectedEvent;
bMqttConnected R= xDisconnectedEvent;

/// =======================================================================
/// 超时计时器
/// =======================================================================
tonTimer(
    IN := (eLastState = eState) AND (tTimeout <> T#0S),
    PT := tTimeout);
eLastState := eState;

/// KeepAlive心跳(BUG-09修复:在所有连接状态下运行)
tonKeepAlive(
    IN := (uiKeepAlive <> 0) AND bIsConnected
          AND (eState <> E_MqttState.iDisconnected)
          AND (eState <> E_MqttState.iTcpConnect)
          AND (eState <> E_MqttState.iTcpDisconnect),
    PT := UINT_TO_TIME(uiKeepAlive * 1000));

/// =======================================================================
/// TCP
/// =======================================================================
M_TcpClient(
    bEnable := bTcpConnect,
    sIP := sBrokerIP,
    uiPortNum := uiPort,
    bIsConnected => bIsConnected);

M_TcpRead(
    bEnable := bTcpRead,
    pDataReceive := ADR(aRxBuf[uiRxLength]),
    udiDataSize := SIZEOF(aRxBuf) - uiRxLength,
    bDone => bHasRead,
    udiBytesRead => udiBytesRead);

M_TcpWrite(
    bExecute := bTcpWrite,
    pDataSend := ADR(aTxBuf),
    udiDataSize := uiTxLength,
    bDone => bHasWritten);

完整代码 2:PRG_Test

  • 对应源码路径:10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/PRG_Test.st
  • 复制使用说明:主功能块之外,再配一份测试程序,读者会更容易理解这套库是怎么从“库代码”落到“工程调用”的。
  • 阅读重点:重点看上层调用视角,理解为什么真正好用的库,不只是内部方法写对,还得让外部工程容易接。
/// =======================================================================
/// 名称      : PRG_Test
/// 功能      : MQTT 客户端测试程序
/// 说明      : 用于驱动 FB_MqttClient 进行连接、发布、订阅与接收测试
/// 编程人员  : ControlRookie
/// 时间      : 2026-05-05
/// 版本      : V1.0
/// =======================================================================
PROGRAM PRG_Test
VAR
	bInit                    : BOOL := TRUE;                                                    // 初始化执行标志
	fbMqttClient            : FB_MqttClient;                                                    // MQTT 客户端实例
	bEnable                  : BOOL := TRUE;                                                    // 使能客户端
	bConnect                 : BOOL;                                                            // 连接命令
	sBrokerIP                : STRING := '192.168.20.100';                                     // Broker IP 地址
	uiPort                   : UINT := 1883;                                                   // Broker 端口号
	sClientID                : STRING := 'CodeSys_PLC';                                        // 客户端标识符
	sUsername                : STRING := '';                                                   // 用户名
	sPassword                : STRING := '';                                                   // 密码
	eVersion                 : E_MqttVersion := E_MqttVersion.byMqttVersion311;                // MQTT 协议版本
	bCleanSession            : BOOL := TRUE;                                                   // 清理会话标志
	uiKeepAlive              : UINT := 60;                                                     // 心跳周期(秒)
	bUseSSL                  : BOOL := FALSE;                                                  // 是否启用 SSL
	bAutoReconnect           : BOOL := TRUE;                                                   // 是否自动重连
	uiReconnectDelay         : UINT := 5000;                                                   // 重连延时(毫秒)
	udiSessionExpiry         : UDINT := 0;                                                     // 会话过期间隔
	uiReceiveMax             : UINT := 65535;                                                  // 最大接收数量
	udMaxPacketSize          : UDINT := 4096;                                                  // 最大报文长度
	bWillFlag                : BOOL := TRUE;                                                   // 是否启用遗嘱消息
	sWillTopic               : STRING := 'CodeSys';                                            // 遗嘱主题
	sWillMessage             : STRING := 'CodeSys Offline';                                    // 遗嘱消息内容
	eWillQoS                 : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 遗嘱消息 QoS
	bWillRetain              : BOOL := FALSE;                                                  // 遗嘱消息保留标志
	bPublish                 : BOOL;                                                            // 发布命令
	sPubTopic                : STRING := 'CodeSys';                                            // 发布主题
	sPubPayload              : STRING := 'CodeSys';                                            // 发布载荷
	ePubQoS                  : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 发布 QoS
	bPubRetain               : BOOL := FALSE;                                                  // 发布保留标志
	bSubscribe               : BOOL;                                                            // 订阅命令
	bUnsubscribe             : BOOL;                                                            // 取消订阅命令
	sSubTopic                : STRING := 'CodeSys';                                            // 订阅主题
	sUnsubTopic              : STRING := 'CodeSys';                                            // 取消订阅主题
	eSubQoS                  : E_MqttQoS := E_MqttQoS.byQoS1;                                 // 订阅请求 QoS
	udiSubscriptionId        : UDINT := 0;                                                    // MQTT 5.0 订阅标识符
	eMqttState               : E_MqttState;                                                    // 客户端当前状态
	bIsConnected             : BOOL;                                                            // TCP 连接状态
	bMqttConnected           : BOOL;                                                            // MQTT 连接状态
	bError                   : BOOL;                                                            // 错误标志
	eErrorID                 : NBS.ERROR;                                                       // 错误码
	sDiagMsg                 : STRING;                                                          // 诊断信息
	aSubscriptions           : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription;  // 订阅列表
	uiSubscriptionCount      : UINT;                                                            // 订阅数量
	sRecTopic                : STRING;                                                          // 最新接收主题
	sRecPayload              : STRING;                                                          // 最新接收载荷
	aRecTopicList            : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;                      // 接收主题历史
	aRecPayloadList          : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING;                      // 接收载荷历史
	byReceivedQoS            : BYTE;                                                            // 最新接收 QoS
	bReceivedRetain          : BOOL;                                                            // 最新接收保留标志
END_VAR

// === IMPLEMENTATION ===
	IF bInit THEN
	bInit := FALSE;
END_IF

/// MQTT客户端
fbMqttClient(
	bEnable                := bEnable,
	bConnect               := bConnect,
	sBrokerIP              := sBrokerIP,
	uiPort                 := uiPort,
	sClientID              := sClientID,
	sUsername              := sUsername,
	sPassword              := sPassword,
	eVersion               := eVersion,
	bCleanSession          := bCleanSession,
	uiKeepAlive            := uiKeepAlive,
	bUseSSL                := bUseSSL,
	bAutoReconnect         := bAutoReconnect,
	uiReconnectDelay       := uiReconnectDelay,
	udiSessionExpiry       := udiSessionExpiry,
	uiReceiveMax           := uiReceiveMax,
	udMaxPacketSize        := udMaxPacketSize,
	bWillFlag              := bWillFlag,
	sWillTopic             := sWillTopic,
	sWillMessage           := sWillMessage,
	eWillQoS               := eWillQoS,
	bWillRetain            := bWillRetain,
	bPublish               := bPublish,
	sPubTopic              := sPubTopic,
	sPubPayload            := sPubPayload,
	ePubQoS                := ePubQoS,
	bPubRetain             := bPubRetain,
	bSubscribe             := bSubscribe,
	sSubTopic              := sSubTopic,
	eSubQoS                := eSubQoS,
	udiSubscriptionId      := udiSubscriptionId,
	bUnsubscribe           := bUnsubscribe,
	sUnsubTopic            := sUnsubTopic,
	eState                 => eMqttState,
	bIsConnected           => bIsConnected,
	bMqttConnected         => bMqttConnected,
	bError                 => bError,
	eErrorID               => eErrorID,
	sDiagMsg               => sDiagMsg,
	aSubscriptions         => aSubscriptions,
	uiSubscriptionCount    => uiSubscriptionCount,
	sRecTopic              => sRecTopic,
	sRecPayload            => sRecPayload,
	aRecTopicList          => aRecTopicList,
	aRecPayloadList        => aRecPayloadList,
	byReceivedQoS          => byReceivedQoS,
	bReceivedRetain        => bReceivedRetain);

系列导航

  • 系列定位:加更篇 2
  • 上一篇:加更1 MQTT 5.0 到底值不值得上
  • 下一篇:系列收官,无下一篇

项目与资料

  • 开源项目名称:MqttClient_V1_0
  • GitHub 仓库关键字:ControlRookie MqttClient_V1_0
  • 配套资料说明:我已经同步整理 MQTT 标准、控制报文、十六进制拆解、状态机图和 ST 代码映射知识库,后续会持续补全和公开入口

适合谁收藏

  • 正在做 CODESYS / PLC / MQTT 项目的人
  • 想把 MQTT 从报文真正看到 ST 代码的人
  • 正在排查 QoS1 / QoS2 超时、掉线、重连问题的人
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐