RocketMQ 从入门到实战:一篇文章搞懂消息队列、架构模型与集群部署
一、MQ 简介
在业务系统中,我们经常会遇到这样的场景:
用户下单以后,需要做很多事情:
如果这些操作全部放在一个接口里同步执行,那用户点一下“下单”,接口可能要等很久才返回。更严重的是,只要其中一个步骤失败,整个链路都会受到影响。这时候就可以引入 MQ,也就是 Message Queue,消息队列。
MQ 的核心思想很简单:生产者把消息发送到消息队列中,消费者从消息队列中取消息进行处理。
比如下单系统只负责发送一条“用户已下单”的消息,短信系统、积分系统、物流系统自己去消费这条消息。
这样做有几个好处:
所以 MQ 在电商、支付、日志、订单、秒杀、分布式事务等场景中非常常见。
二、RocketMQ 产品特点
1、RocketMQ 介绍
RocketMQ 是 Apache 旗下的一款分布式消息中间件。它最早来自阿里巴巴,后来成为 Apache 顶级项目,常用于金融、电商、订单、交易、日志收集、异步解耦等场景。
官方文档中,RocketMQ 的核心组件主要包括 Producer、Consumer、NameServer、Broker。Producer 负责发送消息,Consumer 负责消费消息,Broker 负责存储和转发消息,NameServer 负责维护路由信息。Producer 和 Consumer 会通过 NameServer 获取 Broker 的路由信息,从而完成消息投递和消费。
可以简单理解成:

2、RocketMQ 特点
1. 高性能:支持高吞吐消息发送和消费
2. 高可靠:消息支持持久化,Broker 可主从部署
3. 高可用:支持集群部署、主从复制、DLedger 高可用模式
4. 支持顺序消息:适合订单状态流转等场景
5. 支持延迟消息:适合订单超时取消、延迟通知等场景
6. 支持事务消息:适合分布式事务最终一致性场景
7. 支持消息过滤:Consumer 可以按 Tag 过滤消息
8. 支持水平扩展:Broker、Producer、Consumer 都可以集群化
三、RocketMQ 快速实战
1、快速搭建 RocketMQ 服务
RocketMQ 最核心的启动顺序是:
1. 启动 NameServer
2. 启动 Broker
3. 创建 Topic
4. 启动 Producer 发送消息
5. 启动 Consumer 消费消息
NameServer 启动成功后,官方文档中会提示可以在 namesrv.log 中看到类似 The Name Server boot success.. 的日志;启动 Broker 时,需要先保证 NameServer 已经启动。RocketMQ 5.x 版本还支持 Broker 和 Proxy 分离部署,用于更灵活的集群能力。
# 1. 启动 NameServer
nohup sh bin/mqnamesrv &
# 2. 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
# 3. 查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
tail -f ~/logs/rocketmqlogs/broker.log
如果是 Windows 环境,通常是执行对应的 .cmd 文件:
start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876
这里要注意一个点:
Broker 必须注册到 NameServer,Producer 和 Consumer 才能知道 Broker 在哪里。
所以 NameServer 有点像 RocketMQ 里的“注册中心”
2、快速实现消息收发
RocketMQ 的消息收发流程可以理解成这样:
Producer -> NameServer 查询 Topic 路由 -> 找到 Broker -> 发送消息
Consumer -> NameServer 查询 Topic 路由 -> 找到 Broker -> 拉取/接收消息
最简单的消息结构一般包括:
Topic:消息主题
Tag:消息标签
Body:消息内容
比如:
Message msg = new Message(
"TopicTest",
"TagA",
"Hello RocketMQ".getBytes(StandardCharsets.UTF_8)
);
这里:
TopicTest:表示这条消息属于哪个主题
TagA:表示这条消息的标签
Hello RocketMQ:真正的消息内容
官方示例中也提供了通过 tools.sh 运行 Producer 和 Consumer 的方式,用来快速验证 RocketMQ 是否搭建成功。
3、搭建 Java 客户端项目
以 RocketMQ 5.x 常见客户端写法为例,Producer 代码大概如下:
1)引入 Maven 依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.0.7</version>
</dependency>
2)RocketMQ 5.x Producer 示例
package com.example.rocketmq;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.nio.charset.StandardCharsets;
public class ProducerDemo {
public static void main(String[] args) throws ClientException {
// 1. 获取 RocketMQ 客户端服务提供者
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 2. 配置 RocketMQ 服务地址
// 如果你是本地启动 RocketMQ 5.x Proxy,通常是 8081
// 如果是云服务或集群环境,填写对应的 endpoint
String endpoint = "localhost:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoint)
.build();
// 3. 指定 Topic
String topic = "TopicTest";
// 4. 创建 Producer
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.build();
// 5. 创建消息
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setTag("TagA")
.setKeys("key-001")
.setBody("Hello RocketMQ 5.x".getBytes(StandardCharsets.UTF_8))
.build();
// 6. 发送消息
SendReceipt sendReceipt = producer.send(message);
System.out.println("消息发送成功,MessageId = " + sendReceipt.getMessageId());
// 7. 关闭 Producer
producer.close();
}
}
这段代码可以这样理解:
ClientServiceProvider:
RocketMQ 5.x 客户端入口,用来创建 Producer、Consumer、Message。
ClientConfiguration:
客户端配置,主要配置 RocketMQ 服务地址 endpoint。
endpoint:
RocketMQ 5.x 客户端连接地址。
如果你走 Proxy,一般是 Proxy 的地址,比如 localhost:8081。
Producer:
消息生产者,负责发送消息。
Topic:
消息主题,消息必须发送到某个 Topic。
Tag:
消息标签,用于对 Topic 下的消息进一步分类。
Keys:
消息业务 key,方便后续排查问题和查询消息轨迹。
Body:
真正的消息内容。
4、搭建 RocketMQ 可视化管理服务
学习 RocketMQ 时,建议搭建一个可视化控制台,比如 RocketMQ Dashboard。
它可以帮助我们查看:
1. Broker 列表
2. Topic 列表
3. Consumer Group
4. 消息堆积情况
5. 消息轨迹
6. 集群状态

