一.为什么用Mq

   服务之间通信的中间件。 可以让应用之间解耦,相互之间依赖减小,形成异步调用。还可以用来流量削峰。数据分发。
   但是会有消息一致性问题,系统复杂性增加,如果Mq宕机,系统可用性会降低。

二.特点

  灵活可扩展,支持海量消息单机10万级别,使用文件做持久化, 并支持分布式事务(虽然可能造成较多的写脏), 异步刷盘,内存预分配, 高可用采用了同步双写及异步复制的方式

三.介绍


RocketMQ 是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并于2017年9月25日成为Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种“超级工程”的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

其主要功能有1.灵活可扩展性、2.海量消息堆积能力、3.支持顺序消息、4.多种消息过滤方式、5.支持事务消息、6.回溯消费等常用功能。

RocketMQ 核心的四大组件:
Name Server(消息的总控制)、是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。         在消息队列 RocketMQ 中提供命名服务,更新和发现 Broker 服务。
          NameServer即名称服务,两个功能:
    接收broker的请求,注册broker的路由信息
           接收client(producer/consumer)的请求,根据某个topic获取其到broker的路由信息
            NameServer没有状态,可以横向扩展。每个broker在启动的时候会到NameServer注册;          Producer在发送消息前会根据topic到NameServer获取路由(到broker)信息;Consumer也会定 时获取topic路由信息。
Broker(分发消息),消息中转角色,负责存储消息,转发消息,可以理解为消息队列服务器,提供了消息的接收、存储、拉取和转发服务。broker是RocketMQ的核心,它不不能挂的,所以需要保证broker的高可用。
Producer(生产者)
Consumer(消费者)
每个组件都可以部署成集群模式进行水平扩展。
消息由topic区分消息类型(一级分类):如订单消息,物流消息等
     tag为二级分类
message queue为消息类型下的消息队列。
用于并行发送和接受消息。


模式:broker分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker
    Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

    每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。Master Broker 和其对应的 Slave     Broker会进行数据同步
   单master模式  可靠性低风险大,宕机后服务将不可用,线上线上慎用

   多master模式 配置简单,单master宕机或重启,其他master还可以继续提供服务
   
   多master多slaver(异步复制)(主从模式)即使磁盘损坏,因为还有从服务,只会丢失异步复制瞬间差的非常少量数据。性能和多master差不多


   多master多slaver(同步复制)(主从模式) 服务可用性和数据的可靠性都非常高,消息无延迟,丢失概率低。但是性能相对咯低。


  四.集群工作流程


1.启动NameServer, NameServer起来后监听端口,等待Broker. Producer. Consumer连 上来,相当于-个路由控制中心。

2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

3.收发消息前,先创建Topic, 创建Topic时需要指定该Topic要存储在哪些Broker上, 也可以在发送消息时自动创建Topic.

4. Producer发送消息,启动时先跟NameServer集群中的其中- -台建立长连接, 并从NameServer中获取当前发送的Topic存 在哪些Broker上,轮询从队列列表中选择-一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

5. Consumer跟Producer类似,跟其中-台NameServer建立长连接, 获取当前订阅Topic存在哪些Broker上, 然后直接跟Broker建立连接通道,开始消费消息。 

6.集群搭建

    多主多从 (主节点接受消息,从节点消费消息)

     一台机器可以有多个broker但只能配置一个nameserver
      2台机器配置分别为主1从2 ,主2从1 

7.消息发送分类:
   基本消息: 同步消息(性能要求相对低,可靠性要求高),异步消息(性能要求高,可靠性要求低),单向发送消息
    顺序消息   通过同一类型消息选择同一个队列保证消息的局部顺序性(先进先出)
    延时消息    设置延迟时间发送消息  消费者会等待生产者设置延迟时间后才能消费
   批量消息     把message放在list里面一起发送
  事务消息     三种状态  提交状态 回滚状态 中间状态  未知状态回查   创建事务生产者对象 创建事务监听器 执行事务状态, 向mqserver发送消息后不会立马执行,而是生成一个消费者不可见的半消息,然后回调发送者的事务监听器执行事务状态处理
   过滤消息    tag过滤 还有sql过滤

8.消费者广播模式和负载均衡模式 (默认为负载均衡模式)

广播模式:消息都能够被几个消费者消费


负载均衡模式:消息被几个消费者平均分配消费(不会重复消费)


9.先进先出 多队列模式
    保证局部数据的有序性,需要把同样数据放在同一个队列,比如根据订单号

五.代码示例

导包

  	<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-common</artifactId>
			<version>4.3.0</version>
		</dependency>

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.4.0</version>
		</dependency>
       	

生产者

package org.userMgs5002;

