前言

        在前几期中,我们陆续学习了Java基础、Spring全家桶、MySQL优化和Redis实战。今天,我们进入分布式系统的另一个核心组件——消息中间件。无论是异步解耦、削峰填谷,还是日志采集、数据同步,消息队列(MQ)都扮演着关键角色。本文将详细对比RocketMQKafka两大主流消息中间件,涵盖核心概念架构原理消息模型高可用机制消息可靠性顺序消息事务消息性能调优以及生产最佳实践。全文配有大量代码示例,助你从零到一掌握MQ。


一、为什么需要消息队列?

  • 异步处理:用户注册成功后,发送邮件、短信等耗时操作可异步执行,提升响应速度。

  • 应用解耦:订单系统与库存系统通过MQ通信,避免直接调用,降低耦合度。

  • 流量削峰:秒杀场景下,请求先进入MQ,后端按能力消费,保护数据库。

  • 日志收集:各服务日志发送到MQ,再由Logstash消费写入ES。

  • 数据同步:跨机房、跨数据库的数据最终一致性。


二、RocketMQ vs Kafka 核心对比

特性 RocketMQ Kafka
开发语言 Java Scala + Java
消息模型 标准队列(拉模式) 分区日志(拉模式)
顺序消息 支持全局顺序(单分区)和部分顺序 同一分区内有序
事务消息 支持(分布式事务最终一致) 不支持(Kafka 0.11+支持幂等和事务,但较复杂)
延迟消息 支持(18个级别) 不支持(可通过定时任务模拟)
消息过滤 Tag + SQL92过滤 仅支持分区级别过滤
消息可靠性 同步刷盘 + 同步复制 acks=all + 副本同步
吞吐量 十万级/秒 百万级/秒
单机Topic数 支持大量Topic(数千) Topic过多会影响性能(分区多导致随机IO)
社区生态 阿里出品,国内流行 开源社区活跃,日志领域首选
适用场景 业务解耦、事务消息、金融级可靠性 日志采集、大数据管道、流处理

选型建议

  • 内部业务系统、金融交易、需要事务消息 → RocketMQ

  • 海量日志、大数据流、高吞吐为主 → Kafka


三、RocketMQ 核心概念与架构

1. 核心组件

text

Producer → NameServer → Broker → Consumer
                ↑           ↓
              Consumer    Broker集群
  • NameServer:轻量级注册中心,管理Broker路由信息(类似Kafka的ZooKeeper)。

  • Broker:消息存储节点,分为Master和Slave(主从)。

  • Producer:消息生产者,发送消息时从NameServer获取路由。

  • Consumer:消费者,支持Push(长轮询)和Pull模式。

2. 消息模型

  • Topic:逻辑消息类别,生产者和消费者通过Topic通信。

  • Message Queue:Topic下的物理分区,每个Queue内部消息有序。

  • Tag:子主题,用于消息过滤(如订单Topic下区分“创建”“支付”)。

3. 高可用机制

  • 主从同步:同步双写(SYNC_MASTER)或异步复制(ASYNC_MASTER)。

  • Dledger(Raft协议):RocketMQ 4.5+支持无主自动选举,取代手动切换。

4. 消息存储

  • CommitLog:所有消息顺序写入单个文件(顺序IO,性能极高)。

  • ConsumeQueue:每个Queue的逻辑偏移量索引文件(类似跳表)。

  • IndexFile:按Message Key的哈希索引。


四、RocketMQ 实战(Spring Boot集成)

1. 引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

2. 配置文件

yml

rocketmq:
  name-server: 192.168.1.100:9876
  producer:
    group: my-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2

3. 生产消息

@Service
public class OrderService {
    @Autowired
    private RocketMQTemplate template;
    
    public void createOrder(Order order) {
        // 同步发送
        SendResult result = template.syncSend("order-topic", order);
        // 异步发送
        template.asyncSend("order-topic", order, new SendCallback() {
            @Override public void onSuccess(SendResult sendResult) {}
            @Override public void onException(Throwable e) {}
        });
        // 发送延迟消息(级别3对应10秒)
        template.syncSend("order-topic", order, 1000, 3);
    }
}

4. 消费消息

@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-consumer-group")
public class OrderConsumer implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        System.out.println("收到订单:" + order.getId());
        // 业务处理
    }
}