5、升级分布式集群
单机版 RocketMQ 只适合学习和测试,企业环境一般会使用集群。
一个基础 RocketMQ 集群可以这样设计:
NameServer 集群:
namesrv1
namesrv2
Broker 集群:
broker-a master
broker-a slave
broker-b master
broker-b slave
Producer 集群:
多个业务服务实例
Consumer 集群:
多个消费服务实例
在 RocketMQ 传统主从架构中,Master 和 Slave 的对应关系通常通过相同的 brokerName 和不同的 brokerId 来定义,其中 brokerId=0 表示 Master,非 0 表示 Slave。
例如:
</>properties
# broker-a master
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
</>properties
# broker-a slave
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
这里要重点理解:
brokerName 一样:说明它们是一组主从
brokerId=0:表示主节点
brokerId=1:表示从节点
namesrvAddr:表示注册到哪些 NameServer
企业环境中,Topic 通常是提前创建好的,而不是完全依赖代码自动创建。
原因是:
1. Topic 是业务资源,需要规划
2. Topic 需要设置队列数量
3. Topic 需要设置权限
4. Topic 需要评估流量和容量
5. 防止代码误创建大量脏 Topic
所以实际公司里通常会有规范:
订单相关:order_topic
支付相关:pay_topic
物流相关:logistics_topic
短信相关:sms_topic
6、升级 DLedger 高可用集群
传统主从模式下,如果 Master 挂了,Slave 不一定能自动变成 Master。为了提升高可用能力,RocketMQ 支持 DLedger 模式。
DLedger 是基于 Raft 协议的分布式日志存储组件。官方文档说明,部署 RocketMQ 时可以使用 DLedger 替换原生副本存储机制;一个 RocketMQ-on-DLedger Group 通常需要至少 3 个节点,通过 Raft 自动选举 Leader,其他节点作为 Follower,并在节点之间复制数据。
可以简单理解成:

DLedger 的优势:
1. 自动选主
2. 数据多副本复制
3. 提高 Broker 高可用能力
4. 减少人工切换 Master 的成本
一个 DLedger 组大概是这样:
broker-a-n0
broker-a-n1
broker-a-n2
其中一个是 Leader,另外两个是 Follower。
配置示例可以理解为:
</>properties
enableDLegerCommitLog=true
dLegerGroup=broker-a
dLegerPeers=n0-192.168.1.10:40911;n1-192.168.1.11:40911;n2-192.168.1.12:40911
dLegerSelfId=n0
sendMessageThreadPoolNums=16
三个节点的 dLegerGroup 一样,表示它们属于同一个复制组;dLegerSelfId 不一样,表示当前节点是谁。
四、总结 RocketMQ 的运行架构

