前言

工作中很多种场景用到消息队列,消息队列简单来说就是消息的传输过程中保存消息的一种容器。项目中引入消息队列中间件主要解决了异步处理、应用耦合、流量削峰等问题。今天我们来学习一下阿里开源的一款产品 RocketMQ。

一、RocketMQ简介

RocketMQ 是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。具备异步通信的优势,系统拓扑简单、上下游耦合较弱,主要应用于异步解耦,流量削峰填谷等场景。

1.1 整体架构

在这里插入图片描述

RocketMQ 架构主要分为四个部分,如上图所示:

● Producer: 消息发布的角色,支持分布式集群方式部署。Producer 通过 MQ 负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

● Consumer: 消息消费的角色,支持分布式集群方式部署。支持 PUSH 推,PULL 拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

● NameServer: NameServer 是整个 RocketMQ 的大脑,相当于路由/注册和发现中心。可使用集群方式部署,集群中各个NameServer 都是无状态的即无法感知其它 NameServer 的存在。NameServer 的主要作用是为消息生产者和消息消费者提供有关 Topic 的路由信息,所以 NameServer 就需要存储路由信息,并且能够管理 Broker 节点,包括路由注册、路由删除等功能。

● Broker: RocketMQ 核心组件之一,大部分重量级工作都是通过 Broker 来完成的。Borker 处理各种请求和存储消息,决定整个 RocketMQ 体系的吞吐性能、可靠性和可用性。Broker 可以存储多个 Topic 的消息,每个 Topic 的消息也可以分片存储于不同的 Broker。MessageQueue 用于存储消息的物理地址,每个 Topic 中的消息地址存储于多个 MessageQueue 中。

主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。
在RocketMQ中,每个Topic默认都会有4个队列,并且每个队列都有一个id,默认从0开始,依次递增

● 路由注册
RockerMQ 路由注册是通过 Broker 与 NameServer 的心跳功能实现的。Broker 启动时向集群中所有的 NameServer 发送心跳语句,每隔 30s 向集群中所有的 NameServer 发送心跳包,NameServer收到心跳包会先更新 RouteInfoManager 类中 brokerLiveTable 中 BrokerLiveInfo的 lastUpdateTimestamp,然后每隔 10s 扫描一次 brokerLiveTable,如果连续 120s 没有收到心跳包,NameServer 将移除该 Broker 的路由信息,同时关闭 Socket 连接。

● 路由删除
上边提到了 NameServer 如果连续 120s 没有收到 Broker 的心跳包,将移除该 Broker 的路由信息。还有一点就是 Broker 在正常关闭的情况下,会执行 unregisterBroker 命令。

● 路由发现
RockerMQ 路由发现是非实时的,当 Topic 路由出现变化后,NameServer 不会主动推送给客户端,而是由客户端定时拉取主题最新的路由。

二、RocketMQ安装部署

2.1 RocketMQ 下载

安装部署RocketMQ, 需要先安装JDK,这里不做记录!

RocketMQ 下载链接:https://archive.apache.org/dist/rocketmq/4.9.1/

在这里插入图片描述

下载到本地后 win + r 打开 cmd 输入如下命令将文件上传到 Liunx 服务器的 /mydata/rocketmq/目录。

scp rocketmq-all-4.9.1-bin-release.zip的位置 root@192.168.57.129:/mydata/rocketmq/

输入 Linux 开机密码后上传成功,如下图:
在这里插入图片描述
或者直接使用如下命令下载安装包

wget https://archive.apache.org/dist/rocketmq/4.9.1/rocketmq-all-4.9.1-bin-release.zip

解压安装包

unzip rocketmq-all-4.9.1-bin-release.zip

重命名

mv rocketmq-all-4.9.1-bin-release rocketmq-4.9.1

进入rocketmq-4.9.1 文件夹

cd rocketmq-4.9.1

查看 rocketmq-4.9.1 目录

在这里插入图片描述

2.2 修改 JVM 参数

