前言:

        RocketMQ阿里开源的,一款分布式的消息中间件,它经过阿里的生产环境的高并发、高吞吐的考验,同时,还支持分布式事务等场景。RocketMQ使用Java语言进行开发,方便Java开发者学习源码。但是,RocketMQ设计相对复杂,官方文档不是很完善,不太适合中小公司引用。技多不压身,作为一个好的Coder,应该多学习一下优秀的框架。本篇主要介绍一下,RocketMQ的基础用法:

正文:

        这里我主要通过代码来介绍一下RocketMQ的使用,首先介绍一下RocketMQ的原生写法,然后介绍基于Springboot体系下,RocketMQ生产者的消息发送、消费者的消息接受等写法。

一、Java原生的消息发送与接收写法:

1. 生产者:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/30
 * @since 1.0.0
 */
@Slf4j
public class Producer {


    public static void main(String[] args) throws Exception {

        // 实例化消息生产者
        DefaultMQProducer producer = new DefaultMQProducer(RocketConstant.ORIGINAL_PRODUCER_GROUP);
        // 设置NameServer的地址
        producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
        // 启动Producer实例
        producer.start();
        // 创建消息,设置 Topic、Tag、keys、flag、消息体等
        Message message = new Message(RocketConstant.ORIGINAL_TOPIC, ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 发送消息到一个Broker
        SendResult sendResult = producer.send(message);
        // 通过sendResult返回消息是否成功送达
        log.info("消息发送成功:{}", sendResult);
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }

}

2. 消费者:

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/30
 * @since 1.0.0
 */
@Slf4j
public class Consumer {

    public static void main(String[] args) throws Exception {

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ORIGINAL_CONSUMER_GROUP);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
        // 订阅一个或多个Topic,用Tag来过滤需要消费的消息,这里指定*表示接收所有Tag的消息
        consumer.subscribe(RocketConstant.ORIGINAL_TOPIC, "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {
                log.info("{}收到消息:{}", this.getClass().getSimpleName(), messageExts);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
         // 启动消费者实例
        consumer.start();
        log.info("消费者启动成功!");
    }
}

二、基于Springboot体系的消息发送写法:

1.普通消息发送:

        具体有三种形式,主要包括:同步、异步、单向。

    /**
     * 发送普通同步消息
     *
     * @return
     */
    @GetMapping("/sendMqBySync")
    @ResponseBody
    public Object sendMqBySync() {
        RocketMessage message = RocketMessage.builder().name("普通同步消息" + LocalDateTime.now()).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
        return sendResult;
    }

    /**
     * 发送异步消息
     *
     * @return
     */
    @GetMapping("/sendMqByAsync")
    @ResponseBody
    public Object sendMqByAsync() {
        RocketMessage message = RocketMessage.builder().name("异步消息" + LocalDateTime.now()).build();
        // asyncSend
        rocketMQTemplate.asyncSend(RocketConstant.ASYNC_TOPIC, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                // 处理相应的业务
                log.info("发送成功:{}", JSON.toJSONString(sendResult));
            }

            @Override
            public void onException(Throwable throwable) {
                // 处理相应的业务
                log.info("发送异常:{}", throwable);
            }
        });
        return null;
    }


    /**
     * 发送单向消息
     * <p>
     * 这种方式主要用在不特别关心发送结果的场景,例如日志发送
     *
     * @return
     */
    @GetMapping("/sendMqByOneWay")
    @ResponseBody
    public Object sendMqByOneWay() {
        RocketMessage message = RocketMessage.builder().name("单向消息" + LocalDateTime.now()).build();
        // sendOneWay
        rocketMQTemplate.sendOneWay(RocketConstant.ONE_WAY_TOPIC, message);
        return null;
    }

2.顺序消息发送:

        具体有两种形式,主要包括:普通顺序、严格顺序。

    /**
     * 发送普通顺序消息
     *
     * @return
     */
    @GetMapping("/sendMqByOrder")
    @ResponseBody
    public Object sendMqByOrder() {

        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            RocketMessage message = RocketMessage.builder().name("普通顺序消息" + LocalDateTime.now() + i).build();
            // syncSendOrderly
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.COMMON_TOPIC, message, "hashkey");
            results.add(sendResult);
        }
        return results;
    }


    /**
     * 发送严格顺序消息
     * <p>
     * 概念:
     * 顺序消息是一种对消息发送和消费顺序有严格要求的消息
     * <p>
     * 生产顺序性:
     * RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
     * 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
     * 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
     *
     * @return
     */
    @GetMapping("/sendMqByStrictOrder")
    @ResponseBody
    public Object sendMqByStrictOrder() {
        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            RocketMessage message = RocketMessage.builder().name("严格顺序消息" + LocalDateTime.now() + i).build();
            // syncSendOrderly
            SendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.STRICT_ORDER_TOPIC, message, "hashkey");
            results.add(sendResult);
        }
        return results;
    }