核心流程:
1. Broker 启动后注册到 NameServer
2. Producer 发送消息前,从 NameServer 获取 Topic 路由
3. Producer 根据路由把消息发送到 Broker
4. Consumer 从 NameServer 获取 Topic 路由
5. Consumer 从 Broker 拉取消息并消费
6. Broker 记录消息存储和消费进度
所以你可以这样记:
NameServer:负责找路
Broker:负责存消息
Producer:负责发消息
Consumer:负责收消息
Topic:负责消息分类
ConsumerGroup:负责消费分组
五、理解 RocketMQ 的消息模型
RocketMQ 的消息模型中,有几个核心概念必须掌握。
1、Topic
Topic 是消息主题,用来对消息进行分类。
比如:
order_topic:订单消息
pay_topic:支付消息
stock_topic:库存消息
sms_topic:短信消息
Producer 发送消息时必须指定 Topic。Consumer 消费消息时也需要订阅 Topic。
2、Tag
Tag 是 Topic 下更细粒度的分类。
比如订单 Topic 下可以有:
order_topic
TagA:创建订单
TagB:取消订单
TagC:完成订单
Consumer 可以通过 Tag 过滤自己关心的消息。RocketMQ 官方最佳实践中也提到,只有 Producer 发送消息时设置了 Tag,Consumer 才能使用 Tag 通过 Broker 过滤消息。
3、MessageQueue
一个 Topic 底层会分成多个 MessageQueue。
你可以理解成:
Topic 是一个大分类
MessageQueue 是这个分类下面的多个队列
为什么要有多个队列?因为可以提升并发能力。
一个 Topic 有 4 个队列
Producer 可以并发写入多个队列
ConsumerGroup 中多个 Consumer 可以并发消费多个队列
4、Producer
Producer 是消息生产者。
常见发送方式有:
1. 同步发送:等待 Broker 返回结果,可靠性高
2. 异步发送:不阻塞主线程,性能更好
3. 单向发送:只管发,不关心结果,适合日志类场景
4. 顺序发送:保证同一类消息进入同一个队列。
比如订单状态流转:
订单创建 -> 订单支付 -> 订单发货 -> 订单完成
这种场景就适合顺序消息。
5、Consumer
Consumer 是消息消费者。
RocketMQ 中 Consumer 一般是以 ConsumerGroup 的形式工作。
比如:
order-consumer-group
consumer-1
consumer-2
consumer-3
同一个 ConsumerGroup 里的多个 Consumer 会共同消费一个 Topic 下的消息。
这就是集群消费模式。
举个例子:
Topic 里有 100 条消息
ConsumerGroup 里有 2 个 Consumer
consumer-1 消费一部分
consumer-2 消费另一部分
但是如果是不同的 ConsumerGroup,它们可以各自消费一份完整消息。
比如:
sms-consumer-group:消费订单消息后发短信
point-consumer-group:消费订单消息后加积分
stock-consumer-group:消费订单消息后扣库存
这就是发布订阅的效果。
6、消费进度 Offset
RocketMQ 不会因为 Consumer 消费了一条消息,就马上把消息从队列里删除。官方概念文档中说明,RocketMQ 会根据 ConsumerGroup 记录最后消费到的位置。
消息队列:1 2 3 4 5 6 7 8 9
消费者当前消费到:5
那么 offset 就记录到 5 附近
如果消费者重启,可以从上次记录的位置继续消费。
六、章节总结
这篇文章我们从 0 到 1 梳理了 RocketMQ 的核心内容。
可以总结为几句话:
1. MQ 的核心价值是:解耦、异步、削峰、可靠
2. RocketMQ 是一款高性能、高可靠的分布式消息中间件
3. RocketMQ 核心组件包括:NameServer、Broker、Producer、Consumer
4. NameServer 负责路由发现,Broker 负责消息存储和投递
5. Producer 发送消息到 Topic,Consumer 从 Topic 消费消息
6. Topic 下有多个 MessageQueue,用来提高并发能力
7. ConsumerGroup 用来实现集群消费和广播消费
8. 企业环境中 Topic 通常需要提前规划和创建
9. 单机 RocketMQ 适合学习,生产环境一般使用集群
10. DLedger 可以通过 Raft 选主提高 Broker 高可用能力
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)