本文以 Apache RocketMQ 5.x 的消息模型为主线,并结合常见的 RocketMQ-Spring 使用方式讲解。需要注意的是,RocketMQ 4.x 与 5.x 在延迟消息、Topic 类型和部分客户端 API 上存在差异,实际开发时应以项目采用的版本为准。

一、为什么选择 RocketMQ

在微服务架构中,订单创建、库存扣减、支付通知、物流更新、营销积分等业务往往不适合全部通过同步调用完成。同步调用链越长,系统之间的耦合越重,任何一个下游服务异常,都可能影响主流程。

  1. 异步解耦:生产者只负责发送事件,不必等待所有消费者完成处理。
  2. 削峰填谷:高峰期先将请求写入消息队列,消费者根据自身能力平稳处理。
  3. 最终一致性:通过事务消息、重试和补偿机制完成跨服务状态同步。
  4. 事件驱动:将系统从“服务调用”演进为“事件传播”。

RocketMQ 不只是一个简单的消息中间件,它还原生支持顺序消息、延迟消息、事务消息、过滤、重试与死信队列等企业级能力。

二、深入理解 RocketMQ 的消息模型

1. RocketMQ 客户端基本流程

RocketMQ 中常见的核心角色包括

  • Producer:消息生产者,负责发送消息。
  • Consumer:消息消费者,负责订阅并处理消息。
  • NameServer:保存路由信息,帮助客户端发现 Broker。
  • Broker:负责消息存储、查询和投递。
  • Topic:消息主题,用于区分业务类型。
  • MessageQueue:Topic 下的逻辑队列,是消息存储和负载分配的基本单位。

一条普通消息的生命周期大致如下:

生产者发送消息流程

生产者启动后,通常会完成以下动作:

  1. 配置 NameServer 地址、生产者组和安全认证信息。
  2. 从 NameServer 获取 Topic 的路由信息。
  3. 根据负载均衡策略选择某个 MessageQueue。
  4. 将消息发送到对应 Broker。
  5. 根据发送方式决定是否等待 Broker 响应。

消费者消费消息流程

消费者通常以消费者组为单位订阅 Topic。对于集群消费,同一个消费者组中的多个实例会共同分担消息;对于广播消费,每个实例都会收到完整消息。

消费者处理消息的一般过程为:

  1. 订阅 Topic 和过滤条件。
  2. 从 Broker 拉取消息或接收推送式封装后的消息。
  3. 执行业务逻辑。
  4. 返回消费成功或消费失败状态。
  5. 失败消息根据规则进入重试,超过上限后进入死信队列。

RocketMQ 5.x 里,Topic 不再只是一个“消息分类名字”,它还要声明自己准备用来发哪一种消息。你创建 Topic 时,要明确它是普通消息 Topic、顺序消息 Topic、延迟消息 Topic,还是事务消息 Topic。

2. 消息确认机制

消息系统最重要的问题之一是:消息是否真的可靠地到达并被正确处理?

RocketMQ 的可靠性需要从两个方向理解:

  • 生产者到 Broker 的发送确认。
  • Broker 到消费者的消费确认。

2.1 生产者发送确实

同步发送消息时,Broker 会返回发送状态,例如:

  • SEND_OK:消息已成功发送。
  • FLUSH_DISK_TIMEOUT:刷盘超时。
  • FLUSH_SLAVE_TIMEOUT:同步复制到从节点超时。
  • SLAVE_NOT_AVAILABLE:从节点不可用。

 2.2消费者消费确认

消费者成功处理消息后,需要向 Broker 表达“这条消息已经消费完成”。

在 Spring Boot 中,监听方法正常执行结束,通常会被框架认为消费成功;如果执行过程中抛出异常,则本次消费失败,RocketMQ 会根据重试策略重新投递。

@Component
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer-group",
        selectorExpression = "created"  
)
public class OrderCreatedConsumer implements RocketMQListener<OrderCreatedEvent> {

    @Override
    public void onMessage(OrderCreatedEvent event) {
        orderService.handleCreatedEvent(event);
        // 方法正常结束,表示本次消费成功
    }
}

如果业务处理失败,应让异常继续抛出:

@Override
public void onMessage(OrderCreatedEvent event) {
    try {
        orderService.handleCreatedEvent(event);
    } catch (Exception e) {
        log.error("处理订单创建消息失败,orderId={}", event.getOrderId(), e);
        throw e;
    }
}

不要在消费失败时捕获异常后什么都不做,否则框架会认为消息已经处理成功,消息也就失去了自动重试机会。

3. RocketMQ 的消费模式

RocketMQ 的消费模式主要包括:

  • 集群消费:同一个消费者组中的多个实例共同消费消息,每条消息只由组内某一个实例处理。
  • 广播消费:同一个消费者组中的每个实例都能够收到消息。

