ActiveMQ 全套自学教程:从入门到实践

📌 前言:什么是消息队列?

在正式学习 ActiveMQ 之前,我们先理解一个生活场景:你点了一份外卖,餐厅不需要等你本人过来取餐,而是通过外卖骑手将餐品送到你手中。在这个过程中,订单系统(生产者) -> 外卖平台(消息队列) -> 外卖骑手(消费者),三者被解耦开来,各自独立运行。

这就是消息队列的核心思想:异步通信、系统解耦、流量削峰

随着分布式架构的普及,各系统间需要在不同节点之间高效传递消息,同时应对流量洪峰进行削峰填谷。消息中间件正是为了解决这些问题而诞生的。

ActiveMQ 就是诸多消息中间件中的经典代表,市面上常见的还有 RabbitMQ、Kafka、RocketMQ 等。

第一章:ActiveMQ 概述

1.1 什么是 ActiveMQ?

ActiveMQ 是 Apache 软件基金会出品的一个开源消息中间件,采用纯 Java 语言编写,完全遵循 JMS(Java Message Service)1.1 规范。它诞生于分布式系统蓬勃发展的时期,专门用来解决各个系统组件之间的通信与协作问题。

简单来说,ActiveMQ 就像一个 "消息邮局":发送方把消息投递到邮局,接收方到邮局去取,发件人和收件人不需要同时在场,极大地提高了系统的灵活性和可靠性。

只要操作系统支持 Java 虚拟机,ActiveMQ 就可以稳定运行,具有出色的跨平台能力

1.2 历史地位与现状

  • 2004年:ActiveMQ 首次发布,基于 JMS 规范

  • 2007年:被 Apache 软件基金会纳入顶级项目

  • 2010年代:成为企业级应用 JMS 实现的首选

  • 2020年至今:虽然 Kafka 等新一代消息队列崛起,ActiveMQ 在高吞吐场景有所退让,但在企业级稳定场景中仍占据重要地位

正如一句总结所说:ActiveMQ 是 "企业级消息传递的稳健选择",Kafka 是 "大数据实时处理的性能之王"

1.3 核心特性一览

特性 说明
多协议支持 兼容 JMS 1.1/2.0、STOMP、AMQP、MQTT、OpenWire 等协议
多种消息模型 支持点对点(Queue)和发布/订阅(Topic)两种模式
消息持久化 支持 KahaDB、JDBC 等多种持久化方式,防止消息丢失
高可用性 支持主从架构(Master-Slave)和网络集群(Network of Brokers)
事务支持 支持 JMS 事务,保证消息的原子性发送与接收
Web 控制台 提供直观的 Web 管理界面,方便监控和运维
跨平台 纯 Java 实现,支持 Windows、Linux、macOS 等

第二章:JMS 规范基础(必学核心)

在学习 ActiveMQ 之前,必须理解它的根基——JMS 规范

2.1 什么是 JMS?

JMS(Java Message Service) 是 Java 平台中关于面向消息中间件(MOM)的一套 API 标准。它定义了 Java 应用程序之间进行异步消息通信的统一接口。

可以用一个类比来理解:JMS 就好比是 JDBC 接口,而 ActiveMQ 就是 MySQL 驱动实现。JDBC 提供了统一的数据库访问方式,JMS 提供了统一的消息收发方式。不同的厂商只要实现了这套规范,就能让应用程序流畅切换。

2.2 JMS 核心对象模型

JMS 定义了几个关键对象,它们是编写任何 ActiveMQ 程序的基础

text

ConnectionFactory(连接工厂)
    ↓ 创建
Connection(连接)
    ↓ 创建
Session(会话)
    ↓ 创建
Producer/Consumer(生产者/消费者) ←→ Destination(目的地,即 Queue 或 Topic)
    ↓ 发送/接收
Message(消息)

各对象详解:

对象 作用 通俗理解
ConnectionFactory 创建连接的工厂 就像 "邮政总局",负责建立通信链路
Connection 客户端与服务器之间的活动连接 就像是 "电话线",接通后才能通话
Session 生产和消费消息的单线程上下文 就像是 "一次会话",所有的收发都在此进行
Destination 消息的目标地址(Queue/Topic) 就像是 "收件地址",决定消息去哪
Producer 由 Session 创建,用于发送消息 "发件人"
Consumer 由 Session 创建,用于接收消息 "收件人"
Message 实际传输的数据载体 "信件本身"

