加更2_这套开源MQTT客户端_我是怎么一步一步做出来的
Abstract
这一篇是项目复盘视角,重点讲这套开源客户端为什么开始做、为什么后来必须重构接收链和状态机、QoS1 / QoS2 为什么是成熟度分水岭,以及这个项目真正的长期价值到底在哪里。
前面几篇,我们一直站在“读者怎么学”的角度讲这套项目。
这一篇换个视角。
我们直接讲项目本身。
也就是:
这套开源 MQTT 客户端,到底是怎么从“想做一个免费替代方案”,一步一步长成现在这个样子的?
先给结论:
这不是一个“把几个报文拼出来”就结束的小项目。
一、最开始的动机其实很简单
这个项目的起点,并不复杂:
- CODESYS 官方 MQTT Client SL 要额外成本
- 很多自动化工程场景只是想快速接入 MQTT
- 工程师需要的是能看、能改、能排障的源码,不只是一个黑盒库
所以一开始的目标就很明确:
做一套真正能在 CODESYS / PLC 场景里落地的开源 MQTT 客户端。
这件事的价值不在“自己写轮子”这四个字。
而在于:
- 降低使用门槛
- 让更多人能学会 MQTT 实现
- 给自动化工程师一个可控、可维护、可扩展的基础版本
二、第一阶段最容易陷入的误区:以为“能发包就差不多了”
很多协议类项目一开始都会经历这个阶段:
- CONNECT 发出去了
- Broker 回了
- PUBLISH 也能跑
这时候很容易产生一个错觉:
差不多做完了。
但真进到 QoS1 / QoS2、高频、多客户端、订阅退订之后,问题就会全部冒出来。
也就是说:
MQTT 客户端最早做出来的,通常只是“演示能力”;
三、真正把项目难度拉上来的,是 QoS1 / QoS2
如果只做:
- QoS0
- 单主题
- 低频
那很多实现都能“看起来没问题”。
但一旦进入:
- QoS1
- QoS2
- 高频
- 多客户端同主题 / 多主题
问题会非常集中地暴露出来。
核心原因我们前面讲过了:
- ACK 链变长
- inflight 必须存在
- 超时重发必须可靠
- 接收链和发送链不能互相堵
所以这个项目真正从“能跑”走向“能用”的分水岭,就是把 QoS1 / QoS2 高频稳定性彻底打通。
四、为什么后来必须重构接收链
这一点其实非常关键。
很多 MQTT demo 风格实现,会给每种入站报文都单独写一个处理方法,然后主流程一个个跳。
这类做法在简单场景能工作。
但高频和多帧粘连场景下,问题会很快出现:
- ACK 处理不及时
- 同一扫描周期里多个入站帧消化不完
- 即时响应时序不顺
所以后来项目必须收敛成以:
M_ProcessReceiveM_ProcessPendingFrames
为核心的接收面处理逻辑。
这不是为了代码好看,而是为了把:
- 多帧连续处理
- 即时 ACK 优先发回
- 等待状态推进
真正做顺。
五、为什么状态机必须越做越“硬”
刚开始做这类客户端时,很多动作会倾向于:
- 哪里需要就哪里发
- 哪里收到就哪里改状态
这种写法在小范围内没什么问题。
但随着功能和边界越来越多,状态机必须硬起来。
也就是:
- 每条协议链有明确状态段
- 每种 ACK 有明确等待关系
- 失败统一收口
- 重连逻辑统一入口
最终才会形成现在这类比较清晰的主骨架:
连接链
发布链
订阅链
心跳链
异常收口链
这一步其实是项目成熟度真正提升的标志。
六、为什么 Broker 差异一定要进设计,不是后面再说
很多人做协议实现时,会下意识假设:
只要我按标准写了,所有 Broker 表现就该一样。
理论上当然希望如此。
工程上不能这么想。
因为真实环境里,你会遇到:
- PLC 内置 Broker
- EMQX
- Mosquitto
- 各种不同客户端配合
它们在版本支持、属性支持、共享订阅、订阅标识符、Topic Alias、默认会话策略上,行为不一定完全一致。
所以这个项目后面必须把:
CONNACK能力协商- Broker 支持边界
- 运行时参数约束
都纳入设计。
七、为什么这个项目后面还会配一整套知识库
代码本身当然很重要。
但只放代码,很多人还是学不会。
因为真正难的是下面这条映射:
标准 -> 报文 -> 十六进制 -> 状态机 -> ST 代码
所以后面配套做知识库,本质上不是“顺便整理点文档”,而是在解决另一个更深的问题:
怎么让自动化工程师真正看懂这套源码,而不是只能拿去复制。
这也是为什么后面才会有:
- 控制报文字段页
- 十六进制拆解页
- 状态机图
- 握手时序图
- ST 方法逐段讲解
八、这个项目最重要的经验,我觉得有 4 条
经验 1:先保证骨架正确,再追求特性丰富
没有稳定状态机和 ACK 链,再多 5.0 特性也是虚的。
经验 2:能高频稳定跑 QoS1 / QoS2,才是真的过线
QoS0 只能证明“基本能通信”。
高频 QoS1 / QoS2 才能说明实现真正闭环。
经验 3:Broker 差异不是噪音,是设计输入
尤其是 5.0,不能只看标准书写得多漂亮。
经验 4:源码要可读,不只是可运行
开源项目如果用户看不懂,那传播力和可维护性都会打折。
九、这套开源项目真正的价值,我认为不止是“省钱”
当然,免费替代官方收费库,本身就是非常现实的价值。
但它还不止这个。
我觉得更大的价值有 3 个:
1. 可控
出了问题,你能看源码,不是只能猜。
2. 可学
它不是黑盒,非常适合把 MQTT 实现真正学透。
3. 可扩展
后面无论你要继续加:
- 更完整的 5.0 特性
- 更复杂的会话恢复
- 安全增强
- 工程定制逻辑
都能在现有骨架上继续推进。
十、如果让我给这个项目定一个阶段性定位
我会这样说:
这不是一个“最花哨”的 MQTT 客户端。
这句话我觉得比单纯说“支持 3.1.1 和 5.0”更重要。
因为开源项目真正的生命力,很多时候不在功能表,而在:
- 能不能被理解
- 能不能被接手
- 能不能继续成长
十一、这一篇你最该记住的 5 句话
- 这个项目最开始是为了解决现实成本问题,但后来真正拉开价值差距的是工程可控性。
- MQTT 客户端从“能跑”到“能用”,最难跨过去的是 QoS1 / QoS2 高频稳定性。
- 接收链重构、状态机硬化和 Broker 差异纳入设计,是项目成熟的关键节点。
- 开源项目真正有传播力,不只是因为免费,还因为它能让人学会、看懂、接手。
- 这套客户端的最大意义,不只是替代一个库,而是给自动化工程师一套真正能掌控的 MQTT 实现骨架。
十二、系列收官
到这里,这套系列教程的主线就全部收完了。
如果你从第一篇一路看到这里,理论上你已经能把下面这条链完整接起来:

这也是我最想做到的一件事:
不只是让大家“知道 MQTT”,
如果这套内容对你有帮助,也欢迎继续关注后续版本迭代和知识库扩展。
下一步,才是真正把这套东西用在更多项目里。
完整 ST 代码
复制使用说明
- 这部分给出的是与本篇主题直接对应的完整 ST 代码,不是零碎片段。
- 如果你只是想先跑通,优先整段复制,不要只摘几行变量或几条赋值语句。
- 如果是
METHOD,请确认它仍然属于FB_MqttClient;如果是PROGRAM,请确认相关 DUT、GVL、FB 已一并导入。
代码阅读重点
- 先按
报文结构 -> 状态机入口 -> 关键变量 -> 返回结果的顺序看。 - 再把正文里的十六进制拆解和这里的字节写入、字节解析语句一行行对上。
- 最后回到在线调试,重点盯
uiTxLength、uiRxLength、eState、xWaitingForAck这类状态量。
完整代码 1:FB_MqttClient
- 对应源码路径:
10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/MqttClient NBS/FB_MqttClient.st - 复制使用说明:这篇讲的是工程抽象过程,所以最适合直接给出主功能块全貌。
- 阅读重点:重点看状态机、TCP 封装、收发报文方法、在途队列、错误管理是怎么被收敛到一个统一功能块里的。
/// =======================================================================
/// 名称 : FB_MqttClient
/// 功能 : MQTT 客户端(支持 MQTT 3.1.1 / MQTT 5.0)
/// 说明 : 实现 MQTT 连接、发布、订阅、接收、心跳与重连状态机
/// 编程人员 : ControlRookie
/// 时间 : 2026-01-10
/// 版本 : V2.1
/// =======================================================================
{attribute 'hide_all_locals'}
FUNCTION_BLOCK FB_MqttClient
VAR_INPUT
// 连接配置
bEnable : BOOL := TRUE; // 使能客户端
bConnect : BOOL; // 连接命令
sBrokerIP : STRING := '192.168.20.222'; // Broker IP 地址
uiPort : UINT := 1883; // Broker 端口号
sClientID : STRING; // 客户端标识符
sUsername : STRING; // 用户名
sPassword : STRING; // 密码
// 协议版本
eVersion : E_MqttVersion := E_MqttVersion.byMqttVersion311; // MQTT 协议版本
// 连接参数
bCleanSession : BOOL := TRUE; // 是否清理会话
uiKeepAlive : UINT := 60; // 心跳周期(秒)
bUseSSL : BOOL := FALSE; // 是否启用 SSL
bAutoReconnect : BOOL := TRUE; // 是否自动重连
uiReconnectDelay : UINT := 5000; // 重连延时(毫秒)
// MQTT 5.0 连接属性
udiSessionExpiry : UDINT := 0; // 会话过期间隔
uiReceiveMax : UINT := 65535; // 最大接收数量
udMaxPacketSize : UDINT := 4096; // 最大报文长度
bRequestResponseInfo : BOOL := FALSE; // 是否请求响应信息
bRequestProblemInfo : BOOL := TRUE; // 是否请求问题信息
// 遗嘱消息
bWillFlag : BOOL := FALSE; // 是否启用遗嘱消息
bWillRetain : BOOL := FALSE; // 遗嘱消息保留标志
eWillQoS : E_MqttQoS := E_MqttQoS.byQoS0; // 遗嘱消息 QoS
sWillTopic : STRING; // 遗嘱主题
sWillMessage : STRING; // 遗嘱消息内容
// 发布参数
bPublish : BOOL := FALSE; // 发布命令
sPubTopic : STRING := 'CodeSys'; // 发布主题
sPubPayload : STRING := 'This is CodeSys'; // 发布载荷
ePubQoS : E_MqttQoS; // 发布 QoS
bPubRetain : BOOL; // 发布保留标志
// 订阅参数
bSubscribe : BOOL := FALSE; // 订阅命令
sSubTopic : STRING := 'CodeSys'; // 订阅主题
eSubQoS : E_MqttQoS; // 订阅请求 QoS
udiSubscriptionId : UDINT; // MQTT 5.0 订阅标识符(>0 时包含)
bUnsubscribe : BOOL; // 取消订阅命令
sUnsubTopic : STRING := 'CodeSys'; // 取消订阅主题
END_VAR
VAR_OUTPUT
// 状态信息
eState : E_MqttState; // 客户端当前状态
bIsConnected : BOOL; // TCP 连接状态
bMqttConnected : BOOL; // MQTT 协议连接状态
bError : BOOL; // 错误标志
eErrorID : NBS.ERROR; // 错误码
sDiagMsg : STRING; // 诊断信息
// 订阅列表管理
aSubscriptions : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription; // 订阅列表
uiSubscriptionCount : UINT := 0; // 当前订阅数量
// 接收消息
sRecTopic : STRING; // 最新接收主题
sRecPayload : STRING; // 最新接收载荷
aRecTopicList : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING; // 接收主题历史
aRecPayloadList : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING; // 接收载荷历史
byReceivedQoS : BYTE; // 最新接收消息 QoS
bReceivedRetain : BOOL; // 最新接收消息保留标志
END_VAR
VAR
rtrigConnect : R_TRIG; // 连接命令上升沿检测
ftrigConnect : F_TRIG; // 连接命令下降沿检测
rtrigPublish : R_TRIG; // 发布命令上升沿检测
rtrigSubscribe : R_TRIG; // 订阅命令上升沿检测
rtrigUnsubscribe : R_TRIG; // 取消订阅命令上升沿检测
// 超时计时
tTimeout : TIME; // 当前超时时间设定
tonTimer : TON; // 通用超时定时器
tonKeepAlive : TON; // 心跳定时器
tonReconnect : TON; // 重连延时定时器
eLastState : E_MqttState; // 上一周期状态
// TCP连接对象
fbTcpClient : NBS.TCP_Client; // TCP 客户端实例
fbTcpRead : NBS.TCP_Read; // TCP 读取实例
fbTcpWrite : NBS.TCP_Write; // TCP 写入实例
hConnection : NBS.CAA.HANDLE; // TCP 连接句柄
bTcpConnect : BOOL; // TCP 连接使能标志
bTcpRead : BOOL; // TCP 读取使能标志
bTcpWrite : BOOL; // TCP 写入使能标志
bWriteDoneLatched : BOOL; // TCP 写入完成锁存标志
bHasRead : BOOL; // 本周期已读取完成标志
bHasWritten : BOOL; // 本周期已写入完成标志
udiBytesRead : UDINT; // 本次 TCP 读取字节数
aTxBuf : ARRAY[0..GVL_Mqtt.cnSendBufferSize - 1] OF BYTE; // 发送缓冲区
aRxBuf : ARRAY[0..GVL_Mqtt.cnRecvBufferSize - 1] OF BYTE; // 接收缓冲区
uiTxLength : UINT; // 待发送字节数
uiRxLength : UINT; // 已接收字节数
// 系统时间
stTimeZone : Util.TimeZone := (iBias := 480); // 时区设置
uliSysTime : ULINT; // 当前系统时间
// MQTT协议相关
bDup : BOOL; // DUP 重发标志
uiPacketId : UINT := 1; // 下一个报文标识符
uiQoS2PacketId : UINT; // QoS2 流程报文标识符
uiExpectedPacketId : UINT; // 当前期待确认的报文标识符
byExpectedMsgType : BYTE; // 当前期待的报文类型
xWaitingForAck : BOOL; // 等待通用 ACK 标志
xWaitingForSubAck : BOOL; // 等待 SUBACK 标志
xWaitingForUnsubAck : BOOL; // 等待 UNSUBACK 标志
bPingPending : BOOL; // 心跳响应等待标志
uiSendQuota : UINT := GVL_Mqtt.cnDefaultReceiveMax; // MQTT 5.0 发送配额
uiInflightCount : UINT; // 在途消息数量
uiTopicAliasCount : UINT; // 主题别名数量
uiNextTopicAlias : UINT := 1; // 下一个发送主题别名
uiPendingSubPacketId : UINT; // 待确认订阅报文标识符
uiPendingUnsubPacketId : UINT; // 待确认取消订阅报文标识符
uiRetryInflightIndex : UINT; // 待重发在途消息索引
uiRxInFlightQosCount : UINT; // 接收侧未完成 QoS>0 消息数量
xPendingImmediateTx : BOOL; // 接收路径即时回包待发送标志
aInflight : ARRAY[1..GVL_Mqtt.cnMaxInflight] OF ST_MqttInflightMessage; // 出站在途队列
aTopicAlias : ARRAY[1..GVL_Mqtt.cnMaxTopicAlias] OF ST_MqttTopicAlias; // 主题别名表
aRxQoS2PacketIds : ARRAY[1..GVL_Mqtt.cnMaxInflight] OF UINT; // 入站 QoS2 去重表
// 事件标志
xConnectedEvent : BOOL; // 连接成功事件
xDisconnectedEvent : BOOL; // 断开连接事件
xSubscribedEvent : BOOL; // 订阅成功事件
xUnsubscribedEvent : BOOL; // 取消订阅成功事件
xPublishedEvent : BOOL; // 发布成功事件
bMessageReceived : BOOL; // 收到消息事件
// 统计信息
uiMessagesSent : UDINT; // 已发送消息数量
uiMessagesReceived : UDINT; // 已接收消息数量
dtLastMessageTime : DATE_AND_TIME; // 最后消息时间
// 重连管理
uiReconnectAttempts : UINT := 0; // 已重连次数
uiMaxReconnectAttempts : UINT := 10; // 最大重连次数
// MQTT 5.0 服务器属性(CONNACK解析后存储)
uiServerReceiveMax : UINT := 65535; // 服务端最大接收数量
byServerMaxQoS : BYTE := 2; // 服务端支持的最大 QoS
bServerRetainAvailable : BOOL := TRUE; // 服务端是否支持保留消息
udServerMaxPacketSize : UDINT := GVL_Mqtt.cnMaxPacketSize; // 服务端允许的最大报文长度
uiServerTopicAliasMax : UINT := 0; // 服务端允许的最大主题别名
bServerWildcardSubAvail : BOOL := TRUE; // 服务端是否支持通配符订阅
bServerSubIdAvail : BOOL := TRUE; // 服务端是否支持订阅标识符
bServerSharedSubAvail : BOOL := TRUE; // 服务端是否支持共享订阅
END_VAR
// === IMPLEMENTATION ===
IF NOT bEnable AND (eState <> E_MqttState.iDisconnected) THEN
IF bMqttConnected AND bIsConnected THEN
eState := E_MqttState.iDisconnect;
END_IF
IF NOT bMqttConnected AND bIsConnected THEN
eState := E_MqttState.iTcpDisconnect;
END_IF
tTimeout := T#0S;
xConnectedEvent := FALSE;
xDisconnectedEvent := FALSE;
xSubscribedEvent := FALSE;
xPublishedEvent := FALSE;
uiMessagesSent := 0;
uiMessagesReceived := 0;
uiPacketId := 0;
uiQoS2PacketId := 0;
uiExpectedPacketId := 0;
uiPendingSubPacketId := 0;
uiPendingUnsubPacketId := 0;
byExpectedMsgType := 0;
xWaitingForAck := FALSE;
xWaitingForSubAck := FALSE;
xWaitingForUnsubAck := FALSE;
bPingPending := FALSE;
uiRetryInflightIndex := 0;
M_InflightClear();
M_TopicAliasClear();
M_SubListClear();
THIS^.M_ResetError();
RETURN;
END_IF
/// 系统时间
stTimeZone.iBias := 480;
uliSysTime := GetLocalDateTime(tzTimeZone := stTimeZone);
/// 边沿检测
rtrigConnect(CLK := bConnect);
ftrigConnect(CLK := bConnect);
rtrigPublish(CLK := bPublish);
rtrigSubscribe(CLK := bSubscribe);
rtrigUnsubscribe(CLK := bUnsubscribe);
/// 清除单次事件标志
xConnectedEvent := FALSE;
xDisconnectedEvent := FALSE;
xSubscribedEvent := FALSE;
xUnsubscribedEvent := FALSE;
xPublishedEvent := FALSE;
bMessageReceived := FALSE;
/// 核心状态机
CASE eState OF
//=======================================================================
// 禁用状态
//=======================================================================
E_MqttState.iDisconnected:
bTcpRead := FALSE;
bTcpWrite := FALSE;
IF rtrigConnect.Q OR (bAutoReconnect AND (uiReconnectAttempts > 0)) THEN
IF sBrokerIP <> '' AND sClientID <> '' THEN
bError := FALSE;
eErrorID := TO_INT(E_ReasonCode.uiErrNoError);
sDiagMsg := '';
eState := E_MqttState.iTcpConnect;
ELSE
M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidParameter), 'Invalid IP or ClientId');
END_IF
END_IF
//=======================================================================
// TCP连接中
//=======================================================================
E_MqttState.iTcpConnect:
bTcpConnect := TRUE;
IF bIsConnected AND hConnection <> 0 THEN
eState := E_MqttState.iConnect;
ELSIF fbTcpClient.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpConnectFailed), CONCAT('TCP error: ', INT_TO_STRING(fbTcpClient.eError)));
eState := E_MqttState.iTcpDisconnect;
ELSE
tTimeout := T#5S;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'TcpConnect timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
//=======================================================================
// 发送MQTT CONNECT报文
//=======================================================================
E_MqttState.iConnect:
IF NOT bHasWritten THEN
IF NOT M_BuildConnectPacket() THEN
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
IF eState = E_MqttState.iConnect THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iConnect) AND bHasWritten THEN
bTcpWrite := FALSE;
xWaitingForAck := TRUE;
byExpectedMsgType := E_MqttPacketType.byConnAck;
eState := E_MqttState.iConnAck;
ELSIF (eState = E_MqttState.iConnect) AND fbTcpWrite.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), CONCAT('Send error: ', INT_TO_STRING(fbTcpWrite.eError)));
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iConnect THEN
tTimeout := T#2S;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT connection timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
//=======================================================================
// 等待CONNACK响应
//=======================================================================
E_MqttState.iConnAck:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
tTimeout := T#0S;
IF M_HandleConnAck() THEN
xConnectedEvent := TRUE;
tonKeepAlive(IN := FALSE);
uiReconnectAttempts := 0;
eState := E_MqttState.iConnected;
ELSE
M_SetError(TO_UINT(E_ReasonCode.uiErrConnAckRefused), sDiagMsg);
eState := E_MqttState.iTcpDisconnect;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'ConnAck timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 已连接状态
//=======================================================================
E_MqttState.iConnected:
IF NOT bIsConnected THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'TCP disconnected');
eState := E_MqttState.iTcpDisconnect;
END_IF
IF (eState = E_MqttState.iConnected) AND xPendingImmediateTx THEN
bTcpWrite := TRUE;
tTimeout := GVL_Mqtt.cnResponseTimeout;
IF bHasWritten THEN
bTcpWrite := FALSE;
tTimeout := T#0S;
IF uiRxInFlightQosCount > 0 THEN
IF (uiTxLength > 0) AND ((aTxBuf[0] AND GVL_Mqtt.cnHdrTypeMask) = E_MqttPacketType.byPubAck) THEN
uiRxInFlightQosCount := uiRxInFlightQosCount - 1;
END_IF
END_IF
uiTxLength := 0;
xPendingImmediateTx := FALSE;
ELSIF fbTcpWrite.xError THEN
bTcpWrite := FALSE;
tTimeout := T#0S;
xPendingImmediateTx := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Immediate response send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
bTcpWrite := FALSE;
tTimeout := T#0S;
xPendingImmediateTx := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'Immediate response timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
ELSIF eState = E_MqttState.iConnected THEN
tTimeout := T#0S;
END_IF
IF (eState = E_MqttState.iConnected) AND (NOT xPendingImmediateTx) THEN
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
tonKeepAlive(IN := FALSE);
M_ProcessPendingFrames();
END_IF
END_IF
IF (eState = E_MqttState.iConnected) AND (NOT xPendingImmediateTx) THEN
uiRetryInflightIndex := M_InflightCheckTimeout();
IF uiRetryInflightIndex > 0 THEN
CASE aInflight[uiRetryInflightIndex].eState OF
E_MqttInflightState.iPublishSent:
ePubQoS := aInflight[uiRetryInflightIndex].eQoS;
eState := E_MqttState.iPublish;
E_MqttInflightState.iPubRelSent:
uiQoS2PacketId := aInflight[uiRetryInflightIndex].uiPacketId;
eState := E_MqttState.iPubRel;
ELSE
uiRetryInflightIndex := 0;
END_CASE
END_IF
IF tonKeepAlive.Q THEN
tonKeepAlive(IN := FALSE);
eState := E_MqttState.iPingReq;
END_IF
IF (eState = E_MqttState.iConnected) AND rtrigPublish.Q AND NOT xWaitingForAck AND NOT xWaitingForSubAck AND NOT xWaitingForUnsubAck AND NOT bPingPending THEN
tonKeepAlive(IN := FALSE);
uiRetryInflightIndex := 0;
eState := E_MqttState.iPublish;
END_IF
IF (eState = E_MqttState.iConnected) AND rtrigSubscribe.Q AND NOT xWaitingForAck AND NOT xWaitingForSubAck AND NOT xWaitingForUnsubAck AND NOT bPingPending THEN
tonKeepAlive(IN := FALSE);
eState := E_MqttState.iSubscribe;
END_IF
IF (eState = E_MqttState.iConnected) AND rtrigUnsubscribe.Q AND NOT xWaitingForAck AND NOT xWaitingForSubAck AND NOT xWaitingForUnsubAck AND NOT bPingPending THEN
tonKeepAlive(IN := FALSE);
eState := E_MqttState.iUnsubscribe;
END_IF
IF (eState = E_MqttState.iConnected) AND ftrigConnect.Q THEN
eState := E_MqttState.iDisconnect;
END_IF
END_IF
//=======================================================================
// 心跳请求
//=======================================================================
E_MqttState.iPingReq:
IF NOT bHasWritten THEN
IF NOT M_BuildPingReqPacket() THEN
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
IF eState = E_MqttState.iPingReq THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iPingReq) AND bHasWritten THEN
bTcpWrite := FALSE;
bPingPending := TRUE;
eState := E_MqttState.iPingResp;
ELSIF (eState = E_MqttState.iPingReq) AND fbTcpWrite.xError THEN
bTcpWrite := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'PingReq send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iPingReq THEN
tTimeout := T#2S;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PingReq timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
E_MqttState.iPingResp:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
IF M_ProcessPendingFrames() AND NOT bPingPending THEN
tTimeout := T#0S;
eState := E_MqttState.iConnected;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), 'PingResp receive failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
bPingPending := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrKeepAliveTimeout), 'PingResp timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 发布消息
//=======================================================================
E_MqttState.iPublish:
IF NOT bHasWritten THEN
IF NOT M_BuildPublishPacket() THEN
IF NOT bError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidParameter), 'Build publish packet failed');
END_IF
uiRetryInflightIndex := 0;
eState := E_MqttState.iConnected;
END_IF
END_IF
IF eState = E_MqttState.iPublish THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iPublish) AND bHasWritten THEN
bTcpWrite := FALSE;
uiMessagesSent := uiMessagesSent + 1;
dtLastMessageTime := ULINT_TO_DT(uliSysTime / 1000);
CASE ePubQoS OF
E_MqttQoS.byQoS0:
xPublishedEvent := TRUE;
eState := E_MqttState.iConnected;
uiRetryInflightIndex := 0;
E_MqttQoS.byQoS1:
xWaitingForAck := TRUE;
byExpectedMsgType := E_MqttPacketType.byPubAck;
eState := E_MqttState.iPubAck;
E_MqttQoS.byQoS2:
xWaitingForAck := TRUE;
byExpectedMsgType := E_MqttPacketType.byPubRec;
eState := E_MqttState.iPubRec;
ELSE
M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidParameter), 'Invalid QoS level');
eState := E_MqttState.iConnected;
END_CASE
ELSIF (eState = E_MqttState.iPublish) AND fbTcpWrite.xError THEN
bTcpWrite := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Publish send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iPublish THEN
tTimeout := GVL_Mqtt.cnPublishTimeout;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Publish timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
//=======================================================================
// 等待PUBACK (QoS 1)
//=======================================================================
E_MqttState.iPubAck:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
tTimeout := T#0S;
IF M_ProcessPendingFrames() AND NOT xWaitingForAck THEN
tTimeout := T#0S;
eState := E_MqttState.iConnected;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PubAck timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 发布收到 (QoS 2 - 步骤1)
//=======================================================================
E_MqttState.iPubRec:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
tTimeout := T#0S;
IF M_ProcessPendingFrames() AND NOT xWaitingForAck THEN
tTimeout := T#0S;
eState := E_MqttState.iPubRel;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q Then
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PubRec timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 发布释放 (QoS 2 - 步骤2)
//=======================================================================
E_MqttState.iPubRel:
IF NOT bHasWritten THEN
IF NOT M_BuildPubRelPacket() THEN
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
IF eState = E_MqttState.iPubRel THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iPubRel) AND bHasWritten THEN
bTcpWrite := FALSE;
xWaitingForAck := TRUE;
byExpectedMsgType := E_MqttPacketType.byPubComp;
M_InflightUpdateState(
uiPacketId := uiQoS2PacketId,
eNewState := E_MqttInflightState.iPubRelSent);
eState := E_MqttState.iPubComp;
ELSIF (eState = E_MqttState.iPubRel) AND fbTcpWrite.xError THEN
bTcpWrite := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'PubRel send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iPubRel THEN
tTimeout := T#2S;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT PubRel timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
//=======================================================================
// QoS 2 消息发布完成 (QoS 2 - 步骤3)
//=======================================================================
E_MqttState.iPubComp:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
tTimeout := T#0S;
IF M_ProcessPendingFrames() AND NOT xWaitingForAck THEN
tTimeout := T#0S;
eState := E_MqttState.iConnected;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), CONCAT('Receive error: ', INT_TO_STRING(fbTcpRead.eError)));
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'PubComp timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 客户端订阅请求
//=======================================================================
E_MqttState.iSubscribe:
IF NOT bHasWritten THEN
IF NOT M_BuildSubscribePacket() THEN
eState := E_MqttState.iConnected;
END_IF
END_IF
IF eState = E_MqttState.iSubscribe THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iSubscribe) AND bHasWritten THEN
bTcpWrite := FALSE;
xWaitingForSubAck := TRUE;
byExpectedMsgType := E_MqttPacketType.bySubAck;
eState := E_MqttState.iSubAck;
ELSIF (eState = E_MqttState.iSubscribe) AND fbTcpWrite.xError THEN
bTcpWrite := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Subscribe send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iSubscribe THEN
tTimeout := T#2S;
IF tonTimer.Q Then
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Subscribe timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
E_MqttState.iSubAck:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
IF M_ProcessPendingFrames() AND NOT xWaitingForSubAck THEN
tTimeout := T#0S;
eState := E_MqttState.iConnected;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), 'SubAck receive failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
xWaitingForSubAck := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'SubAck timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 客户端取消订阅请求
//=======================================================================
E_MqttState.iUnsubscribe:
IF NOT bHasWritten THEN
IF NOT M_BuildUnsubscribePacket() THEN
eState := E_MqttState.iConnected;
END_IF
END_IF
IF eState = E_MqttState.iUnsubscribe THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iUnsubscribe) AND bHasWritten Then
bTcpWrite := FALSE;
xWaitingForUnsubAck := TRUE;
byExpectedMsgType := E_MqttPacketType.byUnsubAck;
eState := E_MqttState.iUnsubAck;
ELSIF (eState = E_MqttState.iUnsubscribe) AND fbTcpWrite.xError THEN
bTcpWrite := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Unsubscribe send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iUnsubscribe THEN
tTimeout := T#2S;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Unsubscribe timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
E_MqttState.iUnsubAck:
tTimeout := T#2S;
IF (uiRxLength > 0) OR M_ReadIntoBuffer() THEN
IF M_ProcessPendingFrames() AND NOT xWaitingForUnsubAck THEN
tTimeout := T#0S;
eState := E_MqttState.iConnected;
END_IF
ELSIF fbTcpRead.xError THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpReceiveFailed), 'UnsubAck receive failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF tonTimer.Q THEN
xWaitingForUnsubAck := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'UnsubAck timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
//=======================================================================
// 客户端断开连接
//=======================================================================
E_MqttState.iDisconnect:
IF NOT bHasWritten THEN
IF NOT M_BuildDisconnectPacket() THEN
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
IF eState = E_MqttState.iDisconnect THEN
bTcpWrite := TRUE;
END_IF
IF (eState = E_MqttState.iDisconnect) AND bHasWritten THEN
bTcpWrite := FALSE;
eState := E_MqttState.iTcpDisconnect;
ELSIF (eState = E_MqttState.iDisconnect) AND fbTcpWrite.xError THEN
bTcpWrite := FALSE;
M_SetError(TO_UINT(E_ReasonCode.uiErrTcpSendFailed), 'Disconnect send failed');
eState := E_MqttState.iTcpDisconnect;
ELSIF eState = E_MqttState.iDisconnect THEN
tTimeout := T#2S;
IF tonTimer.Q THEN
M_SetError(TO_UINT(E_ReasonCode.uiErrTimeout), 'MQTT Disconnect timeout');
eState := E_MqttState.iTcpDisconnect;
END_IF
END_IF
//=======================================================================
// TCP断开连接
//=======================================================================
E_MqttState.iTcpDisconnect:
bTcpConnect := FALSE;
bTcpRead := FALSE;
bTcpWrite := FALSE;
xDisconnectedEvent := TRUE;
xWaitingForAck := FALSE;
xWaitingForSubAck := FALSE;
xWaitingForUnsubAck := FALSE;
bPingPending := FALSE;
xPendingImmediateTx := FALSE;
uiPendingSubPacketId := 0;
uiPendingUnsubPacketId := 0;
uiRetryInflightIndex := 0;
// 自动重连逻辑
IF bAutoReconnect AND uiReconnectAttempts < uiMaxReconnectAttempts THEN
tonReconnect(IN := TRUE, PT := UINT_TO_TIME(uiReconnectDelay));
IF tonReconnect.Q THEN
tonReconnect(IN := FALSE);
uiReconnectAttempts := uiReconnectAttempts + 1;
eState := E_MqttState.iDisconnected;
END_IF
ELSE
tonReconnect(IN := FALSE);
eState := E_MqttState.iDisconnected;
END_IF
ELSE
M_SetError(TO_UINT(E_ReasonCode.uiErrInvalidState), 'Invalid state');
eState := E_MqttState.iDisconnected;
END_CASE
/// =======================================================================
/// 标志位
/// =======================================================================
bMqttConnected S= xConnectedEvent;
bMqttConnected R= xDisconnectedEvent;
/// =======================================================================
/// 超时计时器
/// =======================================================================
tonTimer(
IN := (eLastState = eState) AND (tTimeout <> T#0S),
PT := tTimeout);
eLastState := eState;
/// KeepAlive心跳(BUG-09修复:在所有连接状态下运行)
tonKeepAlive(
IN := (uiKeepAlive <> 0) AND bIsConnected
AND (eState <> E_MqttState.iDisconnected)
AND (eState <> E_MqttState.iTcpConnect)
AND (eState <> E_MqttState.iTcpDisconnect),
PT := UINT_TO_TIME(uiKeepAlive * 1000));
/// =======================================================================
/// TCP
/// =======================================================================
M_TcpClient(
bEnable := bTcpConnect,
sIP := sBrokerIP,
uiPortNum := uiPort,
bIsConnected => bIsConnected);
M_TcpRead(
bEnable := bTcpRead,
pDataReceive := ADR(aRxBuf[uiRxLength]),
udiDataSize := SIZEOF(aRxBuf) - uiRxLength,
bDone => bHasRead,
udiBytesRead => udiBytesRead);
M_TcpWrite(
bExecute := bTcpWrite,
pDataSend := ADR(aTxBuf),
udiDataSize := uiTxLength,
bDone => bHasWritten);
完整代码 2:PRG_Test
- 对应源码路径:
10 MQTT/MqttClient_V1_0/Device/Application/MQTT/POUs/PRG_Test.st - 复制使用说明:主功能块之外,再配一份测试程序,读者会更容易理解这套库是怎么从“库代码”落到“工程调用”的。
- 阅读重点:重点看上层调用视角,理解为什么真正好用的库,不只是内部方法写对,还得让外部工程容易接。
/// =======================================================================
/// 名称 : PRG_Test
/// 功能 : MQTT 客户端测试程序
/// 说明 : 用于驱动 FB_MqttClient 进行连接、发布、订阅与接收测试
/// 编程人员 : ControlRookie
/// 时间 : 2026-05-05
/// 版本 : V1.0
/// =======================================================================
PROGRAM PRG_Test
VAR
bInit : BOOL := TRUE; // 初始化执行标志
fbMqttClient : FB_MqttClient; // MQTT 客户端实例
bEnable : BOOL := TRUE; // 使能客户端
bConnect : BOOL; // 连接命令
sBrokerIP : STRING := '192.168.20.100'; // Broker IP 地址
uiPort : UINT := 1883; // Broker 端口号
sClientID : STRING := 'CodeSys_PLC'; // 客户端标识符
sUsername : STRING := ''; // 用户名
sPassword : STRING := ''; // 密码
eVersion : E_MqttVersion := E_MqttVersion.byMqttVersion311; // MQTT 协议版本
bCleanSession : BOOL := TRUE; // 清理会话标志
uiKeepAlive : UINT := 60; // 心跳周期(秒)
bUseSSL : BOOL := FALSE; // 是否启用 SSL
bAutoReconnect : BOOL := TRUE; // 是否自动重连
uiReconnectDelay : UINT := 5000; // 重连延时(毫秒)
udiSessionExpiry : UDINT := 0; // 会话过期间隔
uiReceiveMax : UINT := 65535; // 最大接收数量
udMaxPacketSize : UDINT := 4096; // 最大报文长度
bWillFlag : BOOL := TRUE; // 是否启用遗嘱消息
sWillTopic : STRING := 'CodeSys'; // 遗嘱主题
sWillMessage : STRING := 'CodeSys Offline'; // 遗嘱消息内容
eWillQoS : E_MqttQoS := E_MqttQoS.byQoS1; // 遗嘱消息 QoS
bWillRetain : BOOL := FALSE; // 遗嘱消息保留标志
bPublish : BOOL; // 发布命令
sPubTopic : STRING := 'CodeSys'; // 发布主题
sPubPayload : STRING := 'CodeSys'; // 发布载荷
ePubQoS : E_MqttQoS := E_MqttQoS.byQoS1; // 发布 QoS
bPubRetain : BOOL := FALSE; // 发布保留标志
bSubscribe : BOOL; // 订阅命令
bUnsubscribe : BOOL; // 取消订阅命令
sSubTopic : STRING := 'CodeSys'; // 订阅主题
sUnsubTopic : STRING := 'CodeSys'; // 取消订阅主题
eSubQoS : E_MqttQoS := E_MqttQoS.byQoS1; // 订阅请求 QoS
udiSubscriptionId : UDINT := 0; // MQTT 5.0 订阅标识符
eMqttState : E_MqttState; // 客户端当前状态
bIsConnected : BOOL; // TCP 连接状态
bMqttConnected : BOOL; // MQTT 连接状态
bError : BOOL; // 错误标志
eErrorID : NBS.ERROR; // 错误码
sDiagMsg : STRING; // 诊断信息
aSubscriptions : ARRAY[1..GVL_Mqtt.cnMaxSubscriptions] OF ST_MqttSubscription; // 订阅列表
uiSubscriptionCount : UINT; // 订阅数量
sRecTopic : STRING; // 最新接收主题
sRecPayload : STRING; // 最新接收载荷
aRecTopicList : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING; // 接收主题历史
aRecPayloadList : ARRAY[0..GVL_Mqtt.cnMaxHistory] OF STRING; // 接收载荷历史
byReceivedQoS : BYTE; // 最新接收 QoS
bReceivedRetain : BOOL; // 最新接收保留标志
END_VAR
// === IMPLEMENTATION ===
IF bInit THEN
bInit := FALSE;
END_IF
/// MQTT客户端
fbMqttClient(
bEnable := bEnable,
bConnect := bConnect,
sBrokerIP := sBrokerIP,
uiPort := uiPort,
sClientID := sClientID,
sUsername := sUsername,
sPassword := sPassword,
eVersion := eVersion,
bCleanSession := bCleanSession,
uiKeepAlive := uiKeepAlive,
bUseSSL := bUseSSL,
bAutoReconnect := bAutoReconnect,
uiReconnectDelay := uiReconnectDelay,
udiSessionExpiry := udiSessionExpiry,
uiReceiveMax := uiReceiveMax,
udMaxPacketSize := udMaxPacketSize,
bWillFlag := bWillFlag,
sWillTopic := sWillTopic,
sWillMessage := sWillMessage,
eWillQoS := eWillQoS,
bWillRetain := bWillRetain,
bPublish := bPublish,
sPubTopic := sPubTopic,
sPubPayload := sPubPayload,
ePubQoS := ePubQoS,
bPubRetain := bPubRetain,
bSubscribe := bSubscribe,
sSubTopic := sSubTopic,
eSubQoS := eSubQoS,
udiSubscriptionId := udiSubscriptionId,
bUnsubscribe := bUnsubscribe,
sUnsubTopic := sUnsubTopic,
eState => eMqttState,
bIsConnected => bIsConnected,
bMqttConnected => bMqttConnected,
bError => bError,
eErrorID => eErrorID,
sDiagMsg => sDiagMsg,
aSubscriptions => aSubscriptions,
uiSubscriptionCount => uiSubscriptionCount,
sRecTopic => sRecTopic,
sRecPayload => sRecPayload,
aRecTopicList => aRecTopicList,
aRecPayloadList => aRecPayloadList,
byReceivedQoS => byReceivedQoS,
bReceivedRetain => bReceivedRetain);
系列导航
- 系列定位:加更篇 2
- 上一篇:加更1 MQTT 5.0 到底值不值得上
- 下一篇:系列收官,无下一篇
项目与资料
- 开源项目名称:MqttClient_V1_0
- GitHub 仓库关键字:ControlRookie MqttClient_V1_0
- 配套资料说明:我已经同步整理 MQTT 标准、控制报文、十六进制拆解、状态机图和 ST 代码映射知识库,后续会持续补全和公开入口
适合谁收藏
- 正在做 CODESYS / PLC / MQTT 项目的人
- 想把 MQTT 从报文真正看到 ST 代码的人
- 正在排查 QoS1 / QoS2 超时、掉线、重连问题的人
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)