我这篇文章是基于某位仁兄的代码所改造而成的,博客直通车

  1. 改造成cmake编译环境

CMakeLists.txt
cmake_minimum_required(VERSION 3.5)

#工程名称
project(mqtttest)

set(CMAKE_CXX_STANDARD 11)

# target source
add_executable( ${PROJECT_NAME}
    subscribe.c 
    mqtt.c
    mqtt/Clients.c 
    mqtt/Log.c 
    mqtt/Heap.c 
    mqtt/Tree.c 
    mqtt/LinkedList.c
    mqtt/Messages.c 
    mqtt/mqtt_client.c 
    mqtt/MQTTClient.c 
    mqtt/MQTTPacket.c 
    mqtt/MQTTPacketOut.c 
    mqtt/MQTTPersistence.c 
    mqtt/MQTTPersistenceDefault.c
    mqtt/MQTTProtocolClient.c 
    mqtt/MQTTProtocolOut.c 
    mqtt/Socket.c 
    mqtt/SocketBuffer.c 
    mqtt/SSLSocket.c 
    mqtt/StackTrace.c 
    mqtt/Thread.c 
    mqtt/utf-8.c
    
)
TARGET_LINK_LIBRARIES(${CMAKE_PROJECT_NAME}
         -lpthread # for PxDT lib
         -pthread
        )
主要是导入的这些文件,下面的-pthread是线程库,这个可以根据我之前的博客去配置
arm_linux_setup.cmake就是环境配置,arm的编译环境配置
set(CMAKE_SYSTEM_NAME Linux)
set(CMAKE_SYSTEM_PROCESSOR arm)

//这里的gcc与g++需要你们找到对应的arm架构里面的 直接使用我的你们是用不了的
set(tools ../linux-x86/aarch64/gcc-arm-10.3-2021.07-x86_64-aarch64-none-linux-gnu/bin)
set(CMAKE_C_COMPILER ${tools}/aarch64-none-linux-gnu-gcc)
set(CMAKE_CXX_COMPILER ${tools}/aarch64-none-linux-gnu-g++)
  1. 基于那位提供源码的仁兄的代码,我稍微更改了些东西,我增加了一个mqtt.c与mqtt.h,这是我用做封装subscribe.c与重连操作的一些代码

subscribe.c
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include "pthread.h"
#include "unistd.h"
#include "mqtt.h"

//这是直接gcc 可以在服务器上用的执行二进制文件
//gcc subscribe.c mqtt/Clients.c mqtt/Heap.c mqtt/LinkedList.c mqtt/Log.c mqtt/Messages.c mqtt/mqtt_client.c mqtt/MQTTClient.c mqtt/MQTTPacket.c mqtt/MQTTPacketOut.c mqtt/MQTTPersistence.c mqtt/MQTTPersistenceDefault.c mqtt/MQTTProtocolClient.c mqtt/MQTTProtocolOut.c mqtt/Socket.c mqtt/SocketBuffer.c mqtt/SSLSocket.c mqtt/StackTrace.c mqtt/Thread.c mqtt/Tree.c mqtt/utf-8.c -o main -lpthread


int main(int argc, char ** argv) {

    pthread_t thread_ID;        //定义线程id

    pthread_create(&thread_ID, NULL, &reve_message, NULL);    //创建一个线程执行mqtt客户端
    pthread_detach(thread_ID);    //设置线程结束收尸

    int chara = 0;

    //做测试用 我在这里循环发送消息
    while (1) {
        if (chara == 0)
        {
            chara = 1;
            mqtt_pub(0);
        }
        
        else if (chara == 1)
        {
            chara = 0;
            mqtt_pub(1);
        }
        sleep(5);
    }

    
    return 0;
}
mqtt.c
#include <stdio.h>

#include "mqtt.h"
#include "pthread.h"
#include "string.h"
#include "unistd.h"
#include "sys/stat.h"
#include "sys/types.h"
#include "sys/socket.h"
#include "netinet/in.h"
#include "arpa/inet.h"
#include "fcntl.h"

#include <stdlib.h>
#include <errno.h>
#include <signal.h>
#include "mqtt/mqtt_client.h"

