最近有个项目要添加MQTT协议,根据协议,我们是MQTT客户端,底层已经实现了TCP/IP协议,

载荷是我们私有协议,现在要把载荷换成MQTT协议和用户定义的JSON包。看了下MQTT协议和

C库,如果想在两天之内实现,难度有点大,没有用过MQTT,不知道JSON是啥?通过对用户协

的解读和对MQTT协议查阅资料,目前想要快速实现,具体思路如下:

1、抓包分析MQTT数据包;

2、查看MQTT中文协议;

3、只实现登入、发布消息和心跳这三种类型包;

4、JSON包用sprintf,只实现用户定义的JSON包;

根据以上思路,需要工具:

1、Wireshark;

2、通信猫;

一、登入帧

固定包头:报文类型+剩余长度          10+剩余长度

可变包头:协议名(Protocol Name),协议级别
(Protocol Level),连接标志(Connect Flags)和保持连接(Keep Alive)

共10个字节

协议名称:00 04 4d 51 54 54 //协议名称

协议版本:04                  //协议版本 3.1.1

连接标志:c2                   //连接标志

保持连接:00 14               //保持连接,S为单位

const static uint8_t c_VariableHeader[] = {0x00,0x04,0x4d,0x51,0x54,0x54,0x04,0xC2,0x00,0x14};

载荷:客户端标识(必须唯一)+登入名(可以为空)+登入密码(可以为空)

两字节客户端标识长度+客户标识

两字节登入名长度+登入名

两字节登入密码长度+登入密码

//MQTT连接连接包
bool mqtt_connect(uint8_t* pchBuffer,uint16_t* phwSize)
{
    uint8_t     chVariableHeader[sizeof(c_VariableHeader)];
    uint8_t     chTemp[SERVICE_BUFFER_SIZE] = {0};
    byte_pipe_t tPipe;
    union{
        uint8_t     chTemp[2];
        int16_t     iTemp;
    }tTemp = {0};

    if(NULL == pchBuffer || NULL == phwSize){
        return false;
    }

    new_byte_pipe(&tPipe,chTemp,sizeof(chTemp));
    
    memcpy(chVariableHeader,c_VariableHeader,sizeof(c_VariableHeader));

    //设备ID
    {
        uint8_t chBuffer[20] = {0};
        memset(chBuffer,0x00,20);
        tTemp.iTemp = sprintf((char*)chBuffer,"Dome%d",eeprom_id_read());
        if(tTemp.iTemp > 0){
            enpipe_byte(&tPipe,tTemp.chTemp[1]);
            enpipe_byte(&tPipe,tTemp.chTemp[0]);
            enpipe_bytes(&tPipe,chBuffer,tTemp.iTemp);
        }
    }
    //用户名
    tTemp.iTemp=strlen((const char*)flash_cfg_mqtt_name_read());
    if(tTemp.iTemp > 0){
        enpipe_byte(&tPipe,tTemp.chTemp[1]);
        enpipe_byte(&tPipe,tTemp.chTemp[0]);
        enpipe_bytes(&tPipe,(uint8_t*)flash_cfg_mqtt_name_read(),tTemp.iTemp);
    }else{
        chVariableHeader[7] = chVariableHeader[7] & 0x7F;
    }
    //登入密码
    tTemp.iTemp=strlen((const char*)flash_cfg_mqtt_password_read());
    if(tTemp.iTemp > 0){
        enpipe_byte(&tPipe,tTemp.chTemp[1]);
        enpipe_byte(&tPipe,tTemp.chTemp[0]);
        enpipe_bytes(&tPipe,(uint8_t*)flash_cfg_mqtt_password_read(),tTemp.iTemp);
    }else{
        chVariableHeader[7] = chVariableHeader[7] & 0xBF;
    }
    //Keep Alive时间
    tTemp.iTemp = eeprom_heart_beat_min_read() * 60;
    chVariableHeader[8] = tTemp.chTemp[1];
    chVariableHeader[9] = tTemp.chTemp[0];
    //固定包头
    pchBuffer[0] = 0x10;
    if(get_byte_pipe_num(&tPipe)+10 > (sizeof(chTemp) - 3)){
        return false;
    }
    //不支持大于256的数据
    if(get_byte_pipe_num(&tPipe)+10 > 127){
        pchBuffer[1] = 0x00;
        pchBuffer[2] = 0x00;
        //低字节在前,高字节在后
        pchBuffer[1] = (get_byte_pipe_num(&tPipe)+10) & 0x007F;
        pchBuffer[1] = pchBuffer[1] | 0x80;
        pchBuffer[2] = (get_byte_pipe_num(&tPipe)+10) >> 7;
        //可变帧头
        memcpy(pchBuffer+3,chVariableHeader,10);
        //Payload
        memcpy(pchBuffer+3+10,get_byte_pipe_buffer(&tPipe),get_byte_pipe_num(&tPipe));

        *phwSize = 3+10+get_byte_pipe_num(&tPipe);
    }else{
        pchBuffer[1] = get_byte_pipe_num(&tPipe)+10;
        //可变帧头
        memcpy(pchBuffer+2,chVariableHeader,10);
        //Payload
        memcpy(pchBuffer+2+10,get_byte_pipe_buffer(&tPipe),get_byte_pipe_num(&tPipe));

        *phwSize = 2+10+get_byte_pipe_num(&tPipe);
    }

    return true;
}