集群消费示意

消息 A -> Consumer-1

消息 B -> Consumer-2

消息 C -> Consumer-1

广播消费示意

消息 A -> Consumer-1

消息 A -> Consumer-2

消息 A -> Consumer-3

广播消费适用于:

  • 配置刷新通知。
  • 本地缓存失效通知。
  • 应用节点状态更新。
  • 每个实例都必须执行的任务。
@Component
@RocketMQMessageListener(
        topic = "system-config-topic",
        consumerGroup = "config-refresh-consumer-group",
        messageModel = MessageModel.BROADCASTING
)
public class ConfigRefreshConsumer implements RocketMQListener<ConfigRefreshEvent> {

    @Override
    public void onMessage(ConfigRefreshEvent event) {
        localCacheService.refresh(event.getConfigKey());
    }
}

广播消费不适合直接承担订单扣库存、积分增加等只允许处理一次的核心业务,因为多个消费者实例都会执行同一条消息。

4. 过滤消息

在一个 Topic 中,往往会存在不同业务事件。例如订单主题中可能包含:

  • 订单创建。
  • 订单支付。
  • 订单取消。
  • 订单完成。

如果每类消费者只关注自己需要的消息,就可以通过过滤机制减少无效消费。

RocketMQ 常见过滤方式包括:

  • Tag 过滤。
  • SQL 属性过滤。

4.1  SQL 属性过滤

tag 是最简单、使用最广泛的过滤方式。

生产者发送消息:

rocketMQTemplate.syncSend(
        "order-topic:created",
        new OrderCreatedEvent(orderId)
);

rocketMQTemplate.syncSend(
        "order-topic:paid",
        new OrderPaidEvent(orderId)
);

消费者只订阅 created:

@Component
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-created-consumer-group",
        selectorExpression = "created"
)
public class OrderCreatedConsumer implements RocketMQListener<OrderCreatedEvent> {

    @Override
    public void onMessage(OrderCreatedEvent event) {
        orderService.createSnapshot(event.getOrderId());
    }
}

也可以订阅多个 Tag:

selectorExpression = "created || paid"

Tag 使用建议

tag 应用于区分同一个业务领域下的事件类型,例如:

Topic: order-topic

Tag: created / paid / cancelled / finished

不要将所有系统的消息都塞入同一个 Topic,再依赖大量 Tag 区分,否则会导致主题职责混乱,运维和扩容都变得困难

4.2 SQL 属性过滤

如果仅通过 Tag 仍不足以表达过滤条件,可以给消息附加属性,再通过 SQL 表达式过滤。

例如,只消费金额大于 1000 元的 VIP 订单:

Message<OrderCreatedEvent> message = MessageBuilder
        .withPayload(event)
        .setHeader("amount", 1800)
        .setHeader("customerLevel", "VIP")
        .build();

rocketMQTemplate.syncSend("order-topic", message);

消费者配置示意:

@Component
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "vip-order-consumer-group",
        selectorType = SelectorType.SQL92,
        selectorExpression = "amount > 1000 AND customerLevel = 'VIP'"
)
public class VipOrderConsumer implements RocketMQListener<OrderCreatedEvent> {

    @Override
    public void onMessage(OrderCreatedEvent event) {
        vipOrderService.process(event);
    }
}

@Component
@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "vip-order-consumer-group",
        selectorType = SelectorType.SQL92,
        selectorExpression = "amount > 1000 AND customerLevel = 'VIP'"
)
public class VipOrderConsumer implements RocketMQListener<OrderCreatedEvent> {

    @Override
    public void onMessage(OrderCreatedEvent event) {
        vipOrderService.process(event);
    }
}

SQL 过滤能力更强,但也比 Tag 更复杂。一般建议优先使用 Topic 与 Tag 完成清晰的业务分类,仅在确有必要时使用属性过滤。RocketMQ 支持基于 Tag 和消息属性表达式的过滤模式。

5. RocketMQ的消息类型

1. 顺序消息

2.延迟消息

3.批量消息

4.事务消息

1. 顺序消息

电商系统中,同一个订单可能产生以下事件:

创建订单 -> 支付成功 -> 仓库发货 -> 用户确认收货

如果这些消息乱序消费,就可能出现订单还没有支付却已经发货,或者订单已经完成后又被标记为待支付。

顺序消息解决的就是同一业务实体相关事件的消费顺序问题。

顺序消息的核心原理

RocketMQ 的顺序消息并不是让整个 Topic 中的所有消息都串行处理,而是通过以下方式实现局部有序:

  1. 同一个业务键的消息发送到同一个 MessageQueue。
  2. 消费者对该队列中的消息顺序消费。
  3. 不同业务键可以落入不同队列,并行处理。

例如:

