一、MQ 简介

在业务系统中,我们经常会遇到这样的场景:

用户下单以后,需要做很多事情:如果这些操作全部放在一个接口里同步执行,那用户点一下“下单”,接口可能要等很久才返回。更严重的是,只要其中一个步骤失败,整个链路都会受到影响。这时候就可以引入 MQ,也就是 Message Queue,消息队列

MQ 的核心思想很简单:生产者把消息发送到消息队列中,消费者从消息队列中取消息进行处理。

比如下单系统只负责发送一条“用户已下单”的消息,短信系统、积分系统、物流系统自己去消费这条消息。

这样做有几个好处:

所以 MQ 在电商、支付、日志、订单、秒杀、分布式事务等场景中非常常见。

二、RocketMQ 产品特点

1、RocketMQ 介绍

RocketMQ 是 Apache 旗下的一款分布式消息中间件。它最早来自阿里巴巴,后来成为 Apache 顶级项目,常用于金融、电商、订单、交易、日志收集、异步解耦等场景。

官方文档中,RocketMQ 的核心组件主要包括 Producer、Consumer、NameServer、Broker。Producer 负责发送消息,Consumer 负责消费消息,Broker 负责存储和转发消息,NameServer 负责维护路由信息。Producer 和 Consumer 会通过 NameServer 获取 Broker 的路由信息,从而完成消息投递和消费。

可以简单理解成:

2、RocketMQ 特点

1. 高性能:支持高吞吐消息发送和消费
2. 高可靠:消息支持持久化,Broker 可主从部署
3. 高可用:支持集群部署、主从复制、DLedger 高可用模式
4. 支持顺序消息:适合订单状态流转等场景
5. 支持延迟消息:适合订单超时取消、延迟通知等场景
6. 支持事务消息:适合分布式事务最终一致性场景
7. 支持消息过滤:Consumer 可以按 Tag 过滤消息
8. 支持水平扩展:Broker、Producer、Consumer 都可以集群化

三、RocketMQ 快速实战

1、快速搭建 RocketMQ 服务

RocketMQ 最核心的启动顺序是:

1. 启动 NameServer
2. 启动 Broker
3. 创建 Topic
4. 启动 Producer 发送消息
5. 启动 Consumer 消费消息

NameServer 启动成功后,官方文档中会提示可以在 namesrv.log 中看到类似 The Name Server boot success.. 的日志;启动 Broker 时,需要先保证 NameServer 已经启动。RocketMQ 5.x 版本还支持 Broker 和 Proxy 分离部署,用于更灵活的集群能力。

# 1. 启动 NameServer
nohup sh bin/mqnamesrv &

# 2. 启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &

# 3. 查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
tail -f ~/logs/rocketmqlogs/broker.log

如果是 Windows 环境,通常是执行对应的 .cmd 文件:

start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876

这里要注意一个点:

Broker 必须注册到 NameServer,Producer 和 Consumer 才能知道 Broker 在哪里。

所以 NameServer 有点像 RocketMQ 里的“注册中心”

2、快速实现消息收发

RocketMQ 的消息收发流程可以理解成这样:

Producer -> NameServer 查询 Topic 路由 -> 找到 Broker -> 发送消息
Consumer -> NameServer 查询 Topic 路由 -> 找到 Broker -> 拉取/接收消息

最简单的消息结构一般包括:

Topic:消息主题
Tag:消息标签
Body:消息内容

比如:

Message msg = new Message(
    "TopicTest",
    "TagA",
    "Hello RocketMQ".getBytes(StandardCharsets.UTF_8)
);

这里:

TopicTest:表示这条消息属于哪个主题
TagA:表示这条消息的标签
Hello RocketMQ:真正的消息内容

官方示例中也提供了通过 tools.sh 运行 Producer 和 Consumer 的方式,用来快速验证 RocketMQ 是否搭建成功。

3、搭建 Java 客户端项目

以 RocketMQ 5.x 常见客户端写法为例,Producer 代码大概如下:

1)引入 Maven 依赖

​​​​​​​<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client-java</artifactId>
    <version>5.0.7</version>
</dependency>

2)RocketMQ 5.x Producer 示例​​​​​​​

package com.example.rocketmq;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;

import java.nio.charset.StandardCharsets;

public class ProducerDemo {