Session 是特别重要的概念:它是一个单线程上下文,不仅用来创建生产者和消费者,还提供了事务性支持——可以把一组消息的发送和接收合并为一个原子操作。

2.3 消息结构

JMS 消息由三部分组成:

组成部分 说明
消息头(Header) 包含路由信息、优先级、过期时间等,由 JMS 提供者或发送者自动设置
消息属性(Property) 由发送者自定义的附加信息,用于消息过滤等场景
消息体(Body) 实际传输的数据,JMS 定义了 5 种类型

五种消息体类型:

类型 说明
TextMessage 文本消息,最常用,如 JSON 字符串
ObjectMessage 序列化的 Java 对象
BytesMessage 二进制字节消息
MapMessage 键值对消息
StreamMessage Java 流数据消息

第三章:ActiveMQ 安装与环境搭建

3.1 环境要求

  • JDK 8 或更高版本(ActiveMQ 5.x 推荐 JDK 8)

  • 2GB 以上内存

3.2 Windows 系统安装

Step 1:下载
访问 ActiveMQ 官网 http://activemq.apache.org/components/classic/download/,下载推荐版本(如 5.18.3),文件名类似 apache-activemq-5.18.3-bin.zip

Step 2:解压
解压到指定目录,例如 D:\activemq\apache-activemq-5.18.3

Step 3:启动
进入 bin\win64 目录(64位系统),双击 activemq.bat 脚本。

3.3 Linux 系统安装

bash

# 1. 安装 JDK(以 Ubuntu 为例)
sudo apt update
sudo apt install openjdk-11-jdk
java -version

# 2. 下载 ActiveMQ
wget https://archive.apache.org/dist/activemq/5.18.3/apache-activemq-5.18.3-bin.tar.gz

# 3. 解压到 /opt 目录
sudo tar -xzf apache-activemq-5.18.3-bin.tar.gz -C /opt

# 4. 进入目录并启动
cd /opt/apache-activemq-5.18.3
bin/activemq start

# 5. 查看日志确认启动
tail -f data/activemq.log

# 6. 停止服务
bin/activemq stop

3.4 验证安装

打开浏览器,访问:http://localhost:8161/admin/

默认用户名和密码都是 admin / admin

成功登录后,你会看到 ActiveMQ 的 Web 管理界面,可以在此监控和管理 Queues(队列) 和 Topics(主题)

3.5 端口说明

端口 用途
61616 JMS 连接端口(应用程序连接使用),支持 TCP/OpenWire 协议
8161 Web 管理控制台端口

如果需要修改端口,可以在 conf/activemq.xml 中修改 61616 端口,在 conf/jetty.xml 中修改 8161 端口。

💡 小贴士:启动如果遇到端口冲突,关闭其他占用端口的程序,或在配置文件中修改默认端口即可。

第四章:核心概念——Queue 与 Topic

ActiveMQ 支持两种核心的消息模型,理解它们的区别是正确使用 ActiveMQ 的关键。

4.1 点对点模型(P2P / Queue)

核心思想:消息生产者将消息发送到队列(Queue)中,每条消息只能被一个消费者接收并消费。即使队列有多个消费者在监听,消息也不会被重复消费。

特点

  • 🎯 一对一投递,消息被一个消费者消费后,队列中不再保留

  • ⏳ 生产者和消费者没有时间依赖,消费者随时可以来取积压的消息

  • ⚖️ 多消费者时可实现负载均衡——多个消费者轮询处理队列中的消息

  • 💾 消息会持久保存在队列中,直到被消费或过期

类比:就像取快递,每个包裹只能被一个人取走。

4.2 发布/订阅模型(Pub/Sub / Topic)

核心思想:消息生产者(发布者)将消息发送到主题(Topic)中,所有订阅该主题的消费者都会收到这个消息。