5. 顺序消息(单Queue)

// 生产端:选择同一Queue(如订单ID hash)
SendResult result = template.syncSendOrderly("order-topic", order, "orderId_" + order.getId());
// 消费端:监听器实现顺序消费
@Component
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-group", consumeMode = ConsumeMode.ORDERLY)
public class OrderConsumer implements RocketMQListener<Order> { ... }

6. 事务消息(分布式事务)

// 发送半消息
template.sendMessageInTransaction("tx-topic", message, null);

需实现RocketMQLocalTransactionListener,根据本地事务结果返回COMMIT/ROLLBACK。


五、Kafka 核心概念与架构

1. 核心组件

text

Producer → Broker (Leader) → Consumer Group
                ↓
             Zookeeper (或KRaft)
  • Broker:Kafka节点。

  • Topic:逻辑主题,分为多个Partition

  • Partition:有序消息日志,每个分区有多个副本(Replica),其中一个是Leader,其余是Follower。

  • Consumer Group:组内每个消费者消费不同分区,实现水平扩展。

  • ZooKeeper:早期存储元数据和协调(Kafka 3.0后逐步移除,改用KRaft)。

2. 消息存储

  • 每个分区对应磁盘上一个Segment文件(默认1GB)。

  • 索引文件(.index)和时间戳索引(.timeindex)加快查找。

  • 消息顺序追加,零拷贝技术(sendfile)提升网络传输效率。

3. 高可用与一致性

  • ISR机制:同步副本集合,Leader挂掉后从ISR中选举新Leader。

  • acks参数

    • acks=0:不等待确认,性能最高,可能丢数据。

    • acks=1:Leader确认即成功(默认)。

    • acks=all:所有ISR副本确认,最安全。

  • min.insync.replicas:至少多少个副本同步才算写入成功(通常设为2)。


六、Kafka 实战(Spring Boot集成)

1. 引入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置文件

yml

spring:
  kafka:
    bootstrap-servers: 192.168.1.100:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      acks: all
      retries: 3
    consumer:
      group-id: my-consumer-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"

3. 生产消息

@Service
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public void send(String topic, Object message) {
        CompletableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, message);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("offset: " + result.getRecordMetadata().offset());
            } else {
                ex.printStackTrace();
            }
        });
    }
}

4. 消费消息

@Component
public class KafkaConsumer {
    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void listen(ConsumerRecord<String, Order> record) {
        System.out.println("收到消息:" + record.value());
    }
}

5. 顺序消费保证

  • 将需要顺序的消息发送到同一个分区(使用相同的key)。

kafkaTemplate.send(topic, partitionKey, message);
  • 消费端单线程处理同一分区消息(concurrency=1)。

6. 事务消息(Kafka 0.11+)

@Transactional
public void sendWithTx() {
    kafkaTemplate.send("topic1", "msg1");
    kafkaTemplate.send("topic2", "msg2");
    // 如果发生异常,所有消息都不会提交
}

需配置transaction-id-prefix


七、消息可靠性与重复消费

1. 可靠性三要素

阶段 RocketMQ Kafka
生产端 同步发送+重试 + 开启事务 acks=all + 重试 + 幂等性(enable.idempotence=true)
服务端 同步刷盘(flushDiskType=SYNC_FLUSH)+ 同步复制 min.insync.replicas=2 + acks=all
消费端 手动提交offset(业务执行成功后再提交) enable.auto.commit=false + 手动commitSync

2. 幂等消费方案

  • 消费端记录已处理的消息ID(Redis或DB唯一索引)。

  • 使用业务唯一标识(如订单号)做去重。

j

if (redis.setnx("processed:" + msgId, "1", 60, TimeUnit.MINUTES)) {
    // 执行业务
}

八、延迟消息与定时消息

RocketMQ 延迟消息

  • 默认支持18个级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

  • 发送时设置delayTimeLevel即可,无需额外组件。

Kafka 延迟消息实现

  • 方式一:生产消息时不设置过期,消费端检查时间戳,未到时间则重新投递到延迟Topic。

  • 方式二:使用时间轮算法 + 本地延迟队列(如Netty的HashedWheelTimer)。

  • 方式三:使用Kafka Streams的punctuate方法。