二、发布消息

发布消息分为QoS0、QoS1、QoS2,其中QoS0没有回复帧,QoS1需要确认帧,QoS2实现

比较复杂,暂不实现。

QoS0不包含报文标识

30+剩余长度+主题长度+主题+用户载荷

//MQTT发送消息包
//没有确认帧
bool mqtt_publish_qos0(uint8_t* pchBuffer,uint16_t* phwSize,uint16_t hwBufferSize,const char* pcPublishTopic,uint8_t* pchJson,uint16_t hwJsonSize)
{
    uint16_t j=0;
    static union{
        uint8_t     chFrameNum[2];
        uint16_t    hwFrameNum;
    }s_tFrameNum = {0};

    union{
        uint8_t     chTemp[2];
        int16_t     iTemp;
    }tTemp = {0};

    if(NULL == pchBuffer || NULL == phwSize || !hwBufferSize || NULL == pcPublishTopic || NULL == pchJson || !hwJsonSize){
        return false;
    }

    tTemp.iTemp = hwJsonSize;

    j = strlen(pcPublishTopic);

    if(tTemp.iTemp+j+5 > hwBufferSize){
        TRACE_ERROR("mqtt_publish_qos0 size error\r\n");
        return false;
    }

    //固定包头
    pchBuffer[0] = 0x30;
    //剩余长度
    //不支持大于256的数据
    if(tTemp.iTemp+j+2 > 127){
        pchBuffer[1] = 0x00;
        pchBuffer[2] = 0x00;
        //低字节在前,高字节在后
        pchBuffer[1] = (tTemp.iTemp+j+2) & 0x007F;
        pchBuffer[1] = pchBuffer[1] | 0x80;
        pchBuffer[2] = (tTemp.iTemp+j+2) >> 7;

        //主题长度
        pchBuffer[3] = (j >> 8) & 0x00FF;
        pchBuffer[4] = (j >> 0) & 0x00FF;
        //主题
        memcpy(pchBuffer+5,pcPublishTopic,j);

        //报文标识
        //pchBuffer[j+5+0] = s_tFrameNum.chFrameNum[1];
        //pchBuffer[j+5+1] = s_tFrameNum.chFrameNum[0];        
        //载荷
        memcpy(pchBuffer+j+5,pchJson,tTemp.iTemp);
        *phwSize = tTemp.iTemp+j+5;
    }else{
        pchBuffer[1] = tTemp.iTemp+j+2;

        //主题长度
        pchBuffer[2] = (j >> 8) & 0x00FF;
        pchBuffer[3] = (j >> 0) & 0x00FF;
        //主题
        memcpy(pchBuffer+4,pcPublishTopic,j);

        //报文标识
        //pchBuffer[j+4+0] = s_tFrameNum.chFrameNum[1];
        //pchBuffer[j+4+1] = s_tFrameNum.chFrameNum[0];        
        //载荷
        memcpy(pchBuffer+j+4,pchJson,tTemp.iTemp);
        *phwSize = tTemp.iTemp+j+4;
    }
    s_tFrameNum.hwFrameNum++;
    return true;
}

QoS1需要服务器发送确认帧

32+剩余长度+主题长度+主题+报文标识+用户载荷

//MQTT发送消息包
//有确认帧
bool mqtt_publish_qos1(uint8_t* pchBuffer,uint16_t* phwSize,uint16_t hwBufferSize,const char* pcPublishTopic,uint8_t* pchJson,uint16_t hwJsonSize)
{
    uint16_t j=0;
    static union{
        uint8_t     chFrameNum[2];
        uint16_t    hwFrameNum;
    }s_tFrameNum = {0};

    union{
        uint8_t     chTemp[2];
        int16_t     iTemp;
    }tTemp = {0};

    if(NULL == pchBuffer || NULL == phwSize || !hwBufferSize || NULL == pcPublishTopic || NULL == pchJson || !hwJsonSize){
        return false;
    }

    tTemp.iTemp = hwJsonSize;

    j = strlen(pcPublishTopic);

    if(tTemp.iTemp+j+5+2 > hwBufferSize){
        TRACE_ERROR("mqtt_publish_qos1 size error\r\n");
        return false;
    }

    //固定包头
    pchBuffer[0] = 0x32;
    //剩余长度
    if(tTemp.iTemp+j+4 > 127){
        pchBuffer[1] = 0x00;
        pchBuffer[2] = 0x00;
        //低字节在前,高字节在后
        pchBuffer[1] = (tTemp.iTemp+j+4) & 0x007F;
        pchBuffer[1] = pchBuffer[1] | 0x80;
        pchBuffer[2] = (tTemp.iTemp+j+4) >> 7;

        //主题长度
        pchBuffer[3] = (j >> 8) & 0x00FF;
        pchBuffer[4] = (j >> 0) & 0x00FF;
        //主题
        memcpy(pchBuffer+5,pcPublishTopic,j);

        //报文标识
        pchBuffer[j+5+0] = s_tFrameNum.chFrameNum[1];
        pchBuffer[j+5+1] = s_tFrameNum.chFrameNum[0];        
        //载荷
        memcpy(pchBuffer+j+5+2,pchJson,tTemp.iTemp);
        *phwSize = tTemp.iTemp+j+5+2;
    }else{
        pchBuffer[1] = tTemp.iTemp+j+4;

        //主题长度
        pchBuffer[2] = (j >> 8) & 0x00FF;
        pchBuffer[3] = (j >> 0) & 0x00FF;
        //主题
        memcpy(pchBuffer+4,pcPublishTopic,j);

        //报文标识
        pchBuffer[j+4+0] = s_tFrameNum.chFrameNum[1];
        pchBuffer[j+4+1] = s_tFrameNum.chFrameNum[0];        
        //载荷
        memcpy(pchBuffer+j+4+2,pchJson,tTemp.iTemp);
        *phwSize = tTemp.iTemp+j+4+2;
    }
    s_tFrameNum.hwFrameNum++;
    return true;
}