import java.util.ArrayList;
import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketMqTest {
	
	/**
	 * 生成生产者对象producer	
	 * @return DefaultMQProducer
	 */
	 public static DefaultMQProducer getRocketMQProducer() {
	        DefaultMQProducer producer;
	        producer = new DefaultMQProducer("test-demo");
	        producer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
	        producer.setRetryTimesWhenSendFailed(10000);
	        try {
	            producer.start();
	        } catch (MQClientException e) {
	            System.out.println(e);
	        }
	        return producer;
	    }
	 
	 /**
	  * 生成事务生产者对象producer
	  * @return TransactionMQProducer
	  */
	 public static  TransactionMQProducer getTransactionMQProducer(){
		 TransactionMQProducer  producer = new TransactionMQProducer("test-demo");
	        producer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
	        producer.setRetryTimesWhenSendFailed(10000);
	        //事务监听器
	        producer.setTransactionListener(new TransactionListener(){
	        	//执行本地事务
	        	//msg :消息对象
	        	//arg :人工参数
				@Override
				public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
					 if("test6".equals(msg.getTags())){
						 return LocalTransactionState.COMMIT_MESSAGE;
						 
					 }else if("test1".equals(msg.getTags())||"test2".equals(msg.getTags())||"test3".equals(msg.getTags())) {
						 return LocalTransactionState.ROLLBACK_MESSAGE;
					 }
					return LocalTransactionState.UNKNOW; //未知状态调回查方法
				}
				//消息事务状态回查方法
				@Override
				public LocalTransactionState checkLocalTransaction(MessageExt msg) {
					return LocalTransactionState.ROLLBACK_MESSAGE;
				}
	        });
	        
	        return producer;
	 }
	 
	 
	 
	 /*
	  * 发送同步消息 
	  */
	 public static void  sendSysMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
		  DefaultMQProducer producer = getRocketMQProducer();
			  Message msg = new Message();
			  	msg.setTopic("test-demo");
			  	msg.setTags("test1");
			  //msg.putUserProperty("i", String.valueOf(5));  设置人工属性 消费者可以用sql筛选
			  	msg.setBody("ok".getBytes());
			  	for (int i = 0; i < 10; i++) {
			  			SendResult s = producer.send(msg ,100000);
						System.out.println(s.getMsgId());
						System.out.println(s.getMessageQueue().getQueueId());
						System.out.println(s.getSendStatus());
					}
		
	  		producer.shutdown();
	 }
	 
	 /*
	  * 发送异步消息
	  */
	 public static void sendAysMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
		  DefaultMQProducer producer = getRocketMQProducer();
		  Message msg = new Message();
		  	msg.setTopic("test-demo");
		  	msg.setTags("test2");
		  	msg.setBody("ok".getBytes());
			for (int i = 0; i < 10; i++) {
					Thread.sleep(10000);
	  				producer.send(msg ,new SendCallback() {
					@Override
					public void onSuccess(SendResult arg0) {
						System.out.println("发送结果"+arg0);
					}
					@Override
					public void onException(Throwable arg0) {
						System.out.println("发送异常"+arg0);
						
					}
				});
			
			}
		 
	 }
	 
	 /*
	  * 发送单向消息	
	  */
	 public static void senddMsg() throws MQClientException, RemotingException, InterruptedException{
		  DefaultMQProducer producer = getRocketMQProducer();
		  Message msg = new Message();
		  	msg.setTopic("test-demo");
		  	msg.setTags("test3");
		  	msg.setBody("ok".getBytes());
		  	for (int i = 0; i < 10; i++) {
		  			producer.sendOneway(msg);;
				}
		  	producer.shutdown();
	 }
	 
	 
	 /*
	  * 发送顺序消息
	  */
	 public static void  sendSxMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
		  DefaultMQProducer producer = getRocketMQProducer();
			  Message msg = new Message();
			  	msg.setTopic("test-demo");
			  	msg.setTags("test4");
			  	
			  	
			  	for (int i = 10; i < 100; i++) {
				  	msg.setBody(String.valueOf(i).getBytes());
					Thread.sleep(2000);
			  		 //模式i取模相等为同一种消息放在同一个队列,形成局部消息有序性
			  		SendResult s  =producer.send(msg ,new MessageQueueSelector() {
							/**
							 * mqs:队列集合
							 * msg:消息对象
							 * arg: 业务参数相当于i,new MessageQueueSelector()的第三个参数
							 * select方法:用来选择队列
							 */
							@Override
							public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
								int i =(int)arg;
								int index = i%3;   //根据消息的id取模相等的设为同一种消息然后放在同一个队列
								return mqs.get(index);
							}
						} ,i);
			  		System.out.println(s.getMsgId());
					System.out.println(s.getMessageQueue().getQueueId());
					System.out.println(s.getSendStatus());
				}
		
	  		producer.shutdown();
	 }
	 
	 
	 /*
	  * 发送延迟消息
	  */
	 
	 public static void  sendYcMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
		  DefaultMQProducer producer = getRocketMQProducer();
			  Message msg = new Message();
			  	msg.setTopic("test-demo");
			  	msg.setTags("test5");
			  	msg.setBody("ok".getBytes());
			  	msg.setDelayTimeLevel(5);
			  	for (int i = 0; i < 10; i++) {
			  			SendResult s = producer.send(msg ,100000);
						System.out.println(s.getMsgId());
						System.out.println(s.getMessageQueue().getQueueId());
						System.out.println(s.getSendStatus());
					}
		
	  		producer.shutdown();
	 }
	 
	 /**
	  * 发送批量消息
	  * @param args
	  */
	 public static void  sendPlMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
		  DefaultMQProducer producer = getRocketMQProducer();
		  	List<Message> list = new ArrayList<Message>();
			  Message msg0 = new Message();
			  	msg0.setTopic("test-demo");
			  	msg0.setTags("test5");
			  	msg0.setBody("ok".getBytes());
			  Message msg1 = new Message();
			  	msg1.setTopic("test-demo");
			  	msg1.setTags("test5");
			  	msg1.setBody("ok".getBytes());
			  Message msg2 = new Message();
			  	msg2.setTopic("test-demo");
			  	msg2.setTags("test5");
			  	msg2.setBody("ok".getBytes());
			  	list.add(msg0);
			  	list.add(msg1);
			  	list.add(msg2);
			  SendResult s = producer.send(list ,100000);
			  System.out.println(s.getMsgId());
			  System.out.println(s.getMessageQueue().getQueueId());
			  System.out.println(s.getSendStatus());
	  		producer.shutdown();
	 }
	
	 /**
	  * 事务消息
	  * @param args
	 * @throws InterruptedException 
	 * @throws MQBrokerException 
	 * @throws RemotingException 
	 * @throws MQClientException 
	  */
	 public static void sendSwMes() throws MQClientException, RemotingException, MQBrokerException, InterruptedException{
		 TransactionMQProducer  producer = getTransactionMQProducer();
	        Message msg = new Message();
		  	msg.setTopic("test-demo");
		  	msg.setTags("test6");
		  //msg.putUserProperty("i", String.valueOf(5));  设置人工属性 消费者可以用sql筛选
		  	msg.setBody("ok".getBytes());
		  	for (int i = 0; i < 3; i++) {
		  			SendResult s = producer.sendMessageInTransaction(msg ,null);
					System.out.println(s.getMsgId());
					System.out.println(s.getMessageQueue().getQueueId());
					System.out.println(s.getSendStatus());
				}
	
  		producer.shutdown(); 
	 }
	 
	 
	 
	 public static void main(String[] args) {
		 try {
			//sendSysMes();
			//sendAysMes();
			//senddMsg();
			sendSxMes();
			//sendPlMes();
		} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
			e.printStackTrace();
		}
		 
	}	

}
package org.userMgs5002;

