一、RocketMQ简介

RocektMQ是阿里巴巴在2012年开源的一个纯java、分布式消息中间件。RocektMQ具有低延迟、高性能、可靠性、万亿级别的容量和灵活的可伸缩性。2016年阿里巴巴将RocketMQ捐赠给Apache,2017年9月RocketMQ正式从Apache社区正式毕业,成为Apache顶级项目。

二、消息队列优点

消息队列的有应用解耦、流量削峰、消息分发等多种优点。
本文接下来详细介绍RocketMQ安装步骤。

三、单机安装步骤

1、环境要求

1)64bit OS
2)64bit JDK 1.8+;
3)Maven 3.2.x;
4)Git;
5)4g+ free disk for Broker server

2、下载

从https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1下载4.7.1版本二进制包。

3、Linux环境安装

将二进制压缩包解压,进入bin目录。
首先启动Name Server,

nohup sh bin/mqnamesrv &

之后启动Broker:

nohup sh bin/mqbroker -n localhost:9876 &

接下来就可以收发消息了。
Linux环境安装比较简单,下面重点介绍windows环境安装。

4、Windows环境安装

下面的步骤是Windows10上进行的,要求必须安装powershell。
1)将二进制包解压到D:\rocketmq下
2)设置环境变量

ROCKETMQ_HOME=D:\rocketmq
NAMESRV_ADDR=localhost:9876

3)启动Name Server
打开powershell,执行如下命令:

.\bin\mqnamesrv.cmd

当出现下图时即为启动成功:
在这里插入图片描述
4)启动Broker
打开一个新的powershell,执行如下命令:

.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

出现下图表示启动成功。
在这里插入图片描述
日志文件中也可以看到启动成功的信息,默认情况下rocketMQ将日志打印在个人目录的logs\rocketmqlogs下。
可以修改conf目录下logback_broker.xml和logback_namesrv.xml的配置将日志打印在指定位置。

5、程序访问

生产者代码:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("producer-A");
        producer.setNamesrvAddr ("localhost:9876");
        producer.start() ;
        for (int i = 0 ; i < 100 ; i++) {
        //Create a Message instance, specify ng Top工 c tag and Message
            Message msg =new Message("TopicTest" /* Topic */ ,
                        "TagA" /* Tag*/ ,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            System.out.println("发送"+i);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        producer.shutdown() ;
    }
}

消费者代码:

public class Consumer {
    public static void main(String[] args) throws MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-A");

        consumer.setNamesrvAddr("localhost:9876");

