RocketMQ安装部署详解
文章目录
一、RocketMQ简介
RocektMQ是阿里巴巴在2012年开源的一个纯java、分布式消息中间件。RocektMQ具有低延迟、高性能、可靠性、万亿级别的容量和灵活的可伸缩性。2016年阿里巴巴将RocketMQ捐赠给Apache,2017年9月RocketMQ正式从Apache社区正式毕业,成为Apache顶级项目。
二、消息队列优点
消息队列的有应用解耦、流量削峰、消息分发等多种优点。
本文接下来详细介绍RocketMQ安装步骤。
三、单机安装步骤
1、环境要求
1)64bit OS
2)64bit JDK 1.8+;
3)Maven 3.2.x;
4)Git;
5)4g+ free disk for Broker server
2、下载
从https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1下载4.7.1版本二进制包。
3、Linux环境安装
将二进制压缩包解压,进入bin目录。
首先启动Name Server,
nohup sh bin/mqnamesrv &
之后启动Broker:
nohup sh bin/mqbroker -n localhost:9876 &
接下来就可以收发消息了。
Linux环境安装比较简单,下面重点介绍windows环境安装。
4、Windows环境安装
下面的步骤是Windows10上进行的,要求必须安装powershell。
1)将二进制包解压到D:\rocketmq下
2)设置环境变量
ROCKETMQ_HOME=D:\rocketmq
NAMESRV_ADDR=localhost:9876
3)启动Name Server
打开powershell,执行如下命令:
.\bin\mqnamesrv.cmd
当出现下图时即为启动成功:
4)启动Broker
打开一个新的powershell,执行如下命令:
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
出现下图表示启动成功。
日志文件中也可以看到启动成功的信息,默认情况下rocketMQ将日志打印在个人目录的logs\rocketmqlogs下。
可以修改conf目录下logback_broker.xml和logback_namesrv.xml的配置将日志打印在指定位置。
5、程序访问
生产者代码:
public class Producer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("producer-A");
producer.setNamesrvAddr ("localhost:9876");
producer.start() ;
for (int i = 0 ; i < 100 ; i++) {
//Create a Message instance, specify ng Top工 c tag and Message
Message msg =new Message("TopicTest" /* Topic */ ,
"TagA" /* Tag*/ ,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
System.out.println("发送"+i);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
producer.shutdown() ;
}
}
消费者代码:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-A");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
通过上面两个程序,生产者向broker发送了100个消息,消费者取出100个消息消费。
在执行上面程序之前需要使用如下命令创建topic:
mqadmin.cmd updateTopic -b localhost:10911 -t TopicTest
四、角色介绍
rocketMQ一共有四个角色:生产者、消费者、broker、NameServer。
Broker是消息的中转站,生产者将消息发送给broker,broker再将消息转发给消费者。为了高可用,broker可以搭建一主多从。
NameServer承担的是类似于注册中心的角色,生产者和消费者可以通过NameServer发现broker集群。NameServer可以以集群部署。
五、集群部署
在第三节搭建了单机模式,单机模式容易发生单点故障,本节介绍搭建NameServer和broker的集群模式。我是在一台机器上搭建集群,因此启动NameServer和broker使用不同的端口。
集群架构是两个NameServer,broker是两主两从。
1、搭建NameServer集群
为了能在一台机器上启动两个NameServer,需要修改NameServer的源代码。打开类NamesrvStartup,将84行设置监听端口修改为9877:
nettyServerConfig.setListenPort(9877);
这样两个NameServer分别在端口9876和9877启动。
接下来在两个powershell分别执行.\bin\mqnamesrv.cmd的启动命令即可。
2、搭建broker集群
broker集群是两主两从,角色为两个master和两个slave。
rockerMQ在conf\2m-2s-sync目录下提供了搭建两主两从broker的配置文件,我在这些配置文件的基础上修改一下。
master-a:
namesrvAddr=127.0.0.1:9876;127.0.0.1:9877 #设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster #集群名字
brokerName=master-a #Broker名字,Master和Slave通过使用相同的Broker名称来表明相互关
系,以说明某个Slave是哪个Master的Slave
brokerId=0 #一个Master Barker可以有多个Slave, 0表示Master,大于0表示不同Slave ID
deleteWhen=04 #与fileReservedTime参数呼应,表明在几点做消息删除动作,默认值04,表示凌晨4点执行
fileReservedTime=48 #在磁盘上保存消息的时长,单位是小时,自动删除超时的消息
brokerRole=SYNC_MASTER #master角色
flushDiskType=ASYNC_FLUSH #表示刷盘策略
listenPort=10910 #设置broker的监听地址,因为代码中将监听端口写死,这里的设置不起作用
storePathRootDir=D:/rocketmq1/ #必须设置该值,而且必须是目录,mq启动会在该目录下创建lock文件,并对文件加锁,mq以此确保只有一个实例启动
master-b:
namesrvAddr=127.0.0.1:9876;127.0.0.1:9877//设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster
brokerName=master-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER //master角色
flushDiskType=ASYNC_FLUSH
listenPort=10911 //设置broker的监听地址
storePathRootDir=D:/rocketmq2/
slave-a:
namesrvAddr=127.0.0.1:9876;127.0.0.1:9877//设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster
brokerName=master-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10912 //设置broker的监听地址
storePathRootDir=D:/rocketmq3/
slave-b:
namesrvAddr=127.0.0.1:9876;127.0.0.1:9877//设置NameServer集群地址,中间分号分隔
brokerClusterName=DefaultCluster
brokerName=master-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10913 //设置broker的监听地址
storePathRootDir=D:/rocketmq4/
rocketMQ在代码里面将监听端口设置为常量10911,所以配置文件中的listenPort属性其实不起作用,由此带来一个问题是,在一台机器上启动多个broker会报端口被占用,因此需要修改源代码将不同broker的监听端口设置为不同的值。我修改源代码将端口分别设置为:master-a:10911,master-b:20911,slave-a:30911,slav-a:40911。设置的端口之间的距离一定要大,因为broker还监听了其他端口,其他端口是在10911的基础上增减一个值得到的。
配置文件准备好后,下面就可以通过命令启动了,在启动前,还需通过powershell设置每个MQ实例的ROCKETMQ_HOME环境变量。使用如下命令:
$Env:ROCKETMQ_HOME="D:\rocketmq1" #目录后面数字变化,每个实例使用不同的ROCKETMQ目录
设置好环境变量后就可以使用如下命令分别启动四个broker:
.\bin\mqbroker.cmd -c 配置文件地址
这样一个高可用的MQ集群搭建好了。
3、灾备自动切换测试
我首先启动master-a,slave-a和master-b,然后启动生产者发送消息可以看到如下日志:
SendResult [sendStatus=SEND_OK, msgId=C0A8016833C018B4AAC27B42017A0004, offsetMsgId=C0A8380100002A9F000000000000A404, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=6], queueOffset=12]
消息是发送成功的,此时我关闭master-a,可以看到broker自动切换到了broker-b,由于我没有启动slave-b,所以sendStatus=SLAVE_NOT_AVAILABLE:
SendResult [sendStatus=SLAVE_NOT_AVAILABLE, msgId=C0A8016833C018B4AAC27B4257EE0006, offsetMsgId=C0A8380100009FCF000000000000A1A9, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=7], queueOffset=12]
我启动slave-b,可以看到此时日志正常了:
SendResult [sendStatus=SEND_OK, msgId=C0A8016833C018B4AAC27B45E883001D, offsetMsgId=C0A8380100009FCF000000000000B3CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-b, queueId=5], queueOffset=15]
master和slave的角色是不可以互换的,如果master宕机了,生产者无法将消息发送到broker,消费者会自动切换到slave读取数据。
更多推荐
所有评论(0)