特点

  • 📢 一对多广播,消息会被所有订阅者接收

  • ⚠️ 有时间依赖性:消费者必须在生产者发送消息之前完成订阅,否则收不到已发布的消息(除非使用持久订阅)

  • 📡 适用于消息广播场景,如系统通知、配置更新等

类比:就像订阅微信公众号,关注之后你才能收到推送的文章。

4.3 Queue vs Topic 对比总结

对比维度 Queue(队列) Topic(主题)
消息消费方式 1对1,一条消息一个消费者 1对多,一条消息所有订阅者
消息存储 消息会持久保存直到被消费 默认不持久化,只在内存中
消费者时机 随时上线都能收到积压消息 必须在消息发送前订阅
典型场景 下单处理、任务分发 系统通知、实时推送
负载均衡 ✅ 天然支持 ❌ 每条消息所有订阅者都收

⚠️ 重要提醒:在发布/订阅模式中,如果消费者在消息发布后才启动,它将收不到之前的消息。需要保证消息不丢失时,请使用 持久订阅(Durable Subscriber)

第五章:第一个 Java 程序——收发消息

5.1 环境准备(Maven 项目)

在创建 Java 项目之前,先在 pom.xml 中添加 ActiveMQ 客户端依赖:

xml

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.18.3</version>
</dependency>

5.2 编写消息发送者(Producer)

java

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MyProducer {
    public static void main(String[] args) throws JMSException {
        // 1. 创建连接工厂(指定 ActiveMQ 服务器地址)
        ConnectionFactory factory = 
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        
        // 2. 创建连接并启动
        Connection connection = factory.createConnection();
        connection.start();
        
        // 3. 创建会话(参数:是否事务,确认模式)
        Session session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        
        // 4. 创建目的地(队列)
        Queue queue = session.createQueue("MyFirstQueue");
        
        // 5. 创建生产者并发送消息
        MessageProducer producer = session.createProducer(queue);
        TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
        producer.send(message);
        
        System.out.println("消息发送成功!");
        
        // 6. 关闭资源
        session.close();
        connection.close();
    }
}

代码关键步骤说明:

  • createConnectionFactory:配置 ActiveMQ 服务器地址(默认端口 61616)

  • createSession(false, Session.AUTO_ACKNOWLEDGE):非事务模式 + 自动确认

  • createQueue("MyFirstQueue"):创建名为 MyFirstQueue 的队列

  • send(message):将消息发送到 Broker

5.3 编写消息接收者(Consumer)

java

import javax.jms.*;
import org.apache.activemq.ActiveMQConnectionFactory;

public class MyConsumer {
    public static void main(String[] args) throws JMSException {
        // 1-3 步与生产者相同
        ConnectionFactory factory = 
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        Session session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        
        // 4. 创建目的地(与生产者使用同一个队列名)
        Queue queue = session.createQueue("MyFirstQueue");
        
        // 5. 创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        
        // 6. 接收消息(同步阻塞方式)
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println("收到消息: " + message.getText());
        
        // 7. 关闭资源
        session.close();
        connection.close();
    }
}

5.4 发布/订阅模式示例

只需将上面代码中的 Queue 替换为 Topic

java

// 生产者
Topic topic = session.createTopic("MyFirstTopic");
MessageProducer producer = session.createProducer(topic);
producer.send(message);

// 消费者
Topic topic = session.createTopic("MyFirstTopic");
MessageConsumer consumer = session.createConsumer(topic);

⚠️ 注意:Topic 模式下,消费者必须先启动并完成订阅,然后生产者发送消息,消费者才能收到。

第六章:Spring Boot 集成 ActiveMQ

在实际企业开发中,绝大多数项目都会使用 Spring Boot 框架。Spring Boot 对 ActiveMQ 提供了优秀的自动配置支持,可以大大简化开发工作。

6.1 添加 Maven 依赖

xml

<!-- ActiveMQ Starter(核心依赖) -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<!-- 连接池支持(推荐,提升性能) -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>

6.2 配置文件(application.yml)

yaml

spring:
  activemq:
    broker-url: tcp://localhost:61616  # ActiveMQ 服务器地址
    user: admin                         # 用户名
    password: admin                     # 密码
    pool:
      enabled: true                     # 启用连接池
      max-connections: 10               # 最大连接数

  jms:
    pub-sub-domain: false               # false=Queue模式, true=Topic模式(全局配置)