三、心跳

发送 C0 00 

收到 D0 00

//MQTT心跳包
bool mqtt_pingresp(uint8_t* pchBuffer,uint16_t* phwSize)
{
    if(NULL == pchBuffer || NULL == phwSize){
        return false;
    }

    pchBuffer[0] = 0xC0;
    pchBuffer[1] = 0x00;

    *phwSize     = 2;

    return true;
}

四、JSON包

JSON包就是一串用户定义的字符串,用sprintf实现即可。

举例:定位

uint16_t    mqtt_location_publish(uint32_t  wID,char* pcLocation,uint8_t chSize,char* pchBuffer)
{
    int16_t    i=0;

    if(NULL == pcLocation || !chSize || NULL == pchBuffer){
        return 0;
    }

    i = sprintf(pchBuffer,"{\"nodeIp\":\"%u\",\"baseIp\":\"%u\",\"location\":\"",wID,eeprom_id_read());

    i = (i>0)?i:0;

    if(!i){
        return 0;
    }

    memcpy(pchBuffer+i,pcLocation,chSize);

    i+=chSize;

    pchBuffer[i] = '"';
    i+=1;
    pchBuffer[i] = '}';
    i+=1;
    return i;    
}

根据以上思路,在TCP/IP之上实现MQTT的客户端,两天就实现了。

《--------------------------------------------------------------------------------------------------------------------------》

JSON(JavaScript Object Notation)是一种轻量级的数据交换格式.易于阅读和理解,也易于机器解析和生成.JSON采用独立于语言的文本格式,使用了类似于C语言家族的习惯(包括C,C++,C#,Java, JavaScript, Perl, Python等).这些特性使得JSON成为理想的数据交换语言.

一 JSON构建于两种结构:

  1. "名称/值"对的集合(A collection of name/value pairs).在不同的语言中被理解为对象(Object),记录(Records),结构(struct),字典(dictionary),哈希表(hash table),有键列表(keyed list),或者是关联数组(associative array)
  2. 值得有序列表(An ordered list of values),在大部分语言中被理解为数据(array)

二 JSON的形式

  1. 对象

  对象是一个无序的"'名称/值'对"集合.一个对象以“{”(左括号)开始,“}”(右括号)结束。每个“名称”后跟一个“:”(冒号);“‘名称/值’ 对”之间使用“,”(逗号)分隔。

  2. 数组

  

  数组是值(value)的有序集合。一个数组以“[”(左中括号)开始,“]”(右中括号)结束。值之间间使用“,”(逗号)分隔。 

 三 值的内容 

值(value)可以是双引号括起来的字符串(string)、数值(number)、true、false、 null、对象(object)或者数组(array)。这些结构可以嵌套。 

 

字符串(string)是由双引号包围的任意数量Unicode字符的集合,使用反斜线转义。一个字符(character)即一个单独的字符串(character string)。 

字符串(string)与C或者Java的字符串非常相似。 

 

数值(number)也与C或者Java的数值非常相似。除去未曾使用的八进制与十六进制格式。除去一些编码细节。

 

四 范例

注意:字符串一定要用双引号括起来

   数组中可以嵌套数组和对象

{

    "name": "BeJson",

    "url": "http://www.bejson.com",

    "page": 88,

    "isNonProfit": true,

    "address": {

        "street": "科技园路.",

        "city": "江苏苏州",

        "country": "中国"

    },

    "links": [

        {

            "name": "Google",

            "url": "http://www.google.com"

        },

        {

            "name": "Baidu",

            "url": "http://www.baidu.com"

        },

        {

            "name": "SoSo",

            "url": "http://www.SoSo.com"

        }

    ]

}

GitHub 加速计划 / js / json
41.72 K
6.61 K
下载
适用于现代 C++ 的 JSON。
最近提交(Master分支:1 个月前 )
960b763e 4 个月前
8c391e04 6 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