    public static void main(String[] args) throws ClientException {
        // 1. 获取 RocketMQ 客户端服务提供者
        ClientServiceProvider provider = ClientServiceProvider.loadService();

        // 2. 配置 RocketMQ 服务地址
        // 如果你是本地启动 RocketMQ 5.x Proxy,通常是 8081
        // 如果是云服务或集群环境,填写对应的 endpoint
        String endpoint = "localhost:8081";

        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(endpoint)
                .build();

        // 3. 指定 Topic
        String topic = "TopicTest";

        // 4. 创建 Producer
        Producer producer = provider.newProducerBuilder()
                .setClientConfiguration(clientConfiguration)
                .setTopics(topic)
                .build();

        // 5. 创建消息
        Message message = provider.newMessageBuilder()
                .setTopic(topic)
                .setTag("TagA")
                .setKeys("key-001")
                .setBody("Hello RocketMQ 5.x".getBytes(StandardCharsets.UTF_8))
                .build();

        // 6. 发送消息
        SendReceipt sendReceipt = producer.send(message);

        System.out.println("消息发送成功,MessageId = " + sendReceipt.getMessageId());

        // 7. 关闭 Producer
        producer.close();
    }
}

这段代码可以这样理解:

ClientServiceProvider:
RocketMQ 5.x 客户端入口,用来创建 Producer、Consumer、Message。

ClientConfiguration:
客户端配置,主要配置 RocketMQ 服务地址 endpoint。

endpoint:
RocketMQ 5.x 客户端连接地址。
如果你走 Proxy,一般是 Proxy 的地址,比如 localhost:8081。

Producer:
消息生产者,负责发送消息。

Topic:
消息主题,消息必须发送到某个 Topic。

Tag:
消息标签,用于对 Topic 下的消息进一步分类。

Keys:
消息业务 key,方便后续排查问题和查询消息轨迹。

Body:
真正的消息内容。

4、搭建 RocketMQ 可视化管理服务

学习 RocketMQ 时,建议搭建一个可视化控制台,比如 RocketMQ Dashboard。

它可以帮助我们查看:

1. Broker 列表
2. Topic 列表
3. Consumer Group
4. 消息堆积情况
5. 消息轨迹
6. 集群状态

5、升级分布式集群

单机版 RocketMQ 只适合学习和测试,企业环境一般会使用集群。

一个基础 RocketMQ 集群可以这样设计:

NameServer 集群:
namesrv1
namesrv2

Broker 集群:
broker-a master
broker-a slave

broker-b master
broker-b slave

Producer 集群:
多个业务服务实例

Consumer 集群:
多个消费服务实例

在 RocketMQ 传统主从架构中,Master 和 Slave 的对应关系通常通过相同的 brokerName 和不同的 brokerId 来定义,其中 brokerId=0 表示 Master,非 0 表示 Slave。

例如:

</>properties

# broker-a master
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH

</>properties

# broker-a slave
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1
namesrvAddr=192.168.1.10:9876;192.168.1.11:9876
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH

这里要重点理解:

brokerName 一样:说明它们是一组主从
brokerId=0:表示主节点
brokerId=1:表示从节点
namesrvAddr:表示注册到哪些 NameServer

企业环境中,Topic 通常是提前创建好的,而不是完全依赖代码自动创建。

原因是:

1. Topic 是业务资源,需要规划
2. Topic 需要设置队列数量
3. Topic 需要设置权限
4. Topic 需要评估流量和容量
5. 防止代码误创建大量脏 Topic

所以实际公司里通常会有规范:

订单相关:order_topic
支付相关:pay_topic
物流相关:logistics_topic
短信相关:sms_topic

6、升级 DLedger 高可用集群

传统主从模式下,如果 Master 挂了,Slave 不一定能自动变成 Master。为了提升高可用能力,RocketMQ 支持 DLedger 模式。

DLedger 是基于 Raft 协议的分布式日志存储组件。官方文档说明,部署 RocketMQ 时可以使用 DLedger 替换原生副本存储机制;一个 RocketMQ-on-DLedger Group 通常需要至少 3 个节点,通过 Raft 自动选举 Leader,其他节点作为 Follower,并在节点之间复制数据。

可以简单理解成:

DLedger 的优势:

1. 自动选主
2. 数据多副本复制
3. 提高 Broker 高可用能力
4. 减少人工切换 Master 的成本

一个 DLedger 组大概是这样:

broker-a-n0
broker-a-n1
broker-a-n2

其中一个是 Leader,另外两个是 Follower。

配置示例可以理解为:

</>properties

enableDLegerCommitLog=true
dLegerGroup=broker-a
dLegerPeers=n0-192.168.1.10:40911;n1-192.168.1.11:40911;n2-192.168.1.12:40911
dLegerSelfId=n0
sendMessageThreadPoolNums=16