如果你的项目中既有 Queue 又有 Topic,建议设为 false,然后通过代码层面分别配置。

6.3 配置 Bean(Queue 和 Topic)

java

@Configuration
public class ActiveMQConfig {
    
    @Bean
    public Queue queue() {
        return new ActiveMQQueue("springboot.queue");
    }
    
    @Bean
    public Topic topic() {
        return new ActiveMQTopic("springboot.topic");
    }
}

6.4 消息生产者(使用 JmsMessagingTemplate)

JmsMessagingTemplate 是 Spring 提供的消息发送工具类,能大幅简化代码。

java

@RestController
@RequestMapping("/message")
public class MessageController {
    
    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;
    
    @Autowired
    private Queue queue;
    
    @Autowired
    private Topic topic;
    
    // 发送队列消息
    @GetMapping("/sendQueue")
    public String sendQueueMsg(@RequestParam String msg) {
        jmsMessagingTemplate.convertAndSend(queue, msg);
        return "队列消息发送成功: " + msg;
    }
    
    // 发送主题消息
    @GetMapping("/sendTopic")
    public String sendTopicMsg(@RequestParam String msg) {
        jmsMessagingTemplate.convertAndSend(topic, msg);
        return "主题消息发送成功: " + msg;
    }
}

6.5 消息消费者(使用 @JmsListener)

java

@Component
public class MessageListener {
    
    // 监听队列消息
    @JmsListener(destination = "springboot.queue")
    public void receiveQueueMsg(String message) {
        System.out.println("收到队列消息: " + message);
        // 这里写你的业务处理逻辑
    }
    
    // 监听主题消息(需要在配置中启用 pub-sub)
    @JmsListener(destination = "springboot.topic", 
                 containerFactory = "topicListenerContainerFactory")
    public void receiveTopicMsg(String message) {
        System.out.println("收到主题消息: " + message);
    }
}

6.6 典型应用场景

ActiveMQ 在企业应用中的三个经典场景:

场景 说明 示例
异步任务处理 解耦耗时操作 用户注册后发送邮件、短信验证码
系统间通信 跨服务数据同步 订单系统通知库存系统扣减库存
流量削峰 缓冲高并发请求 秒杀活动时将请求放入队列,排队处理

第七章:消息持久化

ActiveMQ 的消息默认是持久化的,即消息会存入磁盘,即便 Broker 崩溃重启也不会丢失。理解消息持久化,是确保系统可靠性的关键。

7.1 消息的可靠性级别

级别 说明 直接成本 性能
持久性消息(PERSISTENT) ActiveMQ 默认模式,保证消息"只传送一次,成功使用一次"。可靠性是绝对优先的 持久化存储 I/O 较低
非持久性消息(NON_PERSISTENT) 最多传送一次,服务器重启后消息会丢失 减少存储开销 较高

7.2 三种持久化方式

