深入理解 RocketMQ 消息模型:从客户端机制到 Spring Boot 实战与生产最佳实践
本文以 Apache RocketMQ 5.x 的消息模型为主线,并结合常见的 RocketMQ-Spring 使用方式讲解。需要注意的是,RocketMQ 4.x 与 5.x 在延迟消息、Topic 类型和部分客户端 API 上存在差异,实际开发时应以项目采用的版本为准。
一、为什么选择 RocketMQ
在微服务架构中,订单创建、库存扣减、支付通知、物流更新、营销积分等业务往往不适合全部通过同步调用完成。同步调用链越长,系统之间的耦合越重,任何一个下游服务异常,都可能影响主流程。
- 异步解耦:生产者只负责发送事件,不必等待所有消费者完成处理。
- 削峰填谷:高峰期先将请求写入消息队列,消费者根据自身能力平稳处理。
- 最终一致性:通过事务消息、重试和补偿机制完成跨服务状态同步。
- 事件驱动:将系统从“服务调用”演进为“事件传播”。
RocketMQ 不只是一个简单的消息中间件,它还原生支持顺序消息、延迟消息、事务消息、过滤、重试与死信队列等企业级能力。
二、深入理解 RocketMQ 的消息模型
1. RocketMQ 客户端基本流程
RocketMQ 中常见的核心角色包括
- Producer:消息生产者,负责发送消息。
- Consumer:消息消费者,负责订阅并处理消息。
- NameServer:保存路由信息,帮助客户端发现 Broker。
- Broker:负责消息存储、查询和投递。
- Topic:消息主题,用于区分业务类型。
- MessageQueue:Topic 下的逻辑队列,是消息存储和负载分配的基本单位。
一条普通消息的生命周期大致如下:

生产者发送消息流程
生产者启动后,通常会完成以下动作:
- 配置 NameServer 地址、生产者组和安全认证信息。
- 从 NameServer 获取 Topic 的路由信息。
- 根据负载均衡策略选择某个 MessageQueue。
- 将消息发送到对应 Broker。
- 根据发送方式决定是否等待 Broker 响应。