九、消息堆积处理

现象

  • 消费速度跟不上生产速度,导致消息积压。

  • 原因:消费者数量不足、业务处理慢、下游瓶颈等。

解决方案

  1. 增加消费者实例:RocketMQ中一个Queue只能被同一消费组内一个消费者消费,需增加Queue数量(如defaultTopicQueueNums)。

  2. 临时扩容:临时增加消费者实例,消费完后再缩容。

  3. 并发消费:使用线程池处理消息(注意顺序要求)。

java

@RocketMQMessageListener(consumeThreadMax = 32)
  1. 丢弃非核心消息:低优先级消息可暂时跳过,业务高峰后补处理。

  2. 消费者降级:关闭非核心业务逻辑,只做必要入库。


十、监控与运维

RocketMQ

  • 控制台rocketmq-console(可视化查看Topic、消费者、消息轨迹)。

  • 关键指标

    • 生产TPS、消费TPS、消息积压量(rocketmq-consumerdiffTotal)。

    • Broker磁盘使用率、PageCache命中率。

  • 告警:积压超过阈值、消息发送失败率>1%。

Kafka

  • 监控工具:Kafka Eagle(EFAK)、Confluent Control Center、Prometheus + JMX Exporter。

  • 重要指标

    • UnderReplicatedPartitions(副本不同步数)

    • OfflinePartitionsCount(离线分区数)

    • Consumer Lag(消费延迟,kafka-consumer-groups.sh查看)

  • 集群平衡kafka-reassign-partitions.sh 重分配分区。


十一、生产最佳实践

RocketMQ

  • 避免创建过多Topic(每个Topic创建多个Queue会消耗内存)。

  • 合理设置maxMessageSize(默认4MB,避免大消息)。

  • 批量发送:syncSend(collection) 提高吞吐。

  • 使用FilterServer进行复杂过滤(Tag不够用时)。

Kafka

  • 分区数设置:单分区吞吐约10MB/s,根据目标吞吐计算(一般1~3个分区/ Broker)。

  • 日志保留策略:按时间(log.retention.hours)或大小(log.retention.bytes)。

  • 压缩方式:推荐snappylz4(节省磁盘和网络带宽)。

  • 避免使用通配符订阅大量Topic。

通用

  • 消息体尽量精简,不要携带大字段(改用对象存储路径)。

  • 消费端保证幂等,做好重试和死信队列处理。

  • 生产环境开启消息轨迹(RocketMQ的enableMsgTrace)。


十二、经典案例:秒杀系统的削峰填谷

场景:10万人抢购100件商品,瞬间流量极大。

架构

text

前端请求 → Nginx限流 → 应用层(验证) → RocketMQ(排队) → 库存服务(消费) → 数据库

代码要点

// 秒杀接口
@PostMapping("/seckill")
public String seckill(Long userId, Long productId) {
    // 1. 简单校验
    // 2. 发送消息(异步)
    rocketMQTemplate.syncSend("seckill-order", new SeckillMessage(userId, productId));
    return "排队中";
}

// 库存服务消费
@RocketMQMessageListener(topic = "seckill-order", consumerGroup = "stock-group")
public class SeckillConsumer implements RocketMQListener<SeckillMessage> {
    @Override
    public void onMessage(SeckillMessage msg) {
        // 数据库预减库存(乐观锁)
        int rows = stockMapper.decreaseStock(productId, version);
        if (rows > 0) {
            // 生成订单
        } else {
            // 库存不足,通知用户
        }
    }
}

结语

消息中间件是构建高并发、高可靠分布式系统的基石。RocketMQ和Kafka各有千秋,选择适合业务的组件并掌握其原理,能够让你在面对复杂场景时游刃有余。

学习建议

  1. 搭建单机/集群环境,熟悉常用命令和API。

  2. 压测对比不同参数下的吞吐量(消息大小、批量、压缩等)。

  3. 模拟故障:Broker宕机、网络分区、消息积压,观察如何恢复。

  4. 阅读源码:RocketMQ的CommitLogConsumeQueue;Kafka的ReplicaManagerLogSegment

下期预告:容器化与Kubernetes实战:从Docker到K8s全栈指南。
如果本文对你有帮助,欢迎点赞、收藏、转发

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