3. 延迟消息发送:

    /**
     * 发送延时消息
     * <p>
     * 延时消息的使用限制
     * private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
     * 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18,
     * 消息消费失败会进入延时消息队列
     *
     * @return
     */
    @GetMapping("/sendMqByDelay")
    @ResponseBody
    public Object sendMqByDelay() {
        RocketMessage message = RocketMessage.builder().name("延时消息" + LocalDateTime.now()).build();
        // syncSend(... int delayLevel)
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.DELAY_TOPIC, MessageBuilder.withPayload(message).build(), 2000, 4);
        return sendResult;
    }

4. 批量消息发送:

    /**
     * 批量发送消息
     * <p>
     * 在对吞吐率有一定要求的情况下,RocketMQ可以将一些消息聚成一批以后进行发送,
     * 可以增加吞吐率,并减少API和网络调用次数
     *
     * @return
     */
    @GetMapping("/sendMqByBatch")
    @ResponseBody
    public Object sendMqByBatch() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            RocketMessage message = RocketMessage.builder().name("批量消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, messageList);
        return sendResult;
    }

 5. 事务消息发送:

   /**
     * 发送事务消息(半消息)
     * <p>
     * 仅仅只是保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,
     * 并无法保证消费者一定能消费成功
     * <p>
     *
     * @return
     */
    @GetMapping("/sendMqByTx")
    @ResponseBody
    public Object sendMqByTx(Integer type, Integer msgKey) {
        String transactionId = UUID.randomUUID().toString();
        No6LocalTransactionOriginalSyntaxListener transactionListener = new No6LocalTransactionOriginalSyntaxListener();
        TransactionMQProducer producer = new TransactionMQProducer(RocketConstant.TX_PRODUCER_GROUP);
        try {
            producer.setTransactionListener(transactionListener);
            producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
            producer.start();
            log.info("transactionId is {}", transactionId);
            org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(RocketConstant.TX_TOPIC,
                    ("事务消息" + LocalDateTime.now()).getBytes(RemotingHelper.DEFAULT_CHARSET));
            msg.getProperties().put(RocketMQHeaders.TRANSACTION_ID, transactionId);
            msg.getProperties().put("type", String.valueOf(type));
            msg.getProperties().put("msgKey", String.valueOf(msgKey));
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            return sendResult;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.shutdown();
        }

        return null;
    }

6. 带Tag消息发送:

    /**
     * 发送带Tag消息
     * <p>
     * Tag(标签)可以看作子主题,它是消息的第二级类型
     * 通过 RocketMQTemplate发送带Tag的消息,只需要将topic和tag中间通过【:】冒号连接即可
     *
     * @return
     */
    @GetMapping("/sendMqWithTag")
    @ResponseBody
    public Object sendMqWithTag() {
        RocketMessage message = RocketMessage.builder().name("tag消息" + LocalDateTime.now()).build();
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);
        return sendResult;
    }

三、基于Springboot体系的消息接收写法:

1. MessageModel(消息模型):

        消息模型有两种,主要包括:集群消费与广播消费

    /**
     * 发送集群或广播消息
     * <p>
     * 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
     *
     * @param MessageModel 0:CLUSTERING   1:BROADCASTING
     * @return
     */
    @GetMapping("/sendMqByMessageModel")
    @ResponseBody
    public Object sendMqByMessageModel(@RequestParam Integer MessageModel) {

        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            if (MessageModel == 0) {
                RocketMessage message = RocketMessage.builder().name("消息模型-集群消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
                results.add(sendResult);
            } else {
                RocketMessage message = RocketMessage.builder().name("消息模型-广播消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.BROADCAST_TOPIC, message);
                results.add(sendResult);
            }
        }
        return results;
    }

2. ConsumeMode(消费模型):

        消费模型有两种,主要包括:并发消费与顺序消费

    /**
     * 并发消费与顺序消费
     *
     * @param consumeMode 0:CONCURRENTLY  1:ORDERLY
     * @return
     */
    @GetMapping("/sendMqByConsumeMode")
    @ResponseBody
    public Object sendMqByConsumeMode(Integer consumeMode) {
        List<SendResult> results = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            if (consumeMode == 0) {
                RocketMessage message = RocketMessage.builder().name("消费模型-并发消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
                results.add(sendResult);
            } else {
                RocketMessage message = RocketMessage.builder().name("消费模型-顺序消费" + LocalDateTime.now() + i).build();
                SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.STRICT_ORDER_TOPIC, message);
                results.add(sendResult);
            }
        }
        return results;
    }