在启动 RocketMQ 之前,建议修改启动时的 JVM 参数,因为默认的参数都比较大,为了避免内存不够,建议修改小。

修改 runserver.sh 的 JVM 参数

vi /mydata/rocketmq/rocketmq-4.9.1/bin/runserver.sh

将原来的初始堆内存和最大最内存调整为 512m,esc + : wq 保存后退出

在这里插入图片描述

修改 runbroker.sh 的 JVM 参数

vi /mydata/rocketmq/rocketmq-4.9.1/bin/runbroker.sh

在这里插入图片描述

2.3 启动 NameServer 和 Broker

启动 nameServer

nohup /mydata/rocketmq/rocketmq-4.9.1/bin/mqnamesrv -n 192.168.57.129:9876 &

查看 nohup.out 日志

[root@localhost bin]# cat nohup.out 
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON

编写 broker.conf 配置文件

vim /mydata/rocketmq/rocketmq-4.9.1/conf/broker.conf

加上这行,开启自动创建 Topic

autoCreateTopicEnable = true

启动 broker

nohup /mydata/rocketmq/rocketmq-4.9.1/bin/mqbroker -n 192.168.57.129:9876 &

查看 nohup.out 日志

[root@localhost bin]# cat nohup.out 
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
The broker[localhost.localdomain, 192.168.57.129:10911] boot success. serializeType=JSON and name server is 192.168.57.129:9876

2.4 验证发送和接受消息

● 配置 nameserver 的环境变量,在发送和接受消息之前,需要告诉客户端 nameserver 的位置。配置 NAMESRV_ADDR 如下。

vim /etc/profile

配置信息

export NAMESRV_ADDR=192.168.57.129:9876

重新加载配置,让它生效

source /etc/profile

● 使用 bin/tools.sh 工具验证消息的发送和接收,默认会发 1000 条数据。

启动生产者发送消息

tools.sh org.apache.rocketmq.example.quickstart.Producer

在这里插入图片描述

启动消费者接收消息

tools.sh org.apache.rocketmq.example.quickstart.Consumer

在这里插入图片描述

2.5 停止 NameServer 和 Broker

停止 nameserver

[root@localhost bin]# mqshutdown namesrv
The mqnamesrv(11783) is running...
Send shutdown request to mqnamesrv(11783) OK

停止 broker

[root@localhost bin]# mqshutdown broker
The mqbroker(12178) is running...
Send shutdown request to mqbroker(12178) OK

2.6 配置全局环境

vim /etc/profile
export ROCKETMQ_HOME=/mydata/rocketmq/rocketmq-4.9.1
export PATH=$PATH:$ROCKETMQ_HOME/bin
source /etc/profile

这样就不必每次进入 RocketMQ 的安装目录了,直接可以使用 mqnamesrv 和mqbroker 指令。

2.7 RocketMQ 执行流程

  1. 启动 NameServer,NameServer 启动后监听端口,等待Broker、Producer、Consumer 连接,相当于一个路由控制中心。
  2. Broker启动,与所有的 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息(IP + Port 等)以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
  3. 收发消息前,先创建 Topic,创建 Topic 时需要指定该 Topic 要存储在哪些 Broker上,也可以在发送消息时自动创建 Topic。
  4. Producer 发送消息,启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic 存在哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker 建立长连接从而向 Broker发消息。
  5. Consumer 跟Producer 类似,跟其中一台 NameServer 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

三、RocketMQ应用场景

3.1 异步处理

如下图,简单模拟用户下单流程,若使用同步通信方式用户下订单后,后台需要执行创建订单,扣减库存,加积分,生成优惠卷四个业务模块。通常来说它们都是以微服务的形式部署在不同机器上,网络不可靠因素无疑增加了用户下单失败的风险,调用链长增加了用户下单后返回下单成功的时长。
在这里插入图片描述

使用消息队列后,当用户点击下订单操作,后台往消息队列发送消息(userId,productId,…),后续创建订单,扣减库存,加积分,优惠卷四个业务只需要去消息队列的某个 Topic 订阅消息,当 Broker 的某个 Topic 有它们所订阅的消息后就能接受消息,进而去执行它们的业务逻辑。所以用户下订单后便能很快返下单成功。

