springboot使用rocketmq-spring-boot-starter整合RocketMQ
前言
前面写过一片文章使用rocketmq-client整合RocketMQ的,这篇文章也不讲这些理论,理论还是前往RocketMQ消息类型或者其他往期文章,本文就如标题,纯粹的操一下rocketmq-spring-boot-starter这个玩意!
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
这里就不能单纯使用rocketmq-client了,有很多API是rocketmq-spring-boot-starter提供的,虽然底层还是调用的rocketmq-client,下文会介绍!
通用消息体
@Setter
@Getter
@Accessors(chain = true)
@AllArgsConstructor
@NoArgsConstructor
public class MsgTest {
private int id;
private String context;
private Date date;
}
普通消息
同步消息
同步消息也就这些API,简单讲解一下!
//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时,这个下文会演示
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
/**
* 同步消息-
*/
@Test
void syncSendStr() {
//syncSend和send是等价的
rocketMQTemplate.syncSend("first-topic-str", "hello world test1");
//send底层还是会调用syncSend的代码
rocketMQTemplate.send("first-topic-str", MessageBuilder.withPayload("hello world test1").build());
SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", "hello world test2");
log.info("syncSend===>{}",res);
}
/**
* 同步消息-
*/
@Test
void syncSendPojo() {
MsgTest msg = new MsgTest(1,"hello world test3",new Date());
SendResult res = rocketMQTemplate.syncSend("first-topic-pojo", MessageBuilder.withPayload(msg).build());
log.info("syncSend===>{}",res);
}
这里存在两种消息体,一种是Object的,另一种是Message<?>的形式的,其实我们发送Object的时候,底层是有帮我们做转换的,其实和我们在上层调用
MessageBuilder.withPayload("hello world test1").build()
是一样的!源码如下
异步消息
//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时,这个下文会演示
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout,
int delayLevel)
/**
* 异步消息-String
* 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
* 关键实现异步发送回调接口(SendCallback)
* 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
* 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
*/
@Test
void asyncSendStr() {
rocketMQTemplate.asyncSend("first-topic-str:tag1", "hello world test2 asyncSendStr", new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功:{}",sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("异步消息发送失败:{}",throwable.getMessage());
}
});
}
单向消息
这里普通单向消息就只有两个操作空间,这个不用多说了,一个是Object,另一个是Message
/**
* 单向消息
* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
* 此方式发送消息的过程耗时非常短,一般在微秒级别
* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
*/
@Test
void sendOneWayStr() {
rocketMQTemplate.sendOneWay("first-topic-str:tag1", "hello world test2 sendOneWayStr");
log.info("单向消息已发送");
}
批量消息
/**
* 批量消息
*/
@Test
void asyncSendBatch() {
Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
log.info("批量消息");
}
延迟消息
同步延迟消息
/**
* 同步延迟消息
* rocketMQ的延迟消息发送其实是已发送就已经到broker端了,然后消费端会延迟收到消息。
* RocketMQ 目前只支持固定精度的定时消息。
* 固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 延迟的底层方法是用定时任务实现的。
*/
@Test
void syncSendDelayedStr() {
Message<String> message= MessageBuilder.withPayload("syncSendDelayedStr"+new Date()).build();
/**
* @param destination formats: `topicName:tags`
* @param message 消息体
* @param timeout 发送超时时间
* @param delayLevel 延迟级别 1到18
* @return {@link SendResult}
*/
SendResult res=rocketMQTemplate.syncSend("first-topic-str:tag1", message, 3000, 3);
log.info("res==>{}",res);
}
异步延迟消息
/**
* 异步延迟消息
*/
@Test
void asyncSendDelayedStr() {
//Callback
SendCallback sc=new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送异步延时消息成功");
}
@Override
public void onException(Throwable throwable) {
log.info("发送异步延时消息失败:{}",throwable.getMessage());
}
};
Message<String> message= MessageBuilder.withPayload("asyncSendDelayedStr").build();
rocketMQTemplate.asyncSend("first-topic-str:tag1", message, sc, 3000, 3);
}
顺序消息
理论铺垫请看RocketMQ顺序消息,这里使用rocketmq-spring-boot-starter发送顺序消息就比较方便了,不像使用rocket-client那样,需要手动获取RocketMQ中当前topic的队列个数然后再通过hashKey值,mqs.size()取模,得到一个索引值,这里底层都帮我们做好了处理!
/**
* 顺序消息
*/
@Test
void SendOrderStr() {
List<MsgTest> msgList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
msgList.add(new MsgTest(100, "我是id为100的第"+(i+1)+"条消息", new Date()));
}
//msgList.add(new MsgTest(1, "我是id为1的第1条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第1条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第2条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第3条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第2条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第3条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第4条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第5条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第6条消息", new Date()));
//msgList.add(new MsgTest(2, "我是id为2的第7条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第4条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第5条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第6条消息", new Date()));
//msgList.add(new MsgTest(1, "我是id为1的第7条消息", new Date()));
msgList.forEach(t ->{
//rocketMQTemplate.sendOneWayOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()));
//rocketMQTemplate.syncSendOrderly("first-topic-str:tag1", t, String.valueOf(t.getId()));
rocketMQTemplate.asyncSendOrderly("first-topic-str:tag1", t,String.valueOf(t.getId()), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功:{}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.info("异步消息发送失败:{}", throwable.getMessage());
}
});
});
}
使用for循环100条数据,或者使用注释掉的代码其实都是一样的,说明一下使用for循环100是确定id一致的时候,通过hashKey会被分配到同一个队列中,如下
上面代码共测试了三总类型,同步,异步,单向,但是异步,单向好像顺序还是有问题,但是查看了数据,发现数据确实是在分派到一个队列,
至于原因,这个放在RocketMQ顺序消息这篇文章中统一讲!
事务消息
消费者
/**
* 事务消息 注意这里还有一个监听器 TransactionListenerImpl
*/
@Test
void sendTransactionStr() {
String[] tags = {"TAGA", "TAGB", "TAGC"};
for (int i = 0; i < 3; i++) {
Message<String> message=MessageBuilder.withPayload("事务消息===>"+i).build();
TransactionSendResult res=rocketMQTemplate.sendMessageInTransaction("transaction-str:"+tags[i], message, i+1);
if(res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE)&&res.getSendStatus().equals(SendStatus.SEND_OK)){
log.info("事物消息发送成功");
}
log.info("事物消息发送结果:{}",res);
}
}
事务消息生产者端的消息监听器
@Slf4j
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
String tag = String.valueOf(msg.getHeaders().get("rocketmq_TAGS"));
if (StringUtils.equals("TAGA", tag)){
//这里只讲TAGA消息提交,状态为可执行
return RocketMQLocalTransactionState.COMMIT;
}else if (StringUtils.equals("TAGB", tag)) {
return RocketMQLocalTransactionState.ROLLBACK;
} else if (StringUtils.equals("TAGC",tag)) {
return RocketMQLocalTransactionState.UNKNOWN;
}
return RocketMQLocalTransactionState.UNKNOWN;
}
//mq回调检查本地事务执行情况
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
log.info("checkLocalTransaction===>{}",msg);
return RocketMQLocalTransactionState.COMMIT;
}
}
消费者
/**
* @description: 事务消息消费者
* @author TAO
* @date 2021/12/28 12:33 上午
*/
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction-str",consumeMode = ConsumeMode.ORDERLY)
public class TransactionConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String str) {
log.info("===>"+str);
}
}
注意
我这里使用的rocketmq-spring-boot-starter版本时2.1.0,和老版本的写法是不同的,关于这部分可以查看rocketmq-spring-boot-starter 2.1.0 事务消息 txProducerGroup 移除解读
注意
上述API中带了超时时间的是总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
更多推荐
所有评论(0)