消息队列之RocketMQ
系列文章目录
提示:这里可以添加系列文章的所有文章的目录,目录需要自己手动添加
消息队列之RocketMQ
提示:写完文章后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
前言
提示:这里可以添加本文要记录的大概内容:
在当今的分布式系统和微服务架构中,消息队列扮演着至关重要的角色。它们提供了可靠的异步通信和消息传递机制,使得不同的系统组件能够协调工作,提高了系统的可靠性、可扩展性和性能。
在众多的消息队列中,RocketMQ 以其高性能、高可靠性和丰富的特性脱颖而出。它是由阿里巴巴开源的一款分布式消息中间件,经过了大规模生产环境的验证,被广泛应用于各种行业和场景。
在本博客中,我将深入探讨 RocketMQ 的核心概念、工作原理、安装部署以及实际应用。我将分享我在使用 RocketMQ 过程中的实践经验,包括如何利用其高级特性来解决实际业务问题。
无论你是刚刚开始接触消息队列,还是已经有一定经验的开发者,我相信本博客都能为你提供有价值的信息和见解。让我们一起探索 RocketMQ 的世界,释放其强大的能力,构建更加高效、可靠的分布式系统。
提示:以下是本篇文章正文内容,下面案例可供参考
一、RacketMQ介绍
RocketMQ 是一款由阿里巴巴开源的分布式消息中间件,它具有高性能、高可靠性和丰富的特性。它支持多种消息类型,包括普通消息、顺序消息、定时消息等,并提供了灵活的消息过滤和消息确认机制。RocketMQ 采用了分布式架构,可以轻松地进行水平扩展和容错处理。此外,它还支持多种部署方式,包括独立部署、集群部署和云原生部署等。
RocketMQ 已经经过了大规模生产环境的验证,被广泛应用于各种行业和场景,如电子商务、金融、医疗、物流等。它可以帮助企业快速构建高性能、可扩展的分布式系统,提高系统的可靠性和灵活性。
二、RocketMQ应用场景
应用解耦
以订单系统为例,当用户创建订单后,需要调用库存系统,支付系统,物流系统,如果其中任何一个系统出问题或者暂时不可用,都会造成下单问题,用户的体验极差。
流量削峰
流量削峰:在高并发的场景下,RocketMQ 可以作为流量削峰的工具。将大量的请求放入队列中,消费者可以按照自己的处理能力从队列中获取请求进行处理,从而避免系统因瞬间高流量而崩溃。
异步通信
在分布式系统中,不同的服务或组件可以通过 RocketMQ 进行异步通信,无需等待对方立即响应。这可以提高系统的响应性和并发处理能力。
事件通知
当系统中发生重要事件时,可以使用 RocketMQ 向相关的订阅者发送通知。这可以用于监控、预警、日志收集等场景。
三、RocketMQ的概念术语
- 消息模型:消息模型主要有队列模型和发布订阅模型。其中,RabbitMQ是队列模型,RocketMQ是发布订阅模型。
- 生产者(Producer):负责发送消息到 RocketMQ 服务器的实体或应用程序。
- 消费者(Consumer):从 RocketMQ 服务器接收消息并进行处理的实体或应用程序。
- 主题(Topic):消息的分类或标识符,即一类消息的集合,用于指定消息的发送和接收。
- 代理服务器(Broker):消息中转对象,负责消息的存储和转发。
- 名字服务(nameServer):名称服务管理代理服务器broker,相当于一个管理机构。
- 消费者组:同一类消费者的集合。
- 拉取式消费:消费者主动从Broker中拉取消息。
- 推动式消费:Broker主动推消息给消费者消费。
- 普通顺序消息:在同一消息队列里的消息是顺序的,不同消息队列的消息可能是不同的。
- 严格顺序消息:所有的消息都是有循序的。
四、RocketMQ安装
1.在Windows下载RocketMQ安装包,并转移到Linux中,点击下载
2.修改环境变量
export JAVA_HOME=/usr/local/jdk-11.0.11/
export PATH=$PATH:$JAVA_HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq-5.0.0
export PATH=$PATH:$ROCKETMQ_HOME/bin
3.解压文件
unzip rocketmq-all-5.0.0-bin-release.zip
4.启动NameServer
nohup sh mqnamesrv &
5.启动broker
nohup sh mqbroker -n localhost:9876
五、RocketMQ工作流程
- 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
- Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
六、管理命令
- 启动namesrv和broker
./mqnamesrv #启动nameserver
./mqbroker -n localhost:9876 -c /opt/alibaba-rocketmq/conf/broker.conf #启动broker
- 查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log #查看日志
tail -f ~/logs/rocketmqlogs/broker.log #查看日志
- 新增Topic,-n:nameServer第地址,-c:集群地址,-t:新增Topic名字
mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t topicWarning
- 查看某个topic的状态
mqadmin topicStatus -n localhost:9876 -t topicWarning
- 查看所有消费组group
mqadmin consumerProgress -n localhost:9876
- 查看所有Topic
mqadmin topicList -n localhost:9876
- 删除topic
mqadmin deleteTopic -n localhost:9876 -c DefaultCluster -t topicWarning
- 关闭namesrv和broker服务
mqshutdown namesrv
mqshutdown broker
七、SpringBoot整合RocketMQ
1.创建springboot-rocketmq-producer工程,在pom.xml文件中添加依赖
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.配置文件
# application.properties
spring.application.name=springboot_rocketmq_producer
# nameserver的地址
rocketmq.name-server=192.168.139.128:9876
#指定生产组名称
rocketmq.producer.group=my-group
3.测试类
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {MyRocketProducerApplication.class})
public class MyRocketProducerApplicationTest {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Test
public void testSendMessage() {
// 用于向broker发送消息
// 第一个参数是topic名称
// 第二个参数是消息内容
this.rocketMQTemplate.convertAndSend(
"topic_springboot_01",
"springboot: hello rocketmq..."
);
}
}
4.创建springboot-rocketmq-consumer工程,pom.xml文件同producer工程
,application.properties配置
spring.application.name=springboot-rocketmq-consumer
rocketmq.name-server=192.168.139.128:9876
5.启动类
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringbootRocketmqConsumerApplication {
public static void main(String[] args) { SpringApplication.run(SpringbootRocketmqConsumerApplication.class, args);
}
}
6.消息监听器
@Slf4j
@Component
@RocketMQMessageListener(topic="topic_springboot_01",consumerGroup="springboot-rocketmq-consumer-01")
public class Consumer implements RocketMQlistener{
@override
public void onMessage(String message){
log.info("Received messsge:" + message);
}
}
总结
提示:这里对文章进行总结:
总的来说,这篇博客为读者提供了全面而深入的了解生成消息队列之 RocketMQ 的机会。无论是新手还是有经验的开发者,都能从中受益,并为自己的项目选择合适的消息队列解决方案提供参考。
更多推荐
所有评论(0)