        consumer.subscribe("TopicTest", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

通过上面两个程序,生产者向broker发送了100个消息,消费者取出100个消息消费。
在执行上面程序之前需要使用如下命令创建topic:

mqadmin.cmd updateTopic -b localhost:10911 -t TopicTest

四、角色介绍

rocketMQ一共有四个角色:生产者、消费者、broker、NameServer。
Broker是消息的中转站,生产者将消息发送给broker,broker再将消息转发给消费者。为了高可用,broker可以搭建一主多从。
NameServer承担的是类似于注册中心的角色,生产者和消费者可以通过NameServer发现broker集群。NameServer可以以集群部署。

五、集群部署

在第三节搭建了单机模式,单机模式容易发生单点故障,本节介绍搭建NameServer和broker的集群模式。我是在一台机器上搭建集群,因此启动NameServer和broker使用不同的端口。
集群架构是两个NameServer,broker是两主两从。

1、搭建NameServer集群

为了能在一台机器上启动两个NameServer,需要修改NameServer的源代码。打开类NamesrvStartup,将84行设置监听端口修改为9877:

 nettyServerConfig.setListenPort(9877);

这样两个NameServer分别在端口9876和9877启动。
接下来在两个powershell分别执行.\bin\mqnamesrv.cmd的启动命令即可。

2、搭建broker集群

broker集群是两主两从,角色为两个master和两个slave。
rockerMQ在conf\2m-2s-sync目录下提供了搭建两主两从broker的配置文件,我在这些配置文件的基础上修改一下。
master-a:

namesrvAddr=127.0.0.1:9876;127.0.0.1:9877 #设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster  #集群名字
brokerName=master-a  #Broker名字,Master和Slave通过使用相同的Broker名称来表明相互关
系,以说明某个Slave是哪个Master的Slave
brokerId=0 #一个Master Barker可以有多个Slave, 0表示Master,大于0表示不同Slave ID
deleteWhen=04  #与fileReservedTime参数呼应,表明在几点做消息删除动作,默认值04,表示凌晨4点执行
fileReservedTime=48 #在磁盘上保存消息的时长,单位是小时,自动删除超时的消息
brokerRole=SYNC_MASTER   #master角色
flushDiskType=ASYNC_FLUSH #表示刷盘策略
listenPort=10910  #设置broker的监听地址,因为代码中将监听端口写死,这里的设置不起作用
storePathRootDir=D:/rocketmq1/  #必须设置该值,而且必须是目录,mq启动会在该目录下创建lock文件,并对文件加锁,mq以此确保只有一个实例启动

master-b:

namesrvAddr=127.0.0.1:9876;127.0.0.1:9877//设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster
brokerName=master-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER   //master角色
flushDiskType=ASYNC_FLUSH
listenPort=10911  //设置broker的监听地址
storePathRootDir=D:/rocketmq2/

slave-a:

namesrvAddr=127.0.0.1:9876;127.0.0.1:9877//设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster
brokerName=master-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10912  //设置broker的监听地址
storePathRootDir=D:/rocketmq3/

slave-b:

namesrvAddr=127.0.0.1:9876;127.0.0.1:9877//设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster
brokerName=master-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10913  //设置broker的监听地址
storePathRootDir=D:/rocketmq4/

rocketMQ在代码里面将监听端口设置为常量10911,所以配置文件中的listenPort属性其实不起作用,由此带来一个问题是,在一台机器上启动多个broker会报端口被占用,因此需要修改源代码将不同broker的监听端口设置为不同的值。我修改源代码将端口分别设置为:master-a:10911,master-b:20911,slave-a:30911,slav-a:40911。设置的端口之间的距离一定要大,因为broker还监听了其他端口,其他端口是在10911的基础上增减一个值得到的。
配置文件准备好后,下面就可以通过命令启动了,在启动前,还需通过powershell设置每个MQ实例的ROCKETMQ_HOME环境变量。使用如下命令:

$Env:ROCKETMQ_HOME="D:\rocketmq1" #目录后面数字变化,每个实例使用不同的ROCKETMQ目录

设置好环境变量后就可以使用如下命令分别启动四个broker:

.\bin\mqbroker.cmd -c 配置文件地址

这样一个高可用的MQ集群搭建好了。

3、灾备自动切换测试

我首先启动master-a,slave-a和master-b,然后启动生产者发送消息可以看到如下日志:

SendResult [sendStatus=SEND_OK, msgId=C0A8016833C018B4AAC27B42017A0004, offsetMsgId=C0A8380100002A9F000000000000A404, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=6], queueOffset=12]

消息是发送成功的,此时我关闭master-a,可以看到broker自动切换到了broker-b,由于我没有启动slave-b,所以sendStatus=SLAVE_NOT_AVAILABLE:

SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=C0A8016833C018B4AAC27B4257EE0006, offsetMsgId=C0A8380100009FCF000000000000A1A9, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=7], queueOffset=12]

我启动slave-b,可以看到此时日志正常了:

SendResult [sendStatus=SEND_OK, msgId=C0A8016833C018B4AAC27B45E883001D, offsetMsgId=C0A8380100009FCF000000000000B3CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=5], queueOffset=15]

master和slave的角色是不可以互换的,如果master宕机了,生产者无法将消息发送到broker,消费者会自动切换到slave读取数据。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