消费者消费消息流程
消费者通常以消费者组为单位订阅 Topic。对于集群消费,同一个消费者组中的多个实例会共同分担消息;对于广播消费,每个实例都会收到完整消息。
消费者处理消息的一般过程为:
- 订阅 Topic 和过滤条件。
- 从 Broker 拉取消息或接收推送式封装后的消息。
- 执行业务逻辑。
- 返回消费成功或消费失败状态。
- 失败消息根据规则进入重试,超过上限后进入死信队列。
RocketMQ 5.x 里,Topic 不再只是一个“消息分类名字”,它还要声明自己准备用来发哪一种消息。你创建 Topic 时,要明确它是普通消息 Topic、顺序消息 Topic、延迟消息 Topic,还是事务消息 Topic。
2. 消息确认机制
消息系统最重要的问题之一是:消息是否真的可靠地到达并被正确处理?
RocketMQ 的可靠性需要从两个方向理解:
- 生产者到 Broker 的发送确认。
- Broker 到消费者的消费确认。
2.1 生产者发送确实
同步发送消息时,Broker 会返回发送状态,例如:
- SEND_OK:消息已成功发送。
- FLUSH_DISK_TIMEOUT:刷盘超时。
- FLUSH_SLAVE_TIMEOUT:同步复制到从节点超时。
- SLAVE_NOT_AVAILABLE:从节点不可用。
2.2消费者消费确认
消费者成功处理消息后,需要向 Broker 表达“这条消息已经消费完成”。
在 Spring Boot 中,监听方法正常执行结束,通常会被框架认为消费成功;如果执行过程中抛出异常,则本次消费失败,RocketMQ 会根据重试策略重新投递。
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
selectorExpression = "created"
)
public class OrderCreatedConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
orderService.handleCreatedEvent(event);
// 方法正常结束,表示本次消费成功
}
}
如果业务处理失败,应让异常继续抛出:
@Override
public void onMessage(OrderCreatedEvent event) {
try {
orderService.handleCreatedEvent(event);
} catch (Exception e) {
log.error("处理订单创建消息失败,orderId={}", event.getOrderId(), e);
throw e;
}
}
不要在消费失败时捕获异常后什么都不做,否则框架会认为消息已经处理成功,消息也就失去了自动重试机会。
3. RocketMQ 的消费模式
RocketMQ 的消费模式主要包括:
- 集群消费:同一个消费者组中的多个实例共同消费消息,每条消息只由组内某一个实例处理。
- 广播消费:同一个消费者组中的每个实例都能够收到消息。
集群消费示意
消息 A -> Consumer-1
消息 B -> Consumer-2
消息 C -> Consumer-1
广播消费示意
消息 A -> Consumer-1
消息 A -> Consumer-2
消息 A -> Consumer-3
广播消费适用于:
- 配置刷新通知。
- 本地缓存失效通知。
- 应用节点状态更新。
- 每个实例都必须执行的任务。
@Component
@RocketMQMessageListener(
topic = "system-config-topic",
consumerGroup = "config-refresh-consumer-group",
messageModel = MessageModel.BROADCASTING
)
public class ConfigRefreshConsumer implements RocketMQListener<ConfigRefreshEvent> {
@Override
public void onMessage(ConfigRefreshEvent event) {
localCacheService.refresh(event.getConfigKey());
}
}
广播消费不适合直接承担订单扣库存、积分增加等只允许处理一次的核心业务,因为多个消费者实例都会执行同一条消息。
4. 过滤消息
在一个 Topic 中,往往会存在不同业务事件。例如订单主题中可能包含:
- 订单创建。
- 订单支付。
- 订单取消。
- 订单完成。
如果每类消费者只关注自己需要的消息,就可以通过过滤机制减少无效消费。
RocketMQ 常见过滤方式包括:
- Tag 过滤。
- SQL 属性过滤。
4.1 SQL 属性过滤
tag 是最简单、使用最广泛的过滤方式。
生产者发送消息:
rocketMQTemplate.syncSend(
"order-topic:created",
new OrderCreatedEvent(orderId)
);
rocketMQTemplate.syncSend(
"order-topic:paid",
new OrderPaidEvent(orderId)
);
消费者只订阅 created:
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-created-consumer-group",
selectorExpression = "created"
)
public class OrderCreatedConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
orderService.createSnapshot(event.getOrderId());
}
}
也可以订阅多个 Tag:
selectorExpression = "created || paid"
Tag 使用建议
tag 应用于区分同一个业务领域下的事件类型,例如:
Topic: order-topic
Tag: created / paid / cancelled / finished
不要将所有系统的消息都塞入同一个 Topic,再依赖大量 Tag 区分,否则会导致主题职责混乱,运维和扩容都变得困难
4.2 SQL 属性过滤
如果仅通过 Tag 仍不足以表达过滤条件,可以给消息附加属性,再通过 SQL 表达式过滤。
例如,只消费金额大于 1000 元的 VIP 订单:
Message<OrderCreatedEvent> message = MessageBuilder
.withPayload(event)
.setHeader("amount", 1800)
.setHeader("customerLevel", "VIP")
.build();
rocketMQTemplate.syncSend("order-topic", message);
消费者配置示意:
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "vip-order-consumer-group",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 1000 AND customerLevel = 'VIP'"
)
public class VipOrderConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
vipOrderService.process(event);
}
}
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "vip-order-consumer-group",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 1000 AND customerLevel = 'VIP'"
)
public class VipOrderConsumer implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
vipOrderService.process(event);
}
}
SQL 过滤能力更强,但也比 Tag 更复杂。一般建议优先使用 Topic 与 Tag 完成清晰的业务分类,仅在确有必要时使用属性过滤。RocketMQ 支持基于 Tag 和消息属性表达式的过滤模式。
5. RocketMQ的消息类型
1. 顺序消息
2.延迟消息
3.批量消息
4.事务消息
1. 顺序消息
电商系统中,同一个订单可能产生以下事件:
创建订单 -> 支付成功 -> 仓库发货 -> 用户确认收货
如果这些消息乱序消费,就可能出现订单还没有支付却已经发货,或者订单已经完成后又被标记为待支付。
顺序消息解决的就是同一业务实体相关事件的消费顺序问题。
顺序消息的核心原理
RocketMQ 的顺序消息并不是让整个 Topic 中的所有消息都串行处理,而是通过以下方式实现局部有序:
- 同一个业务键的消息发送到同一个 MessageQueue。
- 消费者对该队列中的消息顺序消费。
- 不同业务键可以落入不同队列,并行处理。
例如:
orderId=1001 的所有消息 -> Queue-1
orderId=1002 的所有消息 -> Queue-2
这样既保证了同一个订单的事件顺序,也能够利用多队列提升整体吞吐量。
SpringBoot发送顺序消息
public void sendOrderStatusMessage(String orderId, OrderStatusEvent event) {
rocketMQTemplate.syncSendOrderly(
"order-status-topic",
event,
orderId
);
}
这里的第三个参数 orderId 是路由键。相同 orderId 的消息会被稳定路由到同一个队列。
消费者:
@Component
@RocketMQMessageListener(
topic = "order-status-topic",
consumerGroup = "order-status-consumer-group",
consumeMode = ConsumeMode.ORDERLY
)
public class OrderStatusConsumer implements RocketMQListener<OrderStatusEvent> {
@Override
public void onMessage(OrderStatusEvent event) {
orderService.updateStatus(event);
}
}
顺序消息注意事项
顺序消息并不等于绝对不会出现业务异常,生产环境仍然需要注意:
- 路由键必须稳定,例如使用 orderId,而不是随机值。
- 某一条消息一直消费失败,可能阻塞后续同队列消息。
- 消费者扩缩容期间需要关注队列重新分配行为。
- 业务处理仍然要保证幂等。
- RocketMQ 5.x 中应创建与 FIFO 消息匹配的 Topic 类型。
2. 延迟消息
延迟消息是指消息发送后,不立即被消费者看到,而是在指定时间后才允许消费。
典型业务场景包括:
- 订单创建 30 分钟未支付自动关闭。
- 用户注册成功 10 分钟后发送引导提醒。
- 商品签收 7 天后自动确认收货。
- 优惠券到期前发送提醒。
延迟消息业务模型:
以订单超时关闭为例:

注意:延迟消息触发时,消费者必须重新查询订单状态,不能简单地收到消息就关闭订单。因为在等待期间,订单可能已经完成支付。
RocketMQ-Spring 常见延迟等级方式
在采用延迟等级的客户端方式中,可类似这样发送:
Message<OrderCloseEvent> message = MessageBuilder
.withPayload(new OrderCloseEvent(orderId))
.build();
rocketMQTemplate.syncSend(
"order-close-topic",
message,
3000,
16
);
生产者发送这条消息时,最多等 3 秒确认发送结果;
这条消息发送成功后,会按照延迟级别 16,在默认配置下大约 30 分钟后投递给消费者。
3.批量消息
当应用需要一次发送大量同类型的小消息时,逐条发送会增加网络请求次数。批量消息可以减少交互开销,提高发送吞吐量。
适合批量发送的场景:
- 批量日志上报。
- 批量数据同步。
- 批量发送非关键通知。
- 初始化大量任务事件。
Spring Boot 示例
List<Message<OrderCreatedEvent>> messages = orderList.stream()
.map(order -> MessageBuilder
.withPayload(new OrderCreatedEvent(order.getId()))
.build())
.toList();
rocketMQTemplate.syncSend("order-created-topic", messages);
批量消息注意事项
批量发送并不意味着越大越好,需要关注:
- 单批消息总体大小限制。
- 单条消息大小限制。
- 批量消息应尽量属于同一个 Topic。
- 批次过大会提高失败后的重试成本。
- 关键交易消息不应为了追求吞吐量而过度批量化。
实际项目中,可以根据消息大小、吞吐需求和网络延迟设置合理批次,例如每几十条或每几百条组成一次发送。
4.事务消息
事务消息是 RocketMQ 非常有价值的能力之一,主要解决本地数据库事务与消息发送之间的一致性问题。
例如,创建订单时需要同时完成两件事:
- 将订单写入数据库。
- 发送订单创建消息,通知库存系统锁定库存。
如果先写数据库再发消息,可能出现数据库成功但消息发送失败;如果先发消息再写数据库,可能出现库存已经锁定但订单最终创建失败。
事务消息用于解决这一类问题。
事务消息执行流程

三种事务状态
事务执行完成后,生产者需要返回以下状态之一:

Spring Boot 事务消息示例
@Service
public class OrderMessageProducer {
private final RocketMQTemplate rocketMQTemplate;
public OrderMessageProducer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
public void createOrder(OrderCreatedEvent event) {
Message<OrderCreatedEvent> message = MessageBuilder
.withPayload(event)
.setHeader("orderId", event.getOrderId())
.build();
rocketMQTemplate.sendMessageInTransaction(
"order-transaction-topic",
message,
event
);
}
}
监听本地事务:
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
private final OrderService orderService;
public OrderTransactionListener(OrderService orderService) {
this.orderService = orderService;
}
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message message,
Object arg) {
OrderCreatedEvent event = (OrderCreatedEvent) arg;
try {
orderService.createOrder(event);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
String orderId = String.valueOf(message.getHeaders().get("orderId"));
return orderService.exists(orderId)
? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
}
}
事务消息最佳实践
事务回查不能依赖内存状态,因为服务可能已经重启。建议将事务结果记录到数据库中:
订单表 / 本地事务记录表 / 消息事件表
回查时根据持久化结果判断应提交还是回滚。
事务消息适用于:
- 创建订单后通知库存。
- 支付成功后通知履约系统。
- 账户扣款后发送记账事件。
- 业务操作成功后可靠触发异步流程。
7、Spring Boot 整合 RocketMQ
1. 快速实战
下面使用订单创建事件演示最基础的生产与消费流程。
1.1 引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-spring.version}</version>
</dependency>
1.2 配置文件
rocketmq:
name-server: 127.0.0.1:9876
producer:
send-message-timeout: 3000
retry-times-when-send-failed: 2
1.3 定义消息对象
public class OrderCreatedEvent {
private String orderId;
private Long userId;
private BigDecimal amount;
private LocalDateTime createdTime;
// getter / setter
}
1.4 发送消息
@RestController
@RequestMapping("/orders")
public class OrderController {
private final RocketMQTemplate rocketMQTemplate;
public OrderController(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
}
@PostMapping("/{orderId}/notify")
public String notifyCreated(@PathVariable String orderId) {
OrderCreatedEvent event = new OrderCreatedEvent();
event.setOrderId(orderId);
event.setCreatedTime(LocalDateTime.now());
rocketMQTemplate.syncSend("order-topic:created", event);
return "success";
}
}
1.5 消费消息
@Slf4j
@Component
@RocketMQMessageListener(
topic = "order-topic",
selectorExpression = "created",
consumerGroup = "order-created-consumer-group"
)
public class OrderCreatedListener implements RocketMQListener<OrderCreatedEvent> {
@Override
public void onMessage(OrderCreatedEvent event) {
log.info("收到订单创建消息,orderId={}", event.getOrderId());
}
}
RocketMQ-Spring 通过 RocketMQTemplate 封装消息发送能力,通过 @RocketMQMessageListener 将消费者监听器注册到容器中,这是 Spring Boot 项目中最常用的接入方式。
2. 如何处理各种消息类型

Spring Boot 整合实现原理
Spring Boot 集成 RocketMQ 的核心并不神秘,本质上是完成了三件事:
1. 自动装配生产者
应用启动时,Starter 会根据配置文件创建并初始化生产者实例,并将其封装为 RocketMQTemplate 注入 Spring 容器。
因此业务代码不需要自行管理:
- 生产者启动。
- NameServer 连接。
- 消息序列化。
- 资源关闭。
2. 扫描监听注解
Starter 会扫描带有 @RocketMQMessageListener 的 Bean,并根据注解内容创建消费者容器。
例如:
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
框架会读取:
- Topic。
- 消费者组。
- 过滤表达式。
- 消费模式。
- 是否顺序消费。
然后创建对应消费者并启动监听。
8、一个完整的订单消息设计案例

设计原则如下:
- 用 Topic 划分业务领域。
- 用 Tag 区分领域内事件类型。
- 用 Key 保存最重要的业务查询标识。
- 状态流转类消息采用顺序消息。
- 跨服务一致性要求高的场景采用事务消息。
- 超时触发业务采用延迟消息。
- 消费端统一完成幂等、日志、告警和异常处理。
总结
RocketMQ 的价值并不仅仅是“把消息发送出去”,而是围绕可靠性、一致性、吞吐量和业务治理提供了一套完整机制。
在实际项目中,需要重点掌握以下内容:
- 普通消息解决异步解耦问题。
- 广播消息用于每个实例都需要执行的通知类场景。
- Tag 与 SQL 过滤帮助消费者只处理关心的消息。
- 顺序消息保证同一业务实体的状态流转顺序。
- 延迟消息适用于订单超时、定时提醒等场景。
- 批量消息提高大量小消息的发送效率。
- 事务消息用于解决本地事务与消息发送的一致性问题。
- ACL 为生产环境提供访问安全保障。
- 消费端必须做好幂等、重试监控和死信治理。
最后,关于消息可靠性,最重要的一条原则是:
不要将业务正确性完全寄托在消息只投递一次上,而应通过幂等、状态校验、唯一约束、补偿机制和可观测能力,构建真正可靠的消息系统。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)