STM32实现MQTT及JSON包思路
最近有个项目要添加MQTT协议,根据协议,我们是MQTT客户端,底层已经实现了TCP/IP协议,
载荷是我们私有协议,现在要把载荷换成MQTT协议和用户定义的JSON包。看了下MQTT协议和
C库,如果想在两天之内实现,难度有点大,没有用过MQTT,不知道JSON是啥?通过对用户协
的解读和对MQTT协议查阅资料,目前想要快速实现,具体思路如下:
1、抓包分析MQTT数据包;
2、查看MQTT中文协议;
3、只实现登入、发布消息和心跳这三种类型包;
4、JSON包用sprintf,只实现用户定义的JSON包;
根据以上思路,需要工具:
1、Wireshark;
2、通信猫;
一、登入帧
固定包头:报文类型+剩余长度 10+剩余长度
可变包头:协议名(Protocol Name),协议级别
(Protocol Level),连接标志(Connect Flags)和保持连接(Keep Alive)
共10个字节
协议名称:00 04 4d 51 54 54 //协议名称
协议版本:04 //协议版本 3.1.1
连接标志:c2 //连接标志
保持连接:00 14 //保持连接,S为单位
const static uint8_t c_VariableHeader[] = {0x00,0x04,0x4d,0x51,0x54,0x54,0x04,0xC2,0x00,0x14};
载荷:客户端标识(必须唯一)+登入名(可以为空)+登入密码(可以为空)
两字节客户端标识长度+客户标识
两字节登入名长度+登入名
两字节登入密码长度+登入密码
//MQTT连接连接包
bool mqtt_connect(uint8_t* pchBuffer,uint16_t* phwSize)
{
uint8_t chVariableHeader[sizeof(c_VariableHeader)];
uint8_t chTemp[SERVICE_BUFFER_SIZE] = {0};
byte_pipe_t tPipe;
union{
uint8_t chTemp[2];
int16_t iTemp;
}tTemp = {0};
if(NULL == pchBuffer || NULL == phwSize){
return false;
}
new_byte_pipe(&tPipe,chTemp,sizeof(chTemp));
memcpy(chVariableHeader,c_VariableHeader,sizeof(c_VariableHeader));
//设备ID
{
uint8_t chBuffer[20] = {0};
memset(chBuffer,0x00,20);
tTemp.iTemp = sprintf((char*)chBuffer,"Dome%d",eeprom_id_read());
if(tTemp.iTemp > 0){
enpipe_byte(&tPipe,tTemp.chTemp[1]);
enpipe_byte(&tPipe,tTemp.chTemp[0]);
enpipe_bytes(&tPipe,chBuffer,tTemp.iTemp);
}
}
//用户名
tTemp.iTemp=strlen((const char*)flash_cfg_mqtt_name_read());
if(tTemp.iTemp > 0){
enpipe_byte(&tPipe,tTemp.chTemp[1]);
enpipe_byte(&tPipe,tTemp.chTemp[0]);
enpipe_bytes(&tPipe,(uint8_t*)flash_cfg_mqtt_name_read(),tTemp.iTemp);
}else{
chVariableHeader[7] = chVariableHeader[7] & 0x7F;
}
//登入密码
tTemp.iTemp=strlen((const char*)flash_cfg_mqtt_password_read());
if(tTemp.iTemp > 0){
enpipe_byte(&tPipe,tTemp.chTemp[1]);
enpipe_byte(&tPipe,tTemp.chTemp[0]);
enpipe_bytes(&tPipe,(uint8_t*)flash_cfg_mqtt_password_read(),tTemp.iTemp);
}else{
chVariableHeader[7] = chVariableHeader[7] & 0xBF;
}
//Keep Alive时间
tTemp.iTemp = eeprom_heart_beat_min_read() * 60;
chVariableHeader[8] = tTemp.chTemp[1];
chVariableHeader[9] = tTemp.chTemp[0];
//固定包头
pchBuffer[0] = 0x10;
if(get_byte_pipe_num(&tPipe)+10 > (sizeof(chTemp) - 3)){
return false;
}
//不支持大于256的数据
if(get_byte_pipe_num(&tPipe)+10 > 127){
pchBuffer[1] = 0x00;
pchBuffer[2] = 0x00;
//低字节在前,高字节在后
pchBuffer[1] = (get_byte_pipe_num(&tPipe)+10) & 0x007F;
pchBuffer[1] = pchBuffer[1] | 0x80;
pchBuffer[2] = (get_byte_pipe_num(&tPipe)+10) >> 7;
//可变帧头
memcpy(pchBuffer+3,chVariableHeader,10);
//Payload
memcpy(pchBuffer+3+10,get_byte_pipe_buffer(&tPipe),get_byte_pipe_num(&tPipe));
*phwSize = 3+10+get_byte_pipe_num(&tPipe);
}else{
pchBuffer[1] = get_byte_pipe_num(&tPipe)+10;
//可变帧头
memcpy(pchBuffer+2,chVariableHeader,10);
//Payload
memcpy(pchBuffer+2+10,get_byte_pipe_buffer(&tPipe),get_byte_pipe_num(&tPipe));
*phwSize = 2+10+get_byte_pipe_num(&tPipe);
}
return true;
}
二、发布消息
发布消息分为QoS0、QoS1、QoS2,其中QoS0没有回复帧,QoS1需要确认帧,QoS2实现
比较复杂,暂不实现。
QoS0不包含报文标识
30+剩余长度+主题长度+主题+用户载荷
//MQTT发送消息包
//没有确认帧
bool mqtt_publish_qos0(uint8_t* pchBuffer,uint16_t* phwSize,uint16_t hwBufferSize,const char* pcPublishTopic,uint8_t* pchJson,uint16_t hwJsonSize)
{
uint16_t j=0;
static union{
uint8_t chFrameNum[2];
uint16_t hwFrameNum;
}s_tFrameNum = {0};
union{
uint8_t chTemp[2];
int16_t iTemp;
}tTemp = {0};
if(NULL == pchBuffer || NULL == phwSize || !hwBufferSize || NULL == pcPublishTopic || NULL == pchJson || !hwJsonSize){
return false;
}
tTemp.iTemp = hwJsonSize;
j = strlen(pcPublishTopic);
if(tTemp.iTemp+j+5 > hwBufferSize){
TRACE_ERROR("mqtt_publish_qos0 size error\r\n");
return false;
}
//固定包头
pchBuffer[0] = 0x30;
//剩余长度
//不支持大于256的数据
if(tTemp.iTemp+j+2 > 127){
pchBuffer[1] = 0x00;
pchBuffer[2] = 0x00;
//低字节在前,高字节在后
pchBuffer[1] = (tTemp.iTemp+j+2) & 0x007F;
pchBuffer[1] = pchBuffer[1] | 0x80;
pchBuffer[2] = (tTemp.iTemp+j+2) >> 7;
//主题长度
pchBuffer[3] = (j >> 8) & 0x00FF;
pchBuffer[4] = (j >> 0) & 0x00FF;
//主题
memcpy(pchBuffer+5,pcPublishTopic,j);
//报文标识
//pchBuffer[j+5+0] = s_tFrameNum.chFrameNum[1];
//pchBuffer[j+5+1] = s_tFrameNum.chFrameNum[0];
//载荷
memcpy(pchBuffer+j+5,pchJson,tTemp.iTemp);
*phwSize = tTemp.iTemp+j+5;
}else{
pchBuffer[1] = tTemp.iTemp+j+2;
//主题长度
pchBuffer[2] = (j >> 8) & 0x00FF;
pchBuffer[3] = (j >> 0) & 0x00FF;
//主题
memcpy(pchBuffer+4,pcPublishTopic,j);
//报文标识
//pchBuffer[j+4+0] = s_tFrameNum.chFrameNum[1];
//pchBuffer[j+4+1] = s_tFrameNum.chFrameNum[0];
//载荷
memcpy(pchBuffer+j+4,pchJson,tTemp.iTemp);
*phwSize = tTemp.iTemp+j+4;
}
s_tFrameNum.hwFrameNum++;
return true;
}
QoS1需要服务器发送确认帧
32+剩余长度+主题长度+主题+报文标识+用户载荷
//MQTT发送消息包
//有确认帧
bool mqtt_publish_qos1(uint8_t* pchBuffer,uint16_t* phwSize,uint16_t hwBufferSize,const char* pcPublishTopic,uint8_t* pchJson,uint16_t hwJsonSize)
{
uint16_t j=0;
static union{
uint8_t chFrameNum[2];
uint16_t hwFrameNum;
}s_tFrameNum = {0};
union{
uint8_t chTemp[2];
int16_t iTemp;
}tTemp = {0};
if(NULL == pchBuffer || NULL == phwSize || !hwBufferSize || NULL == pcPublishTopic || NULL == pchJson || !hwJsonSize){
return false;
}
tTemp.iTemp = hwJsonSize;
j = strlen(pcPublishTopic);
if(tTemp.iTemp+j+5+2 > hwBufferSize){
TRACE_ERROR("mqtt_publish_qos1 size error\r\n");
return false;
}
//固定包头
pchBuffer[0] = 0x32;
//剩余长度
if(tTemp.iTemp+j+4 > 127){
pchBuffer[1] = 0x00;
pchBuffer[2] = 0x00;
//低字节在前,高字节在后
pchBuffer[1] = (tTemp.iTemp+j+4) & 0x007F;
pchBuffer[1] = pchBuffer[1] | 0x80;
pchBuffer[2] = (tTemp.iTemp+j+4) >> 7;
//主题长度
pchBuffer[3] = (j >> 8) & 0x00FF;
pchBuffer[4] = (j >> 0) & 0x00FF;
//主题
memcpy(pchBuffer+5,pcPublishTopic,j);
//报文标识
pchBuffer[j+5+0] = s_tFrameNum.chFrameNum[1];
pchBuffer[j+5+1] = s_tFrameNum.chFrameNum[0];
//载荷
memcpy(pchBuffer+j+5+2,pchJson,tTemp.iTemp);
*phwSize = tTemp.iTemp+j+5+2;
}else{
pchBuffer[1] = tTemp.iTemp+j+4;
//主题长度
pchBuffer[2] = (j >> 8) & 0x00FF;
pchBuffer[3] = (j >> 0) & 0x00FF;
//主题
memcpy(pchBuffer+4,pcPublishTopic,j);
//报文标识
pchBuffer[j+4+0] = s_tFrameNum.chFrameNum[1];
pchBuffer[j+4+1] = s_tFrameNum.chFrameNum[0];
//载荷
memcpy(pchBuffer+j+4+2,pchJson,tTemp.iTemp);
*phwSize = tTemp.iTemp+j+4+2;
}
s_tFrameNum.hwFrameNum++;
return true;
}
三、心跳
发送 C0 00
收到 D0 00
//MQTT心跳包
bool mqtt_pingresp(uint8_t* pchBuffer,uint16_t* phwSize)
{
if(NULL == pchBuffer || NULL == phwSize){
return false;
}
pchBuffer[0] = 0xC0;
pchBuffer[1] = 0x00;
*phwSize = 2;
return true;
}
四、JSON包
JSON包就是一串用户定义的字符串,用sprintf实现即可。
举例:定位
uint16_t mqtt_location_publish(uint32_t wID,char* pcLocation,uint8_t chSize,char* pchBuffer)
{
int16_t i=0;
if(NULL == pcLocation || !chSize || NULL == pchBuffer){
return 0;
}
i = sprintf(pchBuffer,"{\"nodeIp\":\"%u\",\"baseIp\":\"%u\",\"location\":\"",wID,eeprom_id_read());
i = (i>0)?i:0;
if(!i){
return 0;
}
memcpy(pchBuffer+i,pcLocation,chSize);
i+=chSize;
pchBuffer[i] = '"';
i+=1;
pchBuffer[i] = '}';
i+=1;
return i;
}
根据以上思路,在TCP/IP之上实现MQTT的客户端,两天就实现了。
《--------------------------------------------------------------------------------------------------------------------------》
JSON(JavaScript Object Notation)是一种轻量级的数据交换格式.易于阅读和理解,也易于机器解析和生成.JSON采用独立于语言的文本格式,使用了类似于C语言家族的习惯(包括C,C++,C#,Java, JavaScript, Perl, Python等).这些特性使得JSON成为理想的数据交换语言.
一 JSON构建于两种结构:
- "名称/值"对的集合(A collection of name/value pairs).在不同的语言中被理解为对象(Object),记录(Records),结构(struct),字典(dictionary),哈希表(hash table),有键列表(keyed list),或者是关联数组(associative array)
- 值得有序列表(An ordered list of values),在大部分语言中被理解为数据(array)
二 JSON的形式
- 对象
对象是一个无序的"'名称/值'对"集合.一个对象以“{”(左括号)开始,“}”(右括号)结束。每个“名称”后跟一个“:”(冒号);“‘名称/值’ 对”之间使用“,”(逗号)分隔。
2. 数组
数组是值(value)的有序集合。一个数组以“[”(左中括号)开始,“]”(右中括号)结束。值之间间使用“,”(逗号)分隔。
三 值的内容
值(value)可以是双引号括起来的字符串(string)、数值(number)、true、false、 null、对象(object)或者数组(array)。这些结构可以嵌套。
字符串(string)是由双引号包围的任意数量Unicode字符的集合,使用反斜线转义。一个字符(character)即一个单独的字符串(character string)。
字符串(string)与C或者Java的字符串非常相似。
数值(number)也与C或者Java的数值非常相似。除去未曾使用的八进制与十六进制格式。除去一些编码细节。
四 范例
注意:字符串一定要用双引号括起来
数组中可以嵌套数组和对象
{
"name": "BeJson",
"url": "http://www.bejson.com",
"page": 88,
"isNonProfit": true,
"address": {
"street": "科技园路.",
"city": "江苏苏州",
"country": "中国"
},
"links": [
{
"name": "Google",
"url": "http://www.google.com"
},
{
"name": "Baidu",
"url": "http://www.baidu.com"
},
{
"name": "SoSo",
"url": "http://www.SoSo.com"
}
]
}
更多推荐
所有评论(0)