orderId=1001 的所有消息 -> Queue-1

orderId=1002 的所有消息 -> Queue-2

这样既保证了同一个订单的事件顺序,也能够利用多队列提升整体吞吐量。

SpringBoot发送顺序消息

public void sendOrderStatusMessage(String orderId, OrderStatusEvent event) {
    rocketMQTemplate.syncSendOrderly(
            "order-status-topic",
            event,
            orderId
    );
}

这里的第三个参数 orderId 是路由键。相同 orderId 的消息会被稳定路由到同一个队列。

消费者:

@Component
@RocketMQMessageListener(
        topic = "order-status-topic",
        consumerGroup = "order-status-consumer-group",
        consumeMode = ConsumeMode.ORDERLY
)
public class OrderStatusConsumer implements RocketMQListener<OrderStatusEvent> {

    @Override
    public void onMessage(OrderStatusEvent event) {
        orderService.updateStatus(event);
    }
}

顺序消息注意事项

顺序消息并不等于绝对不会出现业务异常,生产环境仍然需要注意:

  • 路由键必须稳定,例如使用 orderId,而不是随机值。
  • 某一条消息一直消费失败,可能阻塞后续同队列消息。
  • 消费者扩缩容期间需要关注队列重新分配行为。
  • 业务处理仍然要保证幂等。
  • RocketMQ 5.x 中应创建与 FIFO 消息匹配的 Topic 类型。

2. 延迟消息

延迟消息是指消息发送后,不立即被消费者看到,而是在指定时间后才允许消费。

典型业务场景包括:

  • 订单创建 30 分钟未支付自动关闭。
  • 用户注册成功 10 分钟后发送引导提醒。
  • 商品签收 7 天后自动确认收货。
  • 优惠券到期前发送提醒。

延迟消息业务模型:

以订单超时关闭为例:

注意:延迟消息触发时,消费者必须重新查询订单状态,不能简单地收到消息就关闭订单。因为在等待期间,订单可能已经完成支付。

RocketMQ-Spring 常见延迟等级方式

在采用延迟等级的客户端方式中,可类似这样发送:

Message<OrderCloseEvent> message = MessageBuilder
        .withPayload(new OrderCloseEvent(orderId))
        .build();

rocketMQTemplate.syncSend(
        "order-close-topic",
        message,
        3000,
        16
);

生产者发送这条消息时,最多等 3 秒确认发送结果;
这条消息发送成功后,会按照延迟级别 16,在默认配置下大约 30 分钟后投递给消费者。

3.批量消息

当应用需要一次发送大量同类型的小消息时,逐条发送会增加网络请求次数。批量消息可以减少交互开销,提高发送吞吐量。

适合批量发送的场景:

  • 批量日志上报。
  • 批量数据同步。
  • 批量发送非关键通知。
  • 初始化大量任务事件。

Spring Boot 示例

List<Message<OrderCreatedEvent>> messages = orderList.stream()
        .map(order -> MessageBuilder
                .withPayload(new OrderCreatedEvent(order.getId()))
                .build())
        .toList();

rocketMQTemplate.syncSend("order-created-topic", messages);

批量消息注意事项

批量发送并不意味着越大越好,需要关注:

  • 单批消息总体大小限制。
  • 单条消息大小限制。
  • 批量消息应尽量属于同一个 Topic。
  • 批次过大会提高失败后的重试成本。
  • 关键交易消息不应为了追求吞吐量而过度批量化。

实际项目中,可以根据消息大小、吞吐需求和网络延迟设置合理批次,例如每几十条或每几百条组成一次发送。

4.事务消息

事务消息是 RocketMQ 非常有价值的能力之一,主要解决本地数据库事务与消息发送之间的一致性问题。

例如,创建订单时需要同时完成两件事:

  1. 将订单写入数据库。
  2. 发送订单创建消息,通知库存系统锁定库存。

如果先写数据库再发消息,可能出现数据库成功但消息发送失败;如果先发消息再写数据库,可能出现库存已经锁定但订单最终创建失败。

事务消息用于解决这一类问题。

事务消息执行流程

三种事务状态

事务执行完成后,生产者需要返回以下状态之一:

Spring Boot 事务消息示例

@Service
public class OrderMessageProducer {

    private final RocketMQTemplate rocketMQTemplate;

