Rocketmq实例以及详解
一.为什么用Mq
服务之间通信的中间件。 可以让应用之间解耦,相互之间依赖减小,形成异步调用。还可以用来流量削峰。数据分发。
但是会有消息一致性问题,系统复杂性增加,如果Mq宕机,系统可用性会降低。
二.特点
灵活可扩展,支持海量消息单机10万级别,使用文件做持久化, 并支持分布式事务(虽然可能造成较多的写脏), 异步刷盘,内存预分配, 高可用采用了同步双写及异步复制的方式
三.介绍
RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。
其主要功能有1.灵活可扩展性、2.海量消息堆积能力、3.支持顺序消息、4.多种消息过滤方式、5.支持事务消息、6.回溯消费等常用功能。
RocketMQ 核心的四大组件:
Name Server(消息的总控制)、是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。 在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。
NameServer即名称服务,两个功能:
接收broker的请求,注册broker的路由信息
接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息
NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册; Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定 时获取topic路由信息。
Broker(分发消息),消息中转角色,负责存储消息,转发消息,可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。
Producer(生产者)、
Consumer(消费者) ,
每个组件都可以部署成集群模式进行水平扩展。
消息由topic区分消息类型(一级分类):如订单消息,物流消息等
tag为二级分类
message queue为消息类型下的消息队列。
用于并行发送和接受消息。
模式:broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker
Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。
每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。Master Broker 和其对应的 Slave Broker会进行数据同步
单master模式 可靠性低风险大,宕机后服务将不可用,线上线上慎用
多master模式 配置简单,单master宕机或重启,其他master还可以继续提供服务
多master多slaver(异步复制)(主从模式)即使磁盘损坏,因为还有从服务,只会丢失异步复制瞬间差的非常少量数据。性能和多master差不多
多master多slaver(同步复制)(主从模式) 服务可用性和数据的可靠性都非常高,消息无延迟,丢失概率低。但是性能相对咯低。
四.集群工作流程
1.启动NameServer, NameServer起来后监听端口,等待Broker. Producer. Consumer连 上来,相当于-个路由控制中心。
2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
3.收发消息前,先创建Topic, 创建Topic时需要指定该Topic要存储在哪些Broker上, 也可以在发送消息时自动创建Topic.
4. Producer发送消息,启动时先跟NameServer集群中的其中- -台建立长连接, 并从NameServer中获取当前发送的Topic存 在哪些Broker上,轮询从队列列表中选择-一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
5. Consumer跟Producer类似,跟其中-台NameServer建立长连接, 获取当前订阅Topic存在哪些Broker上, 然后直接跟Broker建立连接通道,开始消费消息。
6.集群搭建
多主多从 (主节点接受消息,从节点消费消息)
一台机器可以有多个broker但只能配置一个nameserver
2台机器配置分别为主1从2 ,主2从1
7.消息发送分类:
基本消息: 同步消息(性能要求相对低,可靠性要求高),异步消息(性能要求高,可靠性要求低),单向发送消息
顺序消息 通过同一类型消息选择同一个队列保证消息的局部顺序性(先进先出)
延时消息 设置延迟时间发送消息 消费者会等待生产者设置延迟时间后才能消费
批量消息 把message放在list里面一起发送
事务消息 三种状态 提交状态 回滚状态 中间状态 未知状态回查 创建事务生产者对象 创建事务监听器 执行事务状态, 向mqserver发送消息后不会立马执行,而是生成一个消费者不可见的半消息,然后回调发送者的事务监听器执行事务状态处理
过滤消息 tag过滤 还有sql过滤
8.消费者广播模式和负载均衡模式 (默认为负载均衡模式)
广播模式:消息都能够被几个消费者消费
负载均衡模式:消息被几个消费者平均分配消费(不会重复消费)
9.先进先出 多队列模式
保证局部数据的有序性,需要把同样数据放在同一个队列,比如根据订单号
五.代码示例
导包
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
生产者
package org.userMgs5002;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class RocketMqTest {
/**
* 生成生产者对象producer
* @return DefaultMQProducer
*/
public static DefaultMQProducer getRocketMQProducer() {
DefaultMQProducer producer;
producer = new DefaultMQProducer("test-demo");
producer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
producer.setRetryTimesWhenSendFailed(10000);
try {
producer.start();
} catch (MQClientException e) {
System.out.println(e);
}
return producer;
}
/**
* 生成事务生产者对象producer
* @return TransactionMQProducer
*/
public static TransactionMQProducer getTransactionMQProducer(){
TransactionMQProducer producer = new TransactionMQProducer("test-demo");
producer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
producer.setRetryTimesWhenSendFailed(10000);
//事务监听器
producer.setTransactionListener(new TransactionListener(){
//执行本地事务
//msg :消息对象
//arg :人工参数
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
if("test6".equals(msg.getTags())){
return LocalTransactionState.COMMIT_MESSAGE;
}else if("test1".equals(msg.getTags())||"test2".equals(msg.getTags())||"test3".equals(msg.getTags())) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW; //未知状态调回查方法
}
//消息事务状态回查方法
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
return producer;
}
/*
* 发送同步消息
*/
public static void sendSysMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
DefaultMQProducer producer = getRocketMQProducer();
Message msg = new Message();
msg.setTopic("test-demo");
msg.setTags("test1");
//msg.putUserProperty("i", String.valueOf(5)); 设置人工属性 消费者可以用sql筛选
msg.setBody("ok".getBytes());
for (int i = 0; i < 10; i++) {
SendResult s = producer.send(msg ,100000);
System.out.println(s.getMsgId());
System.out.println(s.getMessageQueue().getQueueId());
System.out.println(s.getSendStatus());
}
producer.shutdown();
}
/*
* 发送异步消息
*/
public static void sendAysMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
DefaultMQProducer producer = getRocketMQProducer();
Message msg = new Message();
msg.setTopic("test-demo");
msg.setTags("test2");
msg.setBody("ok".getBytes());
for (int i = 0; i < 10; i++) {
Thread.sleep(10000);
producer.send(msg ,new SendCallback() {
@Override
public void onSuccess(SendResult arg0) {
System.out.println("发送结果"+arg0);
}
@Override
public void onException(Throwable arg0) {
System.out.println("发送异常"+arg0);
}
});
}
}
/*
* 发送单向消息
*/
public static void senddMsg() throws MQClientException, RemotingException, InterruptedException{
DefaultMQProducer producer = getRocketMQProducer();
Message msg = new Message();
msg.setTopic("test-demo");
msg.setTags("test3");
msg.setBody("ok".getBytes());
for (int i = 0; i < 10; i++) {
producer.sendOneway(msg);;
}
producer.shutdown();
}
/*
* 发送顺序消息
*/
public static void sendSxMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
DefaultMQProducer producer = getRocketMQProducer();
Message msg = new Message();
msg.setTopic("test-demo");
msg.setTags("test4");
for (int i = 10; i < 100; i++) {
msg.setBody(String.valueOf(i).getBytes());
Thread.sleep(2000);
//模式i取模相等为同一种消息放在同一个队列,形成局部消息有序性
SendResult s =producer.send(msg ,new MessageQueueSelector() {
/**
* mqs:队列集合
* msg:消息对象
* arg: 业务参数相当于i,new MessageQueueSelector()的第三个参数
* select方法:用来选择队列
*/
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int i =(int)arg;
int index = i%3; //根据消息的id取模相等的设为同一种消息然后放在同一个队列
return mqs.get(index);
}
} ,i);
System.out.println(s.getMsgId());
System.out.println(s.getMessageQueue().getQueueId());
System.out.println(s.getSendStatus());
}
producer.shutdown();
}
/*
* 发送延迟消息
*/
public static void sendYcMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
DefaultMQProducer producer = getRocketMQProducer();
Message msg = new Message();
msg.setTopic("test-demo");
msg.setTags("test5");
msg.setBody("ok".getBytes());
msg.setDelayTimeLevel(5);
for (int i = 0; i < 10; i++) {
SendResult s = producer.send(msg ,100000);
System.out.println(s.getMsgId());
System.out.println(s.getMessageQueue().getQueueId());
System.out.println(s.getSendStatus());
}
producer.shutdown();
}
/**
* 发送批量消息
* @param args
*/
public static void sendPlMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
DefaultMQProducer producer = getRocketMQProducer();
List<Message> list = new ArrayList<Message>();
Message msg0 = new Message();
msg0.setTopic("test-demo");
msg0.setTags("test5");
msg0.setBody("ok".getBytes());
Message msg1 = new Message();
msg1.setTopic("test-demo");
msg1.setTags("test5");
msg1.setBody("ok".getBytes());
Message msg2 = new Message();
msg2.setTopic("test-demo");
msg2.setTags("test5");
msg2.setBody("ok".getBytes());
list.add(msg0);
list.add(msg1);
list.add(msg2);
SendResult s = producer.send(list ,100000);
System.out.println(s.getMsgId());
System.out.println(s.getMessageQueue().getQueueId());
System.out.println(s.getSendStatus());
producer.shutdown();
}
/**
* 事务消息
* @param args
* @throws InterruptedException
* @throws MQBrokerException
* @throws RemotingException
* @throws MQClientException
*/
public static void sendSwMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
TransactionMQProducer producer = getTransactionMQProducer();
Message msg = new Message();
msg.setTopic("test-demo");
msg.setTags("test6");
//msg.putUserProperty("i", String.valueOf(5)); 设置人工属性 消费者可以用sql筛选
msg.setBody("ok".getBytes());
for (int i = 0; i < 3; i++) {
SendResult s = producer.sendMessageInTransaction(msg ,null);
System.out.println(s.getMsgId());
System.out.println(s.getMessageQueue().getQueueId());
System.out.println(s.getSendStatus());
}
producer.shutdown();
}
public static void main(String[] args) {
try {
//sendSysMes();
//sendAysMes();
//senddMsg();
sendSxMes();
//sendPlMes();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
package org.userMgs5002;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
public class RocketMqConumer {
/*
* 消费者监听器
*/
public static void consumeMessage() throws MQClientException{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-demo");
consumer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
//consumer.subscribe("test-demo", "test1 || test2");
consumer.subscribe("test-demo", "*"); //订阅主题topic以及tag tag为*代表全部
//consumer.setMessageModel(MessageModel.BROADCASTING); //设置消费者模式 广播或者负载均衡模式 默认为负载均衡模式
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt message : msgs) {
String msgbody = new String(message.getBody(), "utf-8");
System.out.println("消息体内容为 "+msgbody+" "+"详细信息: " + msgs);
if (msgbody.equals("HelloWorld - RocketMQ")) {
System.out.println("======错误=======");
}
}
} catch (Exception e) {
e.printStackTrace();
if (msgs.get(0).getReconsumeTimes() == 3) {
// 该条消息可以存储到DB或者LOG日志中,或其他处理方式
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
} else {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
/*
* 顺序消费者监听器
*/
public static void consumeMessagesxx() throws MQClientException{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-demo");
consumer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
consumer.subscribe("test-demo", "test4"); //订阅主题topic以及tag tag为*代表全部
//consumer.setMessageModel(MessageModel.BROADCASTING); //设置消费者模式 广播或者负载均衡模式 默认为负载均衡模式
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
try{
for (MessageExt message : msgs) {
String msgbody = new String(message.getBody(), "utf-8");
System.out.println("消息体内容为 "+msgbody+" "+"详细信息: " + msgs);
if (msgbody.equals("HelloWorld - RocketMQ")) {
System.out.println("======错误=======");
}
}
}catch(Exception e){
e.printStackTrace();
if (msgs.get(0).getReconsumeTimes() == 3) {
// 该条消息可以存储到DB或者LOG日志中,或其他处理方式
return ConsumeOrderlyStatus.SUCCESS;// 成功
} else {
return ConsumeOrderlyStatus.ROLLBACK;// 重试
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
public static void main(String[] args) throws MQClientException {
//consumeMessage();
consumeMessagesxx();
}
}
更多推荐
所有评论(0)