3. 消息过滤:

        消息过滤有两种方式,主要包括:Tag过滤与SQL92过滤

   /**
     * Tag过滤与SQL92过滤
     * <p>
     * Tag过滤:
     * 消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。
     * SQL92过滤:
     * 发送者设置Tag或自定义消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。
     * 开启对SQL语法的支持(broker.conf):
     * enablePropertyFilter = true
     *
     * @param filterMode 0:Tag过滤、1:SQL92过滤Tag、2:SQL92过滤自定义消息属性
     * @return
     */
    @GetMapping("/sendMqByFilterMode")
    @ResponseBody
    public Object sendMqByFilterMode(Integer filterMode) {
        if (filterMode == 0) {
            RocketMessage message = RocketMessage.builder().name("消费过滤-Tag过滤" + LocalDateTime.now()).build();
            SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);
            return sendResult;
        } else if (filterMode == 1) {
            RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤Tag" + LocalDateTime.now()).build();
            SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_TOPIC + ":" + RocketConstant.SQL92_TAG_EXPRESSION, message);
            return sendResult;
        } else {
            // Message msg = new Message("topic", "tagA", "Hello MQ".getBytes())
            // 设置自定义属性A,属性值为1。-> msg.putUserProperties("a", "1")
            //  RocketMQTemplate 目前好像不支持这种写法
            RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤自定义消息属性" + LocalDateTime.now()).build();
            Map<String, Object> map = new HashMap<>();
            map.put("a", 1);
            MessageHeaders messageHeaders = new MessageHeaders(map);
            SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_PROPERTIES_TOPIC, MessageBuilder.createMessage(message, messageHeaders));
            return sendResult;
        }
    }

4. 消息重试与死信队列:

  /**
     * 消息重试与死信队列
     * <p>
     * 1. 消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,
     * 失败消息不再重试,继续消费新的消息。
     * 2. 一条消息初次消费失败后,会自动进行消息重试,达到最大重试次数后,将其发送到该消费者对应的死信队列,
     * 这类消息称为死信消息(Dead-Letter Message)。死信队列是死信Topic下,分区数唯一的单独队列。
     * 3. 如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,
     * 死信队列的消息将不会再被消费。
     * 4. 可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。
     *
     * @return
     */
    @GetMapping("/sendMqByRetry")
    @ResponseBody
    public Object sendMqByRetry() {
        RocketMessage message = RocketMessage.builder().name("消息重试" + LocalDateTime.now()).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.RETRY_TOPIC, message);
        return sendResult;
    }

5. 消息应答:

   /**
     * 消息应答
     *
     * @return
     */
    @GetMapping("/sendByReply")
    @ResponseBody
    public Object sendByReply() {
        RocketMessage message = RocketMessage.builder().name("消息应答" + LocalDateTime.now()).build();
        RocketMessage receiveMessage = rocketMQTemplate.sendAndReceive(RocketConstant.REPLY_TOPIC, message, RocketMessage.class);
        return receiveMessage;
    }

6. Pull消费:

        Pull消费包括两种方式,主要包括:原始Pull Consumer与Lite Pull Consumer)

   /**
     * 原始Pull Consumer的消息发送
     *
     * @return
     */
    @GetMapping("/sendByOriginalPull")
    @ResponseBody
    public Object sendByOriginalPull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("原始pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }

        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.PULL_ORIGINAL_TOPIC, messageList);
        return sendResult;
    }

    @GetMapping("/pullByOriginal")
    @ResponseBody
    public void pullByOriginal() {

        pullMgsOriginal.pull(0, 2);
    }

    /**
     * 使用rocketMQTemplate拉取消息
     *
     * @return
     */
    @GetMapping("/sendByTemplatePull")
    @ResponseBody
    public Object sendByTemplatePull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("template pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_TEMPLATE_TOPIC, messageList);
        return sendResult;
    }


    /**
     * LitePullSubscribe
     *
     * @return
     */
    @GetMapping("/sendBySubscribePull")
    @ResponseBody
    public Object sendBySubscribePull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("subscribe pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_SUBSCRIBE_TOPIC, messageList);
        return sendResult;
    }


    @GetMapping("/pullBySubscribe")
    @ResponseBody
    public void pullBySubscribe() {

        litePullSubscribeMsg.pull(20);
    }

    /**
     * LitePullAssign
     *
     * @return
     */
    @GetMapping("/sendByAssignPull")
    @ResponseBody
    public Object sendByAssignPull() {

        List<Message> messageList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            RocketMessage message = RocketMessage.builder().name("assign pull消息" + LocalDateTime.now() + i).build();
            messageList.add(MessageBuilder.withPayload(message).build());
        }
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_ASSIGN_TOPIC, messageList);
        return sendResult;
    }


    @GetMapping("/pullByAssign")
    @ResponseBody
    public void pullByAssign() {

        litePullAssignMsg.pull();
    }

7. 设置消费点位:

    /**
     * 消费点类型设置成:
     * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP
     *
     * @return
     */
    @GetMapping("/sendMqByConsumePoint")
    @ResponseBody
    public Object sendMqByConsumePoint() {
        RocketMessage message = RocketMessage.builder().name("设置消费点位" + LocalDateTime.now()).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.CONSUME_POINT_TOPIC, message);
        return sendResult;
    }

8. 消费者手动应答:

   /**
     * @param mgs 0:会抛空指针,重试三次。
     * @return
     */
    @GetMapping("/sendMqByManualConfirm")
    @ResponseBody
    public Object sendMqByManualConfirm(Integer mgs) {
        RocketMessage message = RocketMessage.builder().name(mgs == null ? "消费者手动应答" + LocalDateTime.now() : String.valueOf(mgs)).build();
        // syncSend
        SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.MANUAL_CONFIRM_TOPIC, message);
        return sendResult;
    }

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