相比第一种同步调用方案,使用 MQ 异步处理能明显提高了系统的吞吐量,而且上下游的依赖关系明显减弱了,达到了解耦的目的。

在这里插入图片描述

3.2 应用解耦

如下图所示:当用户发送请求经过 API 网关路由到服务 A,服务 A 再发送消息给 MQ 的某个 Topic ,服务 B,服务 C,服务 D 订阅这个 Topic 后就能接受服务 A 的消息,再去处理各自的业务,达到应用解耦的目的。

在这里插入图片描述

绝大多数电商业务场景都使用消息队列来解决类似的系统耦合过于紧密的问题。引入消息队列后订单服务在订单变化时发送一条消息到消息队列的一个主题 Order 中,所有下游系统都订阅主题 Order,这样每个下游系统都可以获得一份实时完整的订单数据无论增加、减少下游系统或是下游系统需求如何变化,订单服务都无需做任何更改,实现了订单服务与下游服务的解耦。

3.3 流量削峰

如何避免过多的请求压垮我们的秒杀系统?

设计思路: 使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。

在这里插入图片描述

加入消息队列之后,整个秒杀的流程如下:

  1. 网关在收到请求后,将请求放入到 MQ 中
  2. 后端服务从请求 MQ 获取请求,完成后续秒杀处理过程,返回响应
    在这里插入图片描述

代价:

  1. 增加系统调用链的环节,导致总体的响应时延变长
  2. 同步调用变成了异步调用,增加系统的复杂度
  3. 成本问题,MQ高可能、高可用

常见限流算法:

  1. 固定窗口算法
  2. 滑动窗口算法
  3. 漏桶算法
  4. 令牌桶算法

令牌桶控制流量的原理是: 单位时间内只发放固定数量的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中拿出一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就保证单位时间内,能处理的请求不超过发放令牌的数量,起到了流量控制的作用。

在这里插入图片描述

实现的方式简单,不需要破坏原有的调用链,只要网关在处理 APP 请求时增加一个获取令牌的逻辑。

更多的使用场景:

  1. 通过MQ实现分布式事务,最终一致性
  2. 作为发布/订阅系统实现一个微服务级系统间的观察者模式
  3. 连接流计算任务和数据
  4. 用于将消息广播给大量接收者,数据同步

备注: 应用解耦和流量控制部分来自B站作者(架构驿站)的消息队列进阶课。在此表示感谢!

四、MQ技术选型

4.1 ActiveMQ、RabbitMQ、RocketMQ、Kafka有什么优缺点?

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级万级十万级,支撑高吞吐十万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用高,基于主从架构实现高可用非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失经过参数优化配置,可以做到 0 丢失
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

经过综合对比与分析,对于MQ的选择,提出以下建议:

对于一般的业务系统,尽管ActiveMQ在过去被广泛使用,但鉴于其在大规模吞吐量场景下的表现未经充分验证,且社区活跃度有所降低,因此不推荐使用。

RabbitMQ作为后续流行的选择,虽然其基于Erlang语言可能让部分Java工程师望而却步,但从开源支持、稳定性及社区活跃度来看,它是一个值得考虑的选择。然而,由于其对公司的可控性有所欠缺,需要谨慎评估。

近年来,RocketMQ因其出色的性能与阿里巴巴的背景受到了广泛关注。虽然其社区存在突然终止的风险(目前RocketMQ已捐给Apache),但对于技术实力较强的公司,它仍然是一个值得推荐的选择。若对技术实力有所保留,建议继续使用RabbitMQ,其活跃的开源社区确保了其持续性与稳定性。

在大数据领域的实时计算、日志采集等场景中,Kafka无疑是业内标准。其社区活跃度高,稳定性强,且在全球范围内得到了广泛应用,是这一领域的实际规范。因此,对于此类场景,推荐使用Kafka。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