① KahaDB(默认,强烈推荐

从 ActiveMQ 5.4 开始引入,基于文件的高性能日志存储,专门为消息持久化设计。数据被追加到日志文件中,当数据不再被需要时,日志文件会被自动清理。

② JDBC 持久化(基于数据库)

将消息存入 MySQL 等关系型数据库,会创建三张表:activemq_msgs(消息存储)、activemq_acks(确认信息)、activemq_lock(分布式锁)。Queue 和 Topic 的消息都存储在 activemq_msgs 表中。优点是便于跨节点恢复和统一管理,但性能相对较差。

③ LevelDB 持久化(高可用)

基于 Google 开发的 LevelDB 高性能键值存储引擎,虽然已经标记为弃用(推荐 KahaDB),但在基于 ZooKeeper 的高可用集群中仍然有重要作用。

7.3 配置持久化方式

在 conf/activemq.xml 中指定:

xml

<!-- KahaDB 持久化(默认,无需修改) -->
<persistenceAdapter>
    <kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>

<!-- JDBC 持久化示例 -->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

第八章:消息确认机制(ACK)

消息确认机制决定了消息何时被视为 "已投递" 或 "已消费",直接影响消息丢失和重复消费的风险。ActiveMQ 支持三种确认模式:

确认模式 说明 适用场景
AUTO_ACKNOWLEDGE(自动确认) 默认模式,receive() 方法返回或 onMessage() 成功执行后,自动确认 对消息丢失不太敏感的场景
CLIENT_ACKNOWLEDGE(客户端手动确认) 在消费端显式调用 message.acknowledge() 才会确认 必须先确保业务处理完成再确认的场景
SESSION_TRANSACTED(事务会话) 一组消息在一个事务中进行,可批量提交或回滚 需要原子性发送/接收的场景

代码示例——手动确认模式

java

// 创建会话时指定手动确认
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

// 消费端
Message message = consumer.receive();
// ... 处理业务逻辑 ...
message.acknowledge();  // 业务处理成功后手动确认

第九章:高级特性速览

9.1 持久订阅(Durable Subscriber)

在 Topic 模式下,普通消费者如果离线,就会错过离线期间的消息。持久订阅则能解决这个问题:即便消费者离线,ActiveMQ 也会为其保存消息,等到它重新上线后投递

创建持久订阅的关键代码:

java

// 设置客户端ID(订阅者唯一标识)
connection.setClientID("myDurableSubscriber");
// 创建持久订阅者
MessageConsumer consumer = 
    session.createDurableSubscriber(topic, "subscriptionName");

9.2 虚拟主题(Virtual Topics)

虚拟主题是 ActiveMQ 的一个高级特性,结合了 Queue 和 Topic 的优点:既能像 Topic 一样广播消息,又能像 Queue 一样让消费者持久接收

9.3 消息过滤与选择器

消费者可以通过选择器有选择地接收消息:

java

// 只接收 priority > 5 且 type = ' urgent ' 的消息
String selector = "priority > 5 AND type = 'urgent'";
MessageConsumer consumer = session.createConsumer(queue, selector);

9.4 死信队列(DLQ)

如果消息因为某些原因无法被正常消费(如消息格式错误),ActiveMQ 会将其转入死信队列(Dead Letter Queue),避免阻塞正常消息处理。

9.5 延迟/定时消息

ActiveMQ 支持在指定时间后投递消息,实现定时任务的效果:

java

TextMessage message = session.createTextMessage("延迟消息");
// 设置延迟 30 秒投递
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 30000);
producer.send(message);

第十章:集群与高可用架构

生产环境中,单台 ActiveMQ 存在单点故障风险。通过集群和主从架构,可以有效提升可用性。

10.1 主从架构(Master-Slave)

ActiveMQ 主从架构的原理是:在多个 Broker 实例之间共享持久化存储。只有一台 "主" 服务器对外提供读写服务,其余 "从" 服务器则处于待命状态等待随时接管。一旦主服务器宕机,某个从服务器会迅速获得持久化存储的锁,升级为新的主服务器,从而保障客户端能够持续正常运转。

三种主从实现方式对比:

方式 原理 优缺点
共享文件系统(KahaDB) 多个 Broker 共享同一个 KahaDB 数据目录 简单易配,但依赖共享存储的可靠性
JDBC 主从 多个 Broker 共享同一个数据库 可利用数据库高可用,但性能较低
ZooKeeper + LevelDB 通过 ZooKeeper 注册所有 Broker 并进行选举 自动故障转移,生产环境推荐方案

10.2 ZooKeeper + LevelDB 集群方案

这是目前生产环境最推荐的高可用方案,从 ActiveMQ 5.9 开始引入。

工作原理

  1. 在多个独立的物理机或虚拟机上都分别安装并运行 ActiveMQ,这些节点通过 ZooKeeper 进行注册和协调

  2. ZooKeeper 从各节点中选举一个作为 Master(主节点),其余作为 Slave(从节点)

  3. Master 负责提供服务,Slave 只与 Master 同步数据且不接受客户端连接

  4. 如果 Master 宕机,ZooKeeper 自动从 Slave 中选举新的 Master

  5. 故障节点恢复后自动加入集群成为 Slave

优势:自动故障转移、数据多副本保障不丢失,而代价是需要额外维护一个 ZooKeeper 集群。

10.3 网络集群(Network of Brokers)

