物联网下,物理设备内存CPU有限、4G网络不可靠、网络带宽小等,目前有emqttd、mosquitto、activemq等支持mqtt协议。对于点对点的消息传递直接使用一般的通信方式不使用消息队列就ok的,但是最近出了个需求需要消息广播准备使用发布订阅来实现。rabbitmq是将mqtt协议转换为amqp协议来处理。

  • 1.消息类型

消息类型比较简单,请求报文也比较简单。

CONNECT	    1	    C->S	客户端请求与服务端建立连接
CONNACK	    2	    S->C	服务端确认连接建立
PUBLISH	    3	    CóS	    发布消息
PUBACK	    4	    CóS	    收到发布消息确认
PUBREC	    5	    CóS	    发布消息收到
PUBREL	    6	    CóS	    发布消息释放
PUBCOMP	    7	    CóS	    发布消息完成
SUBSCRIBE	8	    C->S	订阅请求
SUBACK	    9	    S->C	订阅确认
UNSUBSCRIBE	10	    C->S	取消订阅
UNSUBACK	11	    S->C	取消订阅确认
PING	    12	    C->S	客户端发送PING(连接保活)命令
PINGRSP	    13	    S->C	PING命令回复
DISCONNECT	14	    C->S	断开连接
  • 2.rabbitmq开启mqtt
#开启WEB管理
rabbitmq-plugins enable rabbitmq_management
#开启MQTT插件
rabbitmq-plugins enable rabbitmq_mqtt

启用的是1883端口:

mqtt

  • 3.java客户端

程序都是网上找的,先写了个简单的测试。依赖文件:

<dependency>
	<groupId>org.eclipse.paho</groupId>
	<artifactId>mqtt-client</artifactId>
	<version>0.4.0</version>
</dependency>

publish端:

//发布客户端
public class publishClient {

    //mqtt服务器地址
    public static final String HOST = "tcp://114.116.48.130:1883";
    //主题
    public static final String TOPIC = "service_login";
    //mqtt 客户机ID
    private static final String clientid = "server";
    private MqttClient client;//客户端实例
    private MqttTopic topic11;//主题实例
    private String userName = "*****";  //非必须
    private String passWord = "*****";  //非必须
    private MqttMessage message;
    //初始化客户端实例
    public publishClient() throws MqttException {
        //MemoryPersistence设置clientid的保存形式,默认为以内存保存
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }
    //连接服务器
    private void connect() {
        //连接配置
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);//不保存,每次重启新client
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(10);
        // 设置会话心跳时间
        options.setKeepAliveInterval(20);
        try {
            //设置发布回调
            client.setCallback(new publishCallback());
            client.connect(options);
            topic11 = client.getTopic(TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //发布
    public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
            MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("message is published completely! "+ token.isComplete());
    }
    //测试类
    public static void main(String[] args) throws MqttException, InterruptedException {
        //发布客户端
        publishClient server = new publishClient();
        //每隔10s发一条
        for (;;){
            server.message = new MqttMessage();
            server.message.setQos(1);//保证消息能到达一次
            server.message.setRetained(true);//消息保留
            server.message.setPayload("{'key':'value'}".getBytes());//消息内容
            server.publish(server.topic11 , server.message);//发布
            Thread.sleep(10000);
        }
    }
}

subscribe端:

//订阅客户端
public class subscribeClient {
    //mqtt服务器ip
    public static final String HOST = "tcp://114.116.48.130:1883";
    //主题
    public static final String TOPIC1 = "service_login";
    //mqtt 客户机ID
    private String clientid = "client";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "*****";
    private String passWord = "*****";
    @SuppressWarnings("unused")
    private ScheduledExecutorService scheduler;
    public subscribeClient(String clientid){
        this.clientid = clientid;
    }
    private void start() {
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            // MQTT的连接设置
            options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调
            client.setCallback(new subcribeCallback());
            MqttTopic topic = client.getTopic(TOPIC1);
            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
            // 遗嘱
            options.setWill(topic, "close".getBytes(), 2, true);
            client.connect(options);
            //订阅消息
            int[] Qos  = {1};
            String[] topic1 = {TOPIC1};
            client.subscribe(topic1, Qos);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws MqttException {
        //一个设备一个队列
        new subscribeClient("client_1").start();
        new subscribeClient("client_2").start();
        new subscribeClient("client_3").start();
    }
}

 callback回调:

public class publishCallback implements MqttCallback {

    //在断开连接时调用
    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }

    //接收已经预订的发布
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题 : " + topic);
        System.out.println("接收消息Qos : " + message.getQos());
        System.out.println("接收消息内容 : " + new String(message.getPayload()));
    }
}
  • 4.测试

首先QoS取值决定了消息质量,消息推送端分别设置不同的消息级别在建立虚拟通道是有差异,先启动推送端十秒一条消息

0:尽力发送,如果遇到传递失败,TCP传输层保证不会重试,出错会丢失消息
1:消费者没有接收确认或者确认消息本身丢失,消息发送者会再次发送,可能造成消息重复
2:保证消息接收者成功接收一次,造成并发性能下降以及消息传递时延增加

