前言

前面写过一片文章使用rocketmq-client整合RocketMQ的,这篇文章也不讲这些理论,理论还是前往RocketMQ消息类型或者其他往期文章,本文就如标题,纯粹的操一下rocketmq-spring-boot-starter这个玩意!

依赖

		<dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

这里就不能单纯使用rocketmq-client了,有很多API是rocketmq-spring-boot-starter提供的,虽然底层还是调用的rocketmq-client,下文会介绍!

通用消息体

@Setter
@Getter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class MsgTest {

    private int id;

    private String context;

    private Date date;

}

普通消息

同步消息
在这里插入图片描述
同步消息也就这些API,简单讲解一下!

//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时,这个下文会演示
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
	/**
     * 同步消息-
     */
	@Test
    void syncSendStr() {
    	//syncSend和send是等价的
        rocketMQTemplate.syncSend("first-topic-str", "hello world test1");
        //send底层还是会调用syncSend的代码
        rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build());
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2");
        log.info("syncSend===>{}",res);
    }


	/**
     * 同步消息-
     */
    @Test
    void syncSendPojo() {
        MsgTest msg = new MsgTest(1,"hello world test3",new Date());
        SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build());
        log.info("syncSend===>{}",res);
    }

这里存在两种消息体,一种是Object的,另一种是Message<?>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用

MessageBuilder.withPayload("hello world test1").build()

是一样的!源码如下
在这里插入图片描述

异步消息
在这里插入图片描述

//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
        int delayLevel) 
	/**
     * 异步消息-String
     * 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
     * 关键实现异步发送回调接口(SendCallback)
     * 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
     * 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
     */
     @Test
    void asyncSendStr() {
        rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步消息发送成功:{}",sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                log.info("异步消息发送失败:{}",throwable.getMessage());
            }
        });
    }

单向消息
在这里插入图片描述
这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message

	/**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     */
    @Test
    void sendOneWayStr() {
        rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
        log.info("单向消息已发送");
    }

批量消息

	/**
     * 批量消息
     */
    @Test
    void asyncSendBatch() {
        Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
        List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
        SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
        log.info("批量消息");
    }

延迟消息

同步延迟消息

	/**
     * 同步延迟消息
     * rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
     * RocketMQ 目前只支持固定精度的定时消息。
     * 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 延迟的底层方法是用定时任务实现的。
     */
    @Test
    void syncSendDelayedStr() {
        Message<String> message= MessageBuilder.withPayload("syncSendDelayedStr"+new Date()).build();
        /**
         * @param destination formats: `topicName:tags`
         * @param message 消息体
         * @param timeout 发送超时时间
         * @param delayLevel 延迟级别  1到18
         * @return {@link SendResult}
         */
        SendResult res=rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3);
        log.info("res==>{}",res);
    }

异步延迟消息

	 /**
     * 异步延迟消息
     */
    @Test
    void asyncSendDelayedStr() {
        //Callback
        SendCallback sc=new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("发送异步延时消息成功");
            }
            @Override
            public void onException(Throwable throwable) {
                log.info("发送异步延时消息失败:{}",throwable.getMessage());
            }
        };

        Message<String> message= MessageBuilder.withPayload("asyncSendDelayedStr").build();
        rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3);
    }

顺序消息

理论铺垫请看RocketMQ顺序消息,这里使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashKey值,mqs.size()取模,得到一个索引值,这里底层都帮我们做好了处理!

	/**
     * 顺序消息
     */
    @Test
    void SendOrderStr() {
        List<MsgTest> msgList = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            msgList.add(new MsgTest(100, "我是id为100的第"+(i+1)+"条消息", new Date()));
        }
        //msgList.add(new MsgTest(1, "我是id为1的第1条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第1条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第2条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第3条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第2条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第3条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第4条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第5条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第6条消息", new Date()));
        //msgList.add(new MsgTest(2, "我是id为2的第7条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第4条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第5条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第6条消息", new Date()));
        //msgList.add(new MsgTest(1, "我是id为1的第7条消息", new Date()));
        msgList.forEach(t ->{
            //rocketMQTemplate.sendOneWayOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()));
            //rocketMQTemplate.syncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId()));
            rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()), new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("异步消息发送成功:{}", sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    log.info("异步消息发送失败:{}", throwable.getMessage());
                }
            });

        });
    }

使用for循环100条数据,或者使用注释掉的代码其实都是一样的,说明一下使用for循环100是确定id一致的时候,通过hashKey会被分配到同一个队列中,如下
在这里插入图片描述
在这里插入图片描述
上面代码共测试了三总类型,同步,异步,单向,但是异步,单向好像顺序还是有问题,但是查看了数据,发现数据确实是在分派到一个队列,

在这里插入图片描述
至于原因,这个放在RocketMQ顺序消息这篇文章中统一讲!

事务消息

消费者

	/**
     * 事务消息  注意这里还有一个监听器 TransactionListenerImpl
     */
    @Test
    void sendTransactionStr() {

        String[] tags = {"TAGA", "TAGB", "TAGC"};
        for (int i = 0; i < 3; i++) {
            Message<String> message=MessageBuilder.withPayload("事务消息===>"+i).build();
            TransactionSendResult res=rocketMQTemplate.sendMessageInTransaction("transaction-str:"+tags[i], message, i+1);
            if(res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)&&res.getSendStatus().equals(SendStatus.SEND_OK)){
                log.info("事物消息发送成功");
            }

            log.info("事物消息发送结果:{}",res);
        }
}

事务消息生产者端的消息监听器

@Slf4j
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        String tag = String.valueOf(msg.getHeaders().get("rocketmq_TAGS"));
        if (StringUtils.equals("TAGA", tag)){
            //这里只讲TAGA消息提交,状态为可执行
            return RocketMQLocalTransactionState.COMMIT;
        }else if (StringUtils.equals("TAGB", tag)) {
            return RocketMQLocalTransactionState.ROLLBACK;
        } else if (StringUtils.equals("TAGC",tag)) {
            return RocketMQLocalTransactionState.UNKNOWN;
        }
        return RocketMQLocalTransactionState.UNKNOWN;
    }

	//mq回调检查本地事务执行情况
  	@Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info("checkLocalTransaction===>{}",msg);
        return RocketMQLocalTransactionState.COMMIT;
    }
}

消费者

/**
 * @description: 事务消息消费者
 * @author TAO
 * @date 2021/12/28 12:33 上午
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-str",consumeMode = ConsumeMode.ORDERLY)
public class TransactionConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String str) {
        log.info("===>"+str);
    }

}

注意
我这里使用的rocketmq-spring-boot-starter版本时2.1.0,和老版本的写法是不同的,关于这部分可以查看rocketmq-spring-boot-starter 2.1.0 事务消息 txProducerGroup 移除解读

注意

上述API中带了超时时间的是总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)

GitHub 加速计划 / sp / spring-boot
73.97 K
40.4 K
下载
spring-projects/spring-boot: 是一个用于简化Spring应用开发的框架。适合用于需要快速开发企业级Java应用的项目。特点是可以提供自动配置、独立运行和内置的Tomcat服务器,简化Spring应用的构建和部署。
最近提交(Master分支:1 个月前 )
fdf24c6c Closes gh-42976 6 天前
3c42ba8c * pr/42974: Fix copyright year of updated file Polish Closes gh-42974 6 天前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