适用于需要负载均衡跨 Broker 消息路由的场景。多个独立的 Broker 组成一个网络,互相转发消息,实现消息在不同 Broker 之间的自由流动。

一个实用的组合策略是:Master-Slave 负责高可用,Network of Brokers 负责负载均衡,两者搭配使用:先搭建多组主从实现高可用,再通过网络集群串联这些主从组来实现横向扩展。

第十一章:监控与运维

11.1 Web 管理控制台

ActiveMQ 自带功能全面的 Web 后台管理界面,访问地址 http://localhost:8161/admin/,默认账号密码均为 admin

登录后可以实时查看:

  • 📊 Queues 和 Topics 的消息数量、消费者数量

  • 🗑️ 手动删除/移动消息

  • ⚙️ Broker 运行状态和配置信息

11.2 关键性能优化参数

初学者最容易踩坑的地方在于性能配置不当,只需关注这四个参数即可解决大部分问题:

优化项 配置参数 作用
预取限制 prefetchLimit 控制消费者单次获取消息的数量,防止个别消费者 "囤积" 太多消息导致其他消费者饥饿
异步发送 useAsyncSend=true 生产者改为异步发送,大幅提升吞吐量
内存分配 ACTIVEMQ_OPTS 环境变量 根据服务器负载调整 JVM 内存大小
死信队列 自动启用(DLQ) 处理无法投递的消息,避免阻塞正常队列

11.3 常见问题排查

问题 可能原因 解决思路
消息堆积 消费速度跟不上生产速度 增加消费者数量、优化消费逻辑、设置消息过期时间
连接不上 网络配置或防火墙 检查 61616 和 8161 端口是否开放
内存溢出 JVM 内存配置过低 调整 ACTIVEMQ_OPTS 中的 -Xmx 参数

🎯 学习路线图

对于小白同学,建议按照以下顺序逐步学习:

text

第一步:理解核心概念
  ├── 什么是消息队列?为什么需要它?
  └── JMS 规范的基础对象模型(Connection、Session、Producer/Consumer)

第二步:动手实践
  ├── 安装 ActiveMQ → 访问 Web 控制台
  ├── 编写纯 Java 程序收发消息(Queue 和 Topic)
  └── 理解 Queue 和 Topic 的核心区别

第三步:Spring Boot 集成
  ├── 引入依赖和配置
  ├── 使用 JmsMessagingTemplate 发送消息
  └── 使用 @JmsListener 接收消息

第四步:深入进阶
  ├── 消息持久化(KahaDB、JDBC)
  ├── 消息确认机制(AUTO / CLIENT / 事务)
  ├── 持久订阅和死信队列
  └── 集群与高可用部署

❓ 常见面试题精选

Q1:ActiveMQ 的消息模型有哪些?有什么区别?

答:主要有点对点(Queue)和发布/订阅(Topic)两种。Queue 中一条消息只能被一个消费者消费,消息会持久保存;Topic 中一条消息会被所有订阅者收到,消费者必须在消息发送前订阅。Queue 适用于任务分发,Topic 适用于消息广播。

Q2:如何保证消息不丢失?

答:从三个环节保障——① 生产者发送时使用持久化消息(默认);② Broker 端使用持久化存储(KahaDB/JDBC)并及时 fsync 刷盘;③ 消费者端使用手动确认(CLIENT_ACKNOWLEDGE),确保消息处理完成后才确认。

Q3:ActiveMQ 如何实现高可用?

答:常用方案是 ZooKeeper + LevelDB 主从集群。ZooKeeper 负责选举 Master,LevelDB 负责数据持久化和同步。Master 对外提供服务,Slave 实时同步数据,Master 宕机时 ZooKeeper 自动选举新的 Master 实现故障转移。

Q4:如何处理消息堆积?

答:首先增加消费者数量进行分流;其次优化消费端的业务处理逻辑,缩短单条消息处理时间;此外可以设置消息过期时间(TTL),避免消息无限期积压;最后合理配置 prefetchLimit 预取限制,防止消息分配不均。

📖 推荐资源

祝学习愉快!遇到问题可以先去 Web 控制台的 Queues 页面检查消息状态,这是排查问题最直观有效的方式。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