关于rabbitmq的web窗口参数:

Publish:producter pub消息的速率。
Publisher confirm:broker确认pub消息的速率。
Deliver(manual ack):customer手动确认的速率。
Deliver( auto ack):customer自动确认的速率。
Consumer ack:customer正在确认的速率。
Redelivered:正在传递'redelivered'标志集的消息的速率。
Get (manual ack):响应basic.get而要求确认的消息的传输速率。
Get (auto ack):响应于basic.get而发送不需要确认的消息的速率。
Return:将basic.return发送给producter的速率。
Disk read:queue从磁盘读取消息的速率。
Disk write:queue从磁盘写入消息的速率。

server.message.setQos(0):

级别0

server.message.setQos(1):

级别1

server.message.setQos(2):

启动会报IO异常,待解决。Qos=2报错,已断开连接 (32109) - java.io.EOFException,查阅资料发现是rabbitmq自身的bug,据说插件升级到mqtt-3.1.1可以解决,还没有尝试。

本身confirm方式就是用来确保消息成功推送到broker中,这里正好rabbitmqq默认使用confirm实现mqtt协议的QoS=1。rabbitmq的ack来自于发布确认,但是消费者还没有启动所以队列也没有创建,消息从broker传递到队列中之后(不管消息有没有被消费)都会由broker返回确认,启动消费者,查看队列如下:

rabbitmq实现的mqtt协议在发布订阅模式下,每个消费者都会创建一个队列,创建队列由订阅主题时触发client.subscribe(topic1, Qos),对于rabbitmq来说会收到broker返回的消息发布成功确认消息:

每个队列都只有一个消费者,如下:

由于rabbitmq是将mqtt协议转化为amqp协议,在mqtt协议里面是没有交换机、队列概念的,所以这里整个mqtt服务器是利用同一个topic交换机实现的,查看交换机如下:

这里交换机消息进出一比三,交换机绑定三个队列正好对应:

如果更换topic路由键,比如login主题增加一个logout主题,交换机中就会新建一个主题,再启动三个消费者去订阅,结果如下:

  • 5.org.eclipse.paho.client.mqttv3使用细节

Retained :可以让新订阅的客户端得到发布方的最新的状态值,而不必要等待发送,此操作属于持久化操作,消费端重启服务依然可以收到。mqtt协议消息类型publish有一个redain标记位,broker会储存该topic最后一条消息,新上线的客户端会收到这一条消息,这个消息是本地持久化即使推送端重启。

CleanSession:是否清除客户端session,清除会使用新身份登入。如果不清除,即使是客户端下线,我这边关掉消费者,mqtt服务器会保留客户端信息如下,点进去发会发现这个队列没有消费者。如果清除的话这里是没有这条记录的。

ConnectionTimeout:超时时间。

KeepAliveInterval:会话心跳时间。

我这边推送端server.message.setRetained(false)设置消息不保留,消费端设置options.setCleanSession(false)客户端身份不清除,按道理消费者重复上线是不会收到保留消息的,可我这里没生效原因不明,重启依旧收到上一条消息的保留值。所以对publish请求抓包,查看一下publish推送请求如下:

查看subscribe请求抓包如下:

推送的时候retain是false,订阅到的消息retain是true,应该是rabbitmq给我改了, 估摸是和rabbitmq的消息持久化有关,我现在在web窗口手动publish一条消息(设置非持久化)再对subscribe端抓包发现retain为0,且重启subscribe端是没有重复获取这条消息的,结果如下:

在web窗口手动向topic交换机publish消息走的是qmqp协议并没有通过rabbitmq自带的mqtt插件所以会造成这种差别,查看rabbitmq-mqtt源码

#非持久化
delivery_mode(?QOS_0) -> 1;
#持久化
delivery_mode(?QOS_1) -> 2.

发现qos=1的情况默认持久化消息,所以用rabbitmq-mqtt插件会让qos=1的publicsh消息设置retain=false无效,再次设置qos=0然后测试重启果然没有收到这个消息。

  • 6.关于QPS

publish端设置qos=0,subcribe端设置cleanSession=false,启动推送端5秒一条消息,启动消费端之后又停止,查看队列中的未消费消息:

这种情况应该是效率最好的。 对于单队列来说,让生产和消费速率平衡之后测试速率的峰值可以很方便得到QPS,可是我们这里业务场景是生产者速率并不会太高,但是因为要对n多设备进行消息广播所以rabbitmq中在线的队列数量会比我们设备还多,目前还不知道用rabbitmq-mqtt实现广播消息性能怎么样。现在while死循环创建客户端取订阅主题,如图:

看到默认的socket连接上线是829个,队列达到500的时候就已经占用了一大半了。 

中国移动提供了开放的物联网平台,支持多种协议,mqtt就是其中一种。所有服务端交互使用https命令,然后硬件再用对应的协议比如mqtt进行连接。不用自己搭建、不用担心负载、使用简单,就是中国移动要平台维护,几个月一次,每次几分钟,一般的应用可能受不了这5分钟的服务不可用。

中国移动oneNet物联网开放平台

  • 8.mosquitto

搭建也比较简单。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