import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class RocketMqConumer {
	
	
	   /*
	    * 消费者监听器  
	    */
	  public static void consumeMessage() throws MQClientException{
			
		  	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-demo");
			consumer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
			//consumer.subscribe("test-demo", "test1 || test2");
			consumer.subscribe("test-demo", "*"); //订阅主题topic以及tag  tag为*代表全部
			//consumer.setMessageModel(MessageModel.BROADCASTING);  //设置消费者模式  广播或者负载均衡模式   默认为负载均衡模式
	 
			consumer.registerMessageListener(new MessageListenerConcurrently() {
				@Override
				public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
					try {
						for (MessageExt message : msgs) {
							String msgbody = new String(message.getBody(), "utf-8");
							System.out.println("消息体内容为    "+msgbody+"           "+"详细信息: " + msgs);
							if (msgbody.equals("HelloWorld - RocketMQ")) {
								System.out.println("======错误=======");
							}
						}
					} catch (Exception e) {
						e.printStackTrace();
						if (msgs.get(0).getReconsumeTimes() == 3) {
							// 该条消息可以存储到DB或者LOG日志中,或其他处理方式
							return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
						} else {
							return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
						}
					}
					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});
	 
			consumer.start();
			System.out.println("Consumer Started.");

		
			
	  }
	
	/*
	 * 顺序消费者监听器
	 */
	  public static void consumeMessagesxx() throws MQClientException{
			DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-demo");
			consumer.setNamesrvAddr("192.168.10.50:9876;192.168.10.22:9876");
			consumer.subscribe("test-demo", "test4"); //订阅主题topic以及tag  tag为*代表全部
			//consumer.setMessageModel(MessageModel.BROADCASTING);  //设置消费者模式  广播或者负载均衡模式   默认为负载均衡模式
	 
			consumer.registerMessageListener(new MessageListenerOrderly() {

				@Override
				public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
					  try{
							for (MessageExt message : msgs) {
								String msgbody = new String(message.getBody(), "utf-8");
								System.out.println("消息体内容为    "+msgbody+"           "+"详细信息: " + msgs);
								if (msgbody.equals("HelloWorld - RocketMQ")) {
									System.out.println("======错误=======");
								}
							}  
					  	}catch(Exception e){
							e.printStackTrace();
							if (msgs.get(0).getReconsumeTimes() == 3) {
								// 该条消息可以存储到DB或者LOG日志中,或其他处理方式
								return ConsumeOrderlyStatus.SUCCESS;// 成功
							} else {
								return ConsumeOrderlyStatus.ROLLBACK;// 重试
							}
					  	}
					 
					return ConsumeOrderlyStatus.SUCCESS;
				}
			});
	 
			consumer.start();
			System.out.println("Consumer Started.");

		
	  }
	      
	public static void main(String[] args) throws MQClientException {
			//consumeMessage();
			consumeMessagesxx();
		
	}
	

}


 

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