    public OrderMessageProducer(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    public void createOrder(OrderCreatedEvent event) {
        Message<OrderCreatedEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader("orderId", event.getOrderId())
                .build();

        rocketMQTemplate.sendMessageInTransaction(
                "order-transaction-topic",
                message,
                event
        );
    }
}

监听本地事务:

@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {

    private final OrderService orderService;

    public OrderTransactionListener(OrderService orderService) {
        this.orderService = orderService;
    }

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(
            Message message,
            Object arg) {
        OrderCreatedEvent event = (OrderCreatedEvent) arg;

        try {
            orderService.createOrder(event);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        String orderId = String.valueOf(message.getHeaders().get("orderId"));

        return orderService.exists(orderId)
                ? RocketMQLocalTransactionState.COMMIT
                : RocketMQLocalTransactionState.ROLLBACK;
    }
}

事务消息最佳实践

事务回查不能依赖内存状态,因为服务可能已经重启。建议将事务结果记录到数据库中:

订单表 / 本地事务记录表 / 消息事件表

回查时根据持久化结果判断应提交还是回滚。

事务消息适用于:

  • 创建订单后通知库存。
  • 支付成功后通知履约系统。
  • 账户扣款后发送记账事件。
  • 业务操作成功后可靠触发异步流程。

7、Spring Boot 整合 RocketMQ

1. 快速实战

下面使用订单创建事件演示最基础的生产与消费流程。

1.1 引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring.version}</version>
</dependency>

1.2 配置文件

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    send-message-timeout: 3000
    retry-times-when-send-failed: 2

1.3 定义消息对象

public class OrderCreatedEvent {

    private String orderId;
    private Long userId;
    private BigDecimal amount;
    private LocalDateTime createdTime;

    // getter / setter
}

1.4 发送消息

​
@RestController
@RequestMapping("/orders")
public class OrderController {

    private final RocketMQTemplate rocketMQTemplate;

    public OrderController(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

    @PostMapping("/{orderId}/notify")
    public String notifyCreated(@PathVariable String orderId) {
        OrderCreatedEvent event = new OrderCreatedEvent();
        event.setOrderId(orderId);
        event.setCreatedTime(LocalDateTime.now());

        rocketMQTemplate.syncSend("order-topic:created", event);

        return "success";
    }
}

​

1.5 消费消息

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "order-topic",
        selectorExpression = "created",
        consumerGroup = "order-created-consumer-group"
)
public class OrderCreatedListener implements RocketMQListener<OrderCreatedEvent> {

    @Override
    public void onMessage(OrderCreatedEvent event) {
        log.info("收到订单创建消息,orderId={}", event.getOrderId());
    }
}

RocketMQ-Spring 通过 RocketMQTemplate 封装消息发送能力,通过 @RocketMQMessageListener 将消费者监听器注册到容器中,这是 Spring Boot 项目中最常用的接入方式。

2. 如何处理各种消息类型

Spring Boot 整合实现原理

Spring Boot 集成 RocketMQ 的核心并不神秘,本质上是完成了三件事:

1. 自动装配生产者

应用启动时,Starter 会根据配置文件创建并初始化生产者实例,并将其封装为 RocketMQTemplate 注入 Spring 容器。

因此业务代码不需要自行管理:

  • 生产者启动。
  • NameServer 连接。
  • 消息序列化。
  • 资源关闭。

2. 扫描监听注解

Starter 会扫描带有 @RocketMQMessageListener 的 Bean,并根据注解内容创建消费者容器。

例如:

@RocketMQMessageListener(
        topic = "order-topic",
        consumerGroup = "order-consumer-group"
)

框架会读取:

  • Topic。
  • 消费者组。
  • 过滤表达式。
  • 消费模式。
  • 是否顺序消费。

然后创建对应消费者并启动监听。

8、一个完整的订单消息设计案例

设计原则如下:

  1. 用 Topic 划分业务领域。
  2. 用 Tag 区分领域内事件类型。
  3. 用 Key 保存最重要的业务查询标识。
  4. 状态流转类消息采用顺序消息。
  5. 跨服务一致性要求高的场景采用事务消息。
  6. 超时触发业务采用延迟消息。
  7. 消费端统一完成幂等、日志、告警和异常处理。

总结

RocketMQ 的价值并不仅仅是“把消息发送出去”,而是围绕可靠性、一致性、吞吐量和业务治理提供了一套完整机制。

在实际项目中,需要重点掌握以下内容:

  • 普通消息解决异步解耦问题。
  • 广播消息用于每个实例都需要执行的通知类场景。
  • Tag 与 SQL 过滤帮助消费者只处理关心的消息。
  • 顺序消息保证同一业务实体的状态流转顺序。
  • 延迟消息适用于订单超时、定时提醒等场景。
  • 批量消息提高大量小消息的发送效率。
  • 事务消息用于解决本地事务与消息发送的一致性问题。
  • ACL 为生产环境提供访问安全保障。
  • 消费端必须做好幂等、重试监控和死信治理。

最后,关于消息可靠性,最重要的一条原则是:

不要将业务正确性完全寄托在消息只投递一次上,而应通过幂等、状态校验、唯一约束、补偿机制和可观测能力,构建真正可靠的消息系统。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