int running = 1;

void stop_running(int sig)
{
    signal(SIGINT, NULL);
    running = 0;
}

mqtt_client *m; //mqtt_client 对象指针
int ret; //返回值
char *host = "xxxx:1883";//测试服务器
char *client_id = "xxxx";//客户端ID; 对测试服务器,可以随便写

//char *host = "xxxx:19883";//测试服务器
//char *client_id = "xxxx";//客户端ID; 对测试服务器,可以随便写

char *topic = "shadow/name/ota/get/accepted"; //主题
char *topic1 = "shadow/name/settings/get/accepted"; //主题
char *username = NULL;//用户名,用于验证身份。对测试服务器,无。
char *password = NULL;//密码,用于验证身份。对测试服务器,无。
int Qos; //Quality of Service

int mqttinti() {
    //create new mqtt client object
    m = mqtt_new(host, MQTT_PORT, client_id); //创建对象,MQTT_PORT = 1883
    if (m == NULL) {
        printf("mqtt client create failure, return  code = %d\n", errno);
        return 1;
    }
    else {
        printf("mqtt client created\n");
    }

    //connect to server
    ret = mqtt_connect(m, username, password); //连接服务器
    if (ret != MQTT_SUCCESS) {
        printf("mqtt client connect failure, return code = %d\n", ret);
        return 1;
    }
    else {
        printf("mqtt client connect\n");
    }

    //subscribe
    Qos = 1;

    ret = mqtt_subscribe(m, topic, Qos);//订阅消息
    printf("mqtt client subscribe %s,  return code = %d\n", topic, ret);

    ret = mqtt_subscribe(m, topic1, Qos);//订阅消息
    printf("mqtt client subscribe %s,  return code = %d\n", topic1, ret);

    //signal(SIGINT, stop_running);
    //signal(SIGTERM, stop_running);
    printf("wait for message of topic: %s ...\n", topic);
}




void mqtt_pub(int code) {
    
    printf("mqtt_pub code = %d\n", code);
    if (code == 0)
    {
        ret = mqtt_publish(m, "shadow/name/settings/get", "{}", 0);//发布消息
        //ret = mqtt_publish(m, topic, "{topic}", Qos);//发布消息
        printf("mqtt client publish,  shadow/name/settings/get code = %d\n", ret);
    }

    if (code == 1)
    {
        ret = mqtt_publish(m, "shadow/name/ota/get", "{}", 0);//发布消息
        //ret = mqtt_publish(m, topic1, "{topic1}", Qos);//发布消息
        printf("mqtt client publish,  shadow/name/ota/get code = %d\n", ret);
    }
    
}

void *reve_message() {
    mqttinti();
    while (running) {
        //这个地方可以做重连的操作 额外增加内容
        ret = mqtt_is_connected(m);
        printf("reve_message mqtt_is_connected ret = %d\n", ret);
        if (ret == 0) {
            mqtt_disconnect(m); //disconnect
            mqtt_delete(m);  //delete mqtt client object
            mqttinti();
        }

        ret = mqtt_receive(m, 200);
        printf("reve_message mqtt_receive ret = %d\n", ret);
        if (ret == MQTT_SUCCESS) { //recieve message,接收消息
            printf("received Topic=%s, Message=%s\n\n\n\n", m->received_topic, m->received_message);
        }
        mqtt_sleep(200); //sleep a while
    }
}

mqtt.h
#ifndef NET_PROC_H
#define NET_PROC_H


void *reve_message();

int mqttinti();

void mqtt_pub(int code);

#endif
  1. 我为什么要写这篇博客:因为我下载的源码里面有一套死锁的逻辑,因为我是c语言初学者,我并没有理解为什么,但是有这套死锁,会出现无法重连的操作,所以,我把死锁的所有代码都注释掉了,然后改了这份用做断网重连的代码!

源码直通车

GitHub 加速计划 / li / linux-dash
10.39 K
1.2 K
下载
A beautiful web dashboard for Linux
最近提交(Master分支:2 个月前 )
186a802e added ecosystem file for PM2 4 年前
5def40a3 Add host customization support for the NodeJS version 4 年前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