RocketMQ基础学习
·
前言:
RocketMQ阿里开源的,一款分布式的消息中间件,它经过阿里的生产环境的高并发、高吞吐的考验,同时,还支持分布式事务等场景。RocketMQ使用Java语言进行开发,方便Java开发者学习源码。但是,RocketMQ设计相对复杂,官方文档不是很完善,不太适合中小公司引用。技多不压身,作为一个好的Coder,应该多学习一下优秀的框架。本篇主要介绍一下,RocketMQ的基础用法:
正文:
这里我主要通过代码来介绍一下RocketMQ的使用,首先介绍一下RocketMQ的原生写法,然后介绍基于Springboot体系下,RocketMQ生产者的消息发送、消费者的消息接受等写法。
一、Java原生的消息发送与接收写法:
1. 生产者:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/30
* @since 1.0.0
*/
@Slf4j
public class Producer {
public static void main(String[] args) throws Exception {
// 实例化消息生产者
DefaultMQProducer producer = new DefaultMQProducer(RocketConstant.ORIGINAL_PRODUCER_GROUP);
// 设置NameServer的地址
producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
// 启动Producer实例
producer.start();
// 创建消息,设置 Topic、Tag、keys、flag、消息体等
Message message = new Message(RocketConstant.ORIGINAL_TOPIC, ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息到一个Broker
SendResult sendResult = producer.send(message);
// 通过sendResult返回消息是否成功送达
log.info("消息发送成功:{}", sendResult);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}
2. 消费者:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/30
* @since 1.0.0
*/
@Slf4j
public class Consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketConstant.ORIGINAL_CONSUMER_GROUP);
// 设置NameServer的地址
consumer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
// 订阅一个或多个Topic,用Tag来过滤需要消费的消息,这里指定*表示接收所有Tag的消息
consumer.subscribe(RocketConstant.ORIGINAL_TOPIC, "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts, ConsumeConcurrentlyContext context) {
log.info("{}收到消息:{}", this.getClass().getSimpleName(), messageExts);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
log.info("消费者启动成功!");
}
}
二、基于Springboot体系的消息发送写法:
1.普通消息发送:
具体有三种形式,主要包括:同步、异步、单向。
/**
* 发送普通同步消息
*
* @return
*/
@GetMapping("/sendMqBySync")
@ResponseBody
public Object sendMqBySync() {
RocketMessage message = RocketMessage.builder().name("普通同步消息" + LocalDateTime.now()).build();
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
return sendResult;
}
/**
* 发送异步消息
*
* @return
*/
@GetMapping("/sendMqByAsync")
@ResponseBody
public Object sendMqByAsync() {
RocketMessage message = RocketMessage.builder().name("异步消息" + LocalDateTime.now()).build();
// asyncSend
rocketMQTemplate.asyncSend(RocketConstant.ASYNC_TOPIC, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
// 处理相应的业务
log.info("发送成功:{}", JSON.toJSONString(sendResult));
}
@Override
public void onException(Throwable throwable) {
// 处理相应的业务
log.info("发送异常:{}", throwable);
}
});
return null;
}
/**
* 发送单向消息
* <p>
* 这种方式主要用在不特别关心发送结果的场景,例如日志发送
*
* @return
*/
@GetMapping("/sendMqByOneWay")
@ResponseBody
public Object sendMqByOneWay() {
RocketMessage message = RocketMessage.builder().name("单向消息" + LocalDateTime.now()).build();
// sendOneWay
rocketMQTemplate.sendOneWay(RocketConstant.ONE_WAY_TOPIC, message);
return null;
}
2.顺序消息发送:
具体有两种形式,主要包括:普通顺序、严格顺序。
/**
* 发送普通顺序消息
*
* @return
*/
@GetMapping("/sendMqByOrder")
@ResponseBody
public Object sendMqByOrder() {
List<SendResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RocketMessage message = RocketMessage.builder().name("普通顺序消息" + LocalDateTime.now() + i).build();
// syncSendOrderly
SendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.COMMON_TOPIC, message, "hashkey");
results.add(sendResult);
}
return results;
}
/**
* 发送严格顺序消息
* <p>
* 概念:
* 顺序消息是一种对消息发送和消费顺序有严格要求的消息
* <p>
* 生产顺序性:
* RocketMQ通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:
* 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
* 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
*
* @return
*/
@GetMapping("/sendMqByStrictOrder")
@ResponseBody
public Object sendMqByStrictOrder() {
List<SendResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RocketMessage message = RocketMessage.builder().name("严格顺序消息" + LocalDateTime.now() + i).build();
// syncSendOrderly
SendResult sendResult = rocketMQTemplate.syncSendOrderly(RocketConstant.STRICT_ORDER_TOPIC, message, "hashkey");
results.add(sendResult);
}
return results;
}
3. 延迟消息发送:
/**
* 发送延时消息
* <p>
* 延时消息的使用限制
* private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
* 现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18,
* 消息消费失败会进入延时消息队列
*
* @return
*/
@GetMapping("/sendMqByDelay")
@ResponseBody
public Object sendMqByDelay() {
RocketMessage message = RocketMessage.builder().name("延时消息" + LocalDateTime.now()).build();
// syncSend(... int delayLevel)
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.DELAY_TOPIC, MessageBuilder.withPayload(message).build(), 2000, 4);
return sendResult;
}
4. 批量消息发送:
/**
* 批量发送消息
* <p>
* 在对吞吐率有一定要求的情况下,RocketMQ可以将一些消息聚成一批以后进行发送,
* 可以增加吞吐率,并减少API和网络调用次数
*
* @return
*/
@GetMapping("/sendMqByBatch")
@ResponseBody
public Object sendMqByBatch() {
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
RocketMessage message = RocketMessage.builder().name("批量消息" + LocalDateTime.now() + i).build();
messageList.add(MessageBuilder.withPayload(message).build());
}
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, messageList);
return sendResult;
}
5. 事务消息发送:
/**
* 发送事务消息(半消息)
* <p>
* 仅仅只是保证本地事务和MQ消息发送形成整体的原子性,而投递到MQ服务器后,
* 并无法保证消费者一定能消费成功
* <p>
*
* @return
*/
@GetMapping("/sendMqByTx")
@ResponseBody
public Object sendMqByTx(Integer type, Integer msgKey) {
String transactionId = UUID.randomUUID().toString();
No6LocalTransactionOriginalSyntaxListener transactionListener = new No6LocalTransactionOriginalSyntaxListener();
TransactionMQProducer producer = new TransactionMQProducer(RocketConstant.TX_PRODUCER_GROUP);
try {
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr(RocketConstant.NAME_SERVER_ADDR);
producer.start();
log.info("transactionId is {}", transactionId);
org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(RocketConstant.TX_TOPIC,
("事务消息" + LocalDateTime.now()).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.getProperties().put(RocketMQHeaders.TRANSACTION_ID, transactionId);
msg.getProperties().put("type", String.valueOf(type));
msg.getProperties().put("msgKey", String.valueOf(msgKey));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
return sendResult;
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
return null;
}
6. 带Tag消息发送:
/**
* 发送带Tag消息
* <p>
* Tag(标签)可以看作子主题,它是消息的第二级类型
* 通过 RocketMQTemplate发送带Tag的消息,只需要将topic和tag中间通过【:】冒号连接即可
*
* @return
*/
@GetMapping("/sendMqWithTag")
@ResponseBody
public Object sendMqWithTag() {
RocketMessage message = RocketMessage.builder().name("tag消息" + LocalDateTime.now()).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);
return sendResult;
}
三、基于Springboot体系的消息接收写法:
1. MessageModel(消息模型):
消息模型有两种,主要包括:集群消费与广播消费
/**
* 发送集群或广播消息
* <p>
* 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
*
* @param MessageModel 0:CLUSTERING 1:BROADCASTING
* @return
*/
@GetMapping("/sendMqByMessageModel")
@ResponseBody
public Object sendMqByMessageModel(@RequestParam Integer MessageModel) {
List<SendResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
if (MessageModel == 0) {
RocketMessage message = RocketMessage.builder().name("消息模型-集群消费" + LocalDateTime.now() + i).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
results.add(sendResult);
} else {
RocketMessage message = RocketMessage.builder().name("消息模型-广播消费" + LocalDateTime.now() + i).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.BROADCAST_TOPIC, message);
results.add(sendResult);
}
}
return results;
}
2. ConsumeMode(消费模型):
消费模型有两种,主要包括:并发消费与顺序消费
/**
* 并发消费与顺序消费
*
* @param consumeMode 0:CONCURRENTLY 1:ORDERLY
* @return
*/
@GetMapping("/sendMqByConsumeMode")
@ResponseBody
public Object sendMqByConsumeMode(Integer consumeMode) {
List<SendResult> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
if (consumeMode == 0) {
RocketMessage message = RocketMessage.builder().name("消费模型-并发消费" + LocalDateTime.now() + i).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.COMMON_TOPIC, message);
results.add(sendResult);
} else {
RocketMessage message = RocketMessage.builder().name("消费模型-顺序消费" + LocalDateTime.now() + i).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.STRICT_ORDER_TOPIC, message);
results.add(sendResult);
}
}
return results;
}
3. 消息过滤:
消息过滤有两种方式,主要包括:Tag过滤与SQL92过滤
/**
* Tag过滤与SQL92过滤
* <p>
* Tag过滤:
* 消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。
* SQL92过滤:
* 发送者设置Tag或自定义消息属性,消费者订阅满足SQL92过滤表达式的消息被投递给消费端进行消费。
* 开启对SQL语法的支持(broker.conf):
* enablePropertyFilter = true
*
* @param filterMode 0:Tag过滤、1:SQL92过滤Tag、2:SQL92过滤自定义消息属性
* @return
*/
@GetMapping("/sendMqByFilterMode")
@ResponseBody
public Object sendMqByFilterMode(Integer filterMode) {
if (filterMode == 0) {
RocketMessage message = RocketMessage.builder().name("消费过滤-Tag过滤" + LocalDateTime.now()).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.TAG_TOPIC + ":" + RocketConstant.TAG_EXPRESSION, message);
return sendResult;
} else if (filterMode == 1) {
RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤Tag" + LocalDateTime.now()).build();
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_TOPIC + ":" + RocketConstant.SQL92_TAG_EXPRESSION, message);
return sendResult;
} else {
// Message msg = new Message("topic", "tagA", "Hello MQ".getBytes())
// 设置自定义属性A,属性值为1。-> msg.putUserProperties("a", "1")
// RocketMQTemplate 目前好像不支持这种写法
RocketMessage message = RocketMessage.builder().name("消费过滤-SQL92过滤自定义消息属性" + LocalDateTime.now()).build();
Map<String, Object> map = new HashMap<>();
map.put("a", 1);
MessageHeaders messageHeaders = new MessageHeaders(map);
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.SQL92_PROPERTIES_TOPIC, MessageBuilder.createMessage(message, messageHeaders));
return sendResult;
}
}
4. 消息重试与死信队列:
/**
* 消息重试与死信队列
* <p>
* 1. 消息重试只针对集群消费模式生效;广播消费模式不提供失败重试特性,即消费失败后,
* 失败消息不再重试,继续消费新的消息。
* 2. 一条消息初次消费失败后,会自动进行消息重试,达到最大重试次数后,将其发送到该消费者对应的死信队列,
* 这类消息称为死信消息(Dead-Letter Message)。死信队列是死信Topic下,分区数唯一的单独队列。
* 3. 如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,
* 死信队列的消息将不会再被消费。
* 4. 可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。
*
* @return
*/
@GetMapping("/sendMqByRetry")
@ResponseBody
public Object sendMqByRetry() {
RocketMessage message = RocketMessage.builder().name("消息重试" + LocalDateTime.now()).build();
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.RETRY_TOPIC, message);
return sendResult;
}
5. 消息应答:
/**
* 消息应答
*
* @return
*/
@GetMapping("/sendByReply")
@ResponseBody
public Object sendByReply() {
RocketMessage message = RocketMessage.builder().name("消息应答" + LocalDateTime.now()).build();
RocketMessage receiveMessage = rocketMQTemplate.sendAndReceive(RocketConstant.REPLY_TOPIC, message, RocketMessage.class);
return receiveMessage;
}
6. Pull消费:
Pull消费包括两种方式,主要包括:原始Pull Consumer与Lite Pull Consumer)
/**
* 原始Pull Consumer的消息发送
*
* @return
*/
@GetMapping("/sendByOriginalPull")
@ResponseBody
public Object sendByOriginalPull() {
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
RocketMessage message = RocketMessage.builder().name("原始pull消息" + LocalDateTime.now() + i).build();
messageList.add(MessageBuilder.withPayload(message).build());
}
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.PULL_ORIGINAL_TOPIC, messageList);
return sendResult;
}
@GetMapping("/pullByOriginal")
@ResponseBody
public void pullByOriginal() {
pullMgsOriginal.pull(0, 2);
}
/**
* 使用rocketMQTemplate拉取消息
*
* @return
*/
@GetMapping("/sendByTemplatePull")
@ResponseBody
public Object sendByTemplatePull() {
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
RocketMessage message = RocketMessage.builder().name("template pull消息" + LocalDateTime.now() + i).build();
messageList.add(MessageBuilder.withPayload(message).build());
}
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_TEMPLATE_TOPIC, messageList);
return sendResult;
}
/**
* LitePullSubscribe
*
* @return
*/
@GetMapping("/sendBySubscribePull")
@ResponseBody
public Object sendBySubscribePull() {
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
RocketMessage message = RocketMessage.builder().name("subscribe pull消息" + LocalDateTime.now() + i).build();
messageList.add(MessageBuilder.withPayload(message).build());
}
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_SUBSCRIBE_TOPIC, messageList);
return sendResult;
}
@GetMapping("/pullBySubscribe")
@ResponseBody
public void pullBySubscribe() {
litePullSubscribeMsg.pull(20);
}
/**
* LitePullAssign
*
* @return
*/
@GetMapping("/sendByAssignPull")
@ResponseBody
public Object sendByAssignPull() {
List<Message> messageList = new ArrayList<>();
for (int i = 0; i < 100; i++) {
RocketMessage message = RocketMessage.builder().name("assign pull消息" + LocalDateTime.now() + i).build();
messageList.add(MessageBuilder.withPayload(message).build());
}
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.LITE_PULL_ASSIGN_TOPIC, messageList);
return sendResult;
}
@GetMapping("/pullByAssign")
@ResponseBody
public void pullByAssign() {
litePullAssignMsg.pull();
}
7. 设置消费点位:
/**
* 消费点类型设置成:
* ConsumeFromWhere.CONSUME_FROM_TIMESTAMP
*
* @return
*/
@GetMapping("/sendMqByConsumePoint")
@ResponseBody
public Object sendMqByConsumePoint() {
RocketMessage message = RocketMessage.builder().name("设置消费点位" + LocalDateTime.now()).build();
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.CONSUME_POINT_TOPIC, message);
return sendResult;
}
8. 消费者手动应答:
/**
* @param mgs 0:会抛空指针,重试三次。
* @return
*/
@GetMapping("/sendMqByManualConfirm")
@ResponseBody
public Object sendMqByManualConfirm(Integer mgs) {
RocketMessage message = RocketMessage.builder().name(mgs == null ? "消费者手动应答" + LocalDateTime.now() : String.valueOf(mgs)).build();
// syncSend
SendResult sendResult = rocketMQTemplate.syncSend(RocketConstant.MANUAL_CONFIRM_TOPIC, message);
return sendResult;
}
更多推荐
已为社区贡献2条内容
所有评论(0)