三个节点的 dLegerGroup 一样,表示它们属于同一个复制组;dLegerSelfId 不一样,表示当前节点是谁。

四、总结 RocketMQ 的运行架构

核心流程:

1. Broker 启动后注册到 NameServer
2. Producer 发送消息前,从 NameServer 获取 Topic 路由
3. Producer 根据路由把消息发送到 Broker
4. Consumer 从 NameServer 获取 Topic 路由
5. Consumer 从 Broker 拉取消息并消费
6. Broker 记录消息存储和消费进度

所以你可以这样记:

NameServer:负责找路
Broker:负责存消息
Producer:负责发消息
Consumer:负责收消息
Topic:负责消息分类
ConsumerGroup:负责消费分组

五、理解 RocketMQ 的消息模型

RocketMQ 的消息模型中,有几个核心概念必须掌握。

1、Topic

Topic 是消息主题,用来对消息进行分类。

比如:

order_topic:订单消息
pay_topic:支付消息
stock_topic:库存消息
sms_topic:短信消息

Producer 发送消息时必须指定 Topic。Consumer 消费消息时也需要订阅 Topic。

2、Tag

Tag 是 Topic 下更细粒度的分类。

比如订单 Topic 下可以有:

order_topic
TagA:创建订单
TagB:取消订单
TagC:完成订单

Consumer 可以通过 Tag 过滤自己关心的消息。RocketMQ 官方最佳实践中也提到,只有 Producer 发送消息时设置了 Tag,Consumer 才能使用 Tag 通过 Broker 过滤消息。

3、MessageQueue

一个 Topic 底层会分成多个 MessageQueue。

你可以理解成:

Topic 是一个大分类
MessageQueue 是这个分类下面的多个队列

为什么要有多个队列?因为可以提升并发能力。

一个 Topic 有 4 个队列
Producer 可以并发写入多个队列
ConsumerGroup 中多个 Consumer 可以并发消费多个队列

4、Producer

Producer 是消息生产者。

常见发送方式有:

1. 同步发送:等待 Broker 返回结果,可靠性高
2. 异步发送:不阻塞主线程,性能更好
3. 单向发送:只管发,不关心结果,适合日志类场景
4. 顺序发送:保证同一类消息进入同一个队列。

比如订单状态流转:

订单创建 -> 订单支付 -> 订单发货 -> 订单完成

这种场景就适合顺序消息。

5、Consumer

Consumer 是消息消费者。

RocketMQ 中 Consumer 一般是以 ConsumerGroup 的形式工作。

比如:

order-consumer-group
    consumer-1
    consumer-2
    consumer-3

同一个 ConsumerGroup 里的多个 Consumer 会共同消费一个 Topic 下的消息。

这就是集群消费模式。

举个例子:

Topic 里有 100 条消息
ConsumerGroup 里有 2 个 Consumer

consumer-1 消费一部分
consumer-2 消费另一部分

但是如果是不同的 ConsumerGroup,它们可以各自消费一份完整消息。

比如:

sms-consumer-group:消费订单消息后发短信
point-consumer-group:消费订单消息后加积分
stock-consumer-group:消费订单消息后扣库存

这就是发布订阅的效果。

6、消费进度 Offset

RocketMQ 不会因为 Consumer 消费了一条消息,就马上把消息从队列里删除。官方概念文档中说明,RocketMQ 会根据 ConsumerGroup 记录最后消费到的位置。

消息队列:1 2 3 4 5 6 7 8 9
消费者当前消费到:5
那么 offset 就记录到 5 附近

如果消费者重启,可以从上次记录的位置继续消费。

六、章节总结

这篇文章我们从 0 到 1 梳理了 RocketMQ 的核心内容。

可以总结为几句话:

1. MQ 的核心价值是:解耦、异步、削峰、可靠
2. RocketMQ 是一款高性能、高可靠的分布式消息中间件
3. RocketMQ 核心组件包括:NameServer、Broker、Producer、Consumer
4. NameServer 负责路由发现,Broker 负责消息存储和投递
5. Producer 发送消息到 Topic,Consumer 从 Topic 消费消息
6. Topic 下有多个 MessageQueue,用来提高并发能力
7. ConsumerGroup 用来实现集群消费和广播消费
8. 企业环境中 Topic 通常需要提前规划和创建
9. 单机 RocketMQ 适合学习,生产环境一般使用集群
10. DLedger 可以通过 Raft 选主提高 Broker 高可用能力

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