RabbitMQ 基础指南
一、简介
RabbitMQ 是一个开源的消息代理(Message Broker),实现了 AMQP(Advanced Message Queuing Protocol)协议,采用 Erlang 语言开发。它以高可靠、灵活路由、易用著称,广泛应用于异步处理、应用解耦、流量削峰、分布式事务等场景。
1.1 核心特点
- 高可靠性:支持持久化、消息确认、集群镜像队列,支持自动故障转移
- 灵活路由:通过多种交换机(Direct、Topic、Fanout、Headers)实现复杂路由
- 多语言客户端:Java、Python、Go、Node.js、Ruby、C# 等
- 易管理:Web 管理界面(15672)、HTTP API、命令行工具
- 生态丰富:插件支持延时队列、死信队列、消息追踪等
1.2 核心概念
- Broker:RabbitMQ 服务实例,负责消息接收、存储、转发,默认端口 5672(AMQP)、15672(Web 管理)。
- Virtual Host(vhost):逻辑隔离空间(类似数据库),不同 vhost 资源独立,用于多租户 / 多环境隔离。
- Connection:客户端与 Broker 的 TCP 长连接。
- Channel:Connection 内的虚拟连接(轻量级),所有操作(发消息、声明队列)都在 Channel 上执行,减少 TCP 开销。
- Producer:消息发送方,将消息发送到 Exchange(不直接发队列)。
- Exchange:消息路由中心,不存储消息,按类型 + 绑定规则将消息转发到队列。
- Queue:消息存储容器,FIFO 顺序,支持持久化、优先级、惰性等配置。
- Binding:Exchange 与 Queue 的绑定关系,通过 Routing Key 匹配路由。
- Consumer:消息接收方,监听队列并消费,支持自动 / 手动确认。
1.3 典型应用场景
- 异步处理:用户注册后发送邮件/短信,不阻塞主流程。
- 应用解耦:订单系统和库存系统通过消息通信,降低直接依赖。
- 流量削峰:秒杀请求写入队列,后端逐步处理,保护数据库。
- 分布式事务:结合最终一致性(如本地消息表 + 消息确认)。
- 日志收集:各服务将日志发送到 RabbitMQ,统一消费处理。
1.4 环境搭建
(1)安装依赖(Erlang)
RabbitMQ 基于 Erlang,需先安装对应版本(版本匹配很重要):
- Linux:apt install erlang / yum install erlang
- Windows/macOS:官网下载安装包
(2)安装 RabbitMQ
可以通过下载官方安装包进行安装,或者直接使用 Docker 启动 RabbitMQ 容器。
-
docker拉取RabbitMQ镜像
docker pull rabbitmq -
使用官方定义的端口启动 RabbitMQ 容器
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:latest -
启动后访问管理后台:http://localhost:15672,默认账号 guest/guest(仅本地可用)。
(3)客户端依赖(以 Java/Spring Boot 为例)
<!-- Spring Boot AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、系统架构
2.1 逻辑架构图

2.2 核心组件
2.2.1 Virtual Host(虚拟主机)
- 作用:逻辑隔离,不同的 vhost 拥有独立的 exchange、queue、binding。
- 权限管理:可按 vhost 分配用户权限(读、写、配置)。
- 默认:/(根 vhost)。
# 创建 vhost
rabbitmqctl add_vhost my_vhost
# 设置权限
rabbitmqctl set_permissions -p my_vhost user ".*" ".*" ".*"
2.2.2 Exchange(交换器)
接收生产者发送的消息,并根据路由规则将消息分发到队列。四种常用类型:
| 类型 | 路由规则 | 适用场景 |
|---|---|---|
| Direct | Routing Key 完全匹配 | 点对点、任务分发 |
| Topic | Routing Key 模式匹配(* 匹配一个词,# 匹配零个或多个) | 多维度分类(如日志分级) |
| Fanout | 广播给所有绑定的队列 | 事件广播、配置更新 |
| Headers | 匹配消息 Header 属性(忽略 Routing Key) | 复杂过滤规则 |
Direct Exchange(直连交换器)
路由规则:Routing Key 完全匹配。
Producer: routing_key = "error"
│
▼
┌───────────────┐
│ Direct Exchange│
└───────┬───────┘
│
┌───────────┼───────────┐
│ │ │
binding: binding: binding:
"error" "info" "warning"
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Queue A│ │Queue B│ │Queue C│
│(error)│ │(info) │ │(warn) │
└───────┘ └───────┘ └───────┘
应用场景:
- 任务分发(按任务类型)
- 日志分级(error/info/debug)
代码示例:
# 生产者
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(
exchange='direct_logs',
routing_key='error',
body='Disk full error'
)
# 消费者(只关心 error 日志)
channel.queue_bind(exchange='direct_logs', queue='error_queue', routing_key='error')
Topic Exchange(主题交换器)
路由规则:Routing Key 模式匹配。
| 通配符 | 含义 | 示例 |
|---|---|---|
| * | 匹配一个单词 | kern.* → kern.critical |
| # | 匹配零个或多个单词 | kern.# → kern.critical.disk |
Producer: routing_key = "kern.critical"
│
▼
┌───────────────┐
│ Topic Exchange│
└───────┬───────┘
│
┌───────────┼───────────┐
│ │ │
binding: binding: binding:
"kern.*" "*.error" "#"
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Queue A│ │Queue B│ │Queue C│
│(kernel│ │(error │ │(all │
│ logs) │ │ logs) │ │ logs) │
└───────┘ └───────┘ └───────┘
应用场景:
- 多维度分类(如日志:service.env.level)
- 事件驱动架构(如 user.created、order.paid)
代码示例:
# 生产者
channel.basic_publish(
exchange='topic_events',
routing_key='user.created.v1',
body='User 123 created'
)
# 消费者:接收所有用户事件
channel.queue_bind(exchange='topic_events', queue='user_queue', routing_key='user.*')
# 消费者:接收所有 v1 事件
channel.queue_bind(exchange='topic_events', queue='v1_queue', routing_key='*.v1')
Fanout Exchange(扇出交换器)
路由规则:广播到所有绑定的队列(忽略 Routing Key)。
Producer: any routing_key
│
▼
┌───────────────┐
│ Fanout Exchange│
└───────┬───────┘
│
┌───────────┼───────────┬───────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐ ┌───────┐
│Queue A│ │Queue B│ │Queue C│ │Queue D│
└───────┘ └───────┘ └───────┘ └───────┘
应用场景:
- 广播通知(配置更新、缓存刷新)
- 日志收集到多个系统
代码示例:
# 生产者
channel.exchange_declare(exchange='broadcast', exchange_type='fanout')
channel.basic_publish(exchange='broadcast', routing_key='', body='Config updated')
# 所有绑定到此 exchange 的队列都收到消息
Headers Exchange(头交换器)
路由规则:匹配消息的 Header 属性(忽略 Routing Key)。
Producer: headers = {"format": "pdf", "type": "report"}
│
▼
┌───────────────┐
│Headers Exchange│
└───────┬───────┘
│
┌───────────┼───────────┐
│ │ │
binding: binding: binding:
x-match=all x-match=any x-match=all
format=pdf format=pdf format=image
type=report type=report type=photo
│ │ │
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│Queue A│ │Queue B│ │Queue C│
└───────┘ └───────┘ └───────┘
配置示例:
# 消费者绑定(x-match=all 表示所有 header 都必须匹配)
args = {
"x-match": "all",
"format": "pdf",
"type": "report"
}
channel.queue_bind(exchange='headers_logs', queue='pdf_report', arguments=args)
# 生产者发送消息
headers = {"format": "pdf", "type": "report"}
props = pika.BasicProperties(headers=headers)
channel.basic_publish(exchange='headers_logs', routing_key='', body='PDF report', properties=props)
2.2.3 Queue(队列)
- 消息存储:FIFO 队列,支持持久化(durable)和临时队列(exclusive/auto-delete)。
- 属性:
- durable:队列持久化( Broker 重启后队列依然存在,但消息不一定持久化)。
- exclusive:仅创建它的连接可用,连接断开后自动删除。
- auto-delete:最后一个消费者取消后自动删除。
惰性队列(Lazy Queue):
# 将消息直接写入磁盘,减少内存占用,适合大消息、长队列
arguments = {"x-queue-mode": "lazy"}
channel.queue_declare(queue='my_queue', durable=True, arguments=arguments)
2.2.4 Binding(绑定)
- 定义:Exchange 与 Queue 之间的关联规则。
- 绑定键:根据 Exchange 类型,匹配 Routing Key。
# 绑定队列到交换器
channel.queue_bind(exchange='direct_logs', queue='error_queue', routing_key='error')
2.2.5 Connection 与 Channel
- Connection:TCP 长连接,由客户端创建。
- Channel:逻辑连接,在 TCP 连接上复用,减少资源消耗。
# Python 示例
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel() # 创建一个 channel,实际复用 TCP 连接
设计原因:
- 每个线程/协程一个 channel,避免 TCP 握手开销。
- 单个 TCP 连接可承载数千个 channel。
2.3 消息完整流转路径
Producer (publish)
│
▼
1. Connection / Channel 建立
│
▼
2. Exchange (接收消息 + Routing Key)
│
▼
3. Binding (匹配规则)
├── 匹配 Queue A → 路由到 Queue A
├── 匹配 Queue B → 路由到 Queue B
└── 无匹配 →
├── mandatory=true → 返回 Basic.Return
└── mandatory=false → 丢弃
2.4 消息消费模式
推模式(Push):RabbitMQ 主动推送消息给消费者。
- 优点:实时性好,低延迟。
- 缺点:消费者压力不可控。
# 自动推送消息到消费者
channel.basic_consume(queue='my_queue', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
拉模式(Pull):消费者主动拉取消息。
- 优点:消费者控制节奏。
- 缺点:需要轮询,有延迟。
# 主动拉取一条消息
method, properties, body = channel.basic_get(queue='my_queue', auto_ack=False)
三、经典工作模式
3.1 简单模式
是最简单的消息传输模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。使用的是默认的 Direct 交换机类型。
3.2 工作队列
是指向多个互相竞争的消费者发送消息的模式,由一个生产者、一个队列和多个消费者组成。多个消费者订阅同一个队列,当有消息进入队列时,这多个消费者会竞争去获取队列的消息消费,对于任务过重或任务较多的情况使用工作队列可以提高任务处理速度。使用的也是默认的 Direct 交换机类型。
3.3 发布 / 订阅
该模式下,生产者将消息发送到交换机上,再根据该交换机所绑定的队列,将要发送的消息存放到这些队列中,最终将被订阅了这些队列的消费者消费掉,这种方式也就导致了一个消息可能会被多个消费者同时消费。这种模式下交换机的类型为 Fanout。
如上图所示:
- 生产者发送消息到交换机后,消息会同时存放到队列1和队列2中。
3.4 路由模式
这种模式下,生产者将消息发送到交换机时,会携带一个路由键Routing key。只有当这个Routing key与交换机绑定队列的binding key完全匹配时,这个消息就会进入到该binding key所对应的队列中。一个队列可以由不同的binding key来绑定,Routing key与其中任何一个binding key匹配都会进入到与之对应的队列中。 同一个binding key也可以绑定多个不同的队列,这时则跟fanout模式类似,当Routing key与这个binding key匹配时,则消息会进入到与这个binding key对应的多个队列中。此模式下交换机的类型为Direct。
如上图所示:
- 当Routing key为key1时,消息会存放到队列1和队列3中;
- 当Routing key为key2或者key3时,消息会存放到队列2中。
3.5 主题模式
该模式与路由模式类似也是通过路由键去匹配要发送消息的队列,只不过该方式的Routing key必须具有固定格式:以.间隔的一串单词,比如:com.rabbit.message,且这种格式可以使用通配符*或者#来进行模糊匹配,*可以替代一个单词,#可以替代 0 或多个单词。该模式下的交换机类型为Topic,且Routing key 最多不能超过255byte。
如上图所示:
- 当消息的Routing key为三个单词,且中间的单词为 rabbit 时,消息会存放到队列1中;
- 当Routing key以 com 开头时,消息会存放到队列2中;
- 当Routing key为三个单词,且最后一个单词为message的时候,消息会存放到队列3中。
3.6 Headers 模式
该模式下交换机的类型为Headers,不依赖于路由键的匹配规则来路由消息,是通过Headers头部来将消息映射到队列的,Headers头部携带一个Hash结构,Hash结构中要求携带一个键"x-match",这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。当发送消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果完全匹配,则路由该消息到此队列中。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串String类型。headers类型的交换器的性能很差,不建议使用。
四、Queue 到 Consumer 的分发机制
4.1 轮询分发(Round-Robin)
默认行为:RabbitMQ 按顺序将消息轮流发送给消费者。
Queue: [M1, M2, M3, M4, M5, M6]
│
┌──────┼──────┬──────┐
│ │ │ │
▼ ▼ ▼ ▼
Consumer1 Consumer2 Consumer3 Consumer4
M1 M2 M3 M4
M5 M6
问题:如果消息处理时间不同,会导致负载不均。
4.2 公平分发(QoS Prefetch)
原理:限制消费者未确认消息的数量,避免快消费者闲置、慢消费者堆积。
机制:
- 队列给消费者最多同时推送 N 条消息(N = prefetch_count)
- 消费者必须 ACK 确认 后,才能收到下一条
- 谁处理得快,队列就给谁发更多消息
# 设置预取计数为 1(一次只发一条消息,确认后才发下一条)
channel.basic_qos(prefetch_count=1)
Queue: [M1, M2, M3, M4, M5, M6]
│
┌──────┼──────┐
│ │ │
▼ ▼ ▼
Consumer1 Consumer2 Consumer3
(处理慢) (处理快) (处理中)
M1 M2 M3
(等待) M4
M5
常见配置:
- prefetch_count=1:严格能者多劳,最稳
- prefetch_count=10~100:批量消费,提高吞吐量
- prefetch_count=0:不限制(默认,就是轮询)
4.3 优先级队列
配置:
# 声明优先级队列(最高优先级 10)
args = {"x-max-priority": 10}
channel.queue_declare(queue='priority_queue', arguments=args)
# 发送高优先级消息
props = pika.BasicProperties(priority=9)
channel.basic_publish(exchange='', routing_key='priority_queue', body='urgent', properties=props)
分发表:高优先级消息先被分发。
4.4 延迟队列(通过插件或 TTL)
消息分发的延迟:
Producer → Delay Exchange → (等待 TTL) → Dead Letter Exchange → Real Queue → Consumer
代码示例:
# 延迟交换器插件方式
channel.exchange_declare(
exchange='delayed_exchange',
exchange_type='x-delayed-message',
arguments={'x-delayed-type': 'direct'}
)
# 发送延迟 30 秒的消息
headers = {"x-delay": 30000}
props = pika.BasicProperties(headers=headers)
channel.basic_publish(exchange='delayed_exchange', routing_key='my_queue', body='delayed', properties=props)
五、消息确认机制
RabbitMQ 的消息确认机制是保证消息可靠传递的核心,分为生产者确认(确保消息正确到达 Broker)和消费者确认(确保消息被成功处理)。两者共同构成了 RabbitMQ 的 可靠投递 体系。
5.1 生产者确认(Publisher Confirm)
生产者发送消息后,Broker 需要反馈确认,让生产者知道消息是否已被正确接收。RabbitMQ 提供了两种途径:事务(Transaction) 和 发布确认(Publisher Confirm)。由于事务性能极差(比 Confirm 慢数百倍),生产环境几乎不使用。
5.1.1 开启 Confirm 模式
Channel channel = connection.createChannel();
channel.confirmSelect(); // 开启发布确认
5.1.2 三种确认方式
| 方式 | 说明 | 适用场景 |
|---|---|---|
| 普通确认 | 单条发送,同步等待 waitForConfirms() | 低吞吐,每一条都确认 |
| 批量确认 | 发送一批消息,等待批量结果 | 吞吐较高,但一旦失败需重试整个批次 |
| 异步确认 | 注册 ConfirmListener,回调处理成功/失败 | 高吞吐,最佳性能 |
普通确认:
channel.confirmSelect();
channel.basicPublish("", queue, null, msg.getBytes());
if (channel.waitForConfirms()) {
System.out.println("发送成功");
} else {
// 失败处理(极少发生,因为在waitForConfirms时已经内部重试)
}
批量确认:
int batchSize = 100;
for (int i = 0; i < 1000; i++) {
channel.basicPublish("", queue, null, msg.getBytes());
if ((i+1) % batchSize == 0) {
channel.waitForConfirms(); // 阻塞直到这批全部确认或失败
}
}
异步确认:
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// deliveryTag: 消息序号(从1开始)
// multiple: true 表示该序号之前的所有消息都已确认
System.out.println("Ack: " + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
System.out.println("Nack: " + deliveryTag);
// 可以重新发送未确认的消息
}
});
// 发送消息...
5.1.3 确认的条件
- 成功确认(Ack):消息被正确写入所有需要同步的队列(取决于队列的持久化、镜像策略等)。
- 失败确认(Nack):协议错误、路由不可达(未设置 mandatory)、内部错误等情况下会 Nack。
5.1.4 与 Return 机制的区别
- Confirm:保证消息到达交换机。即使交换机没有绑定任何队列,也会 Ack(如果没有设置 mandatory)。如果需要保证消息被路由到至少一个队列,必须配合 mandatory=true 或使用备份交换机(Alternate Exchange)。
- Return:当消息无法路由到队列时,若设置了 mandatory=true,Broker 会通过 ReturnListener 将消息返回给生产者,并附带原因(如 no queues)。
最佳实践:同时开启 Confirm 和 Return,确保消息不丢失且能正确路由。
channel.confirmSelect();
channel.addReturnListener((replyCode, replyText, exchange, routingKey, props, body) -> {
System.out.println("消息未路由: " + new String(body));
// 处理未路由的消息(如重新发送、记录日志)
});
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
channel.waitForConfirms(); // 或异步确认
5.2 消费者确认(Consumer Acknowledgement)
消费者从队列中获取消息后,需要告知 Broker 是否已成功处理。如果未确认,Broker 不会删除该消息,并可重新投递给其他消费者。
5.2.1 自动确认 vs 手动确认
| 模式 | 配置 | 行为 | 风险 |
|---|---|---|---|
| 自动确认 | autoAck=true | 消息发送给消费者后立即从队列删除 | 消费者崩溃 → 消息丢失 |
| 手动确认 | autoAck=false | 等待消费者显式调用 basicAck 才删除 | 安全,但需正确管理确认 |
5.2.2 手动确认 API
channel.basicConsume(queue, false, (consumerTag, delivery) -> {
try {
process(delivery.getBody());
// 单条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝并重新入队(requeue=true)
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
}, consumerTag -> {});
- basicAck:肯定确认,删除消息。
- basicNack:否定确认,可批量拒绝,并可选择是否重新入队。
- basicReject:同 Nack,但不能批量(一次只能拒绝一条)。
5.2.3 重新入队与死信
- 重新入队(requeue=true):消息被重新放回队列头部(可能立即再次投递给同一消费者,导致死循环)。应避免无限重试,可结合死信队列。
- 死信队列(DLX):消息被拒绝(requeue=false)、过期、队列达到最大长度时,可发送到死信交换机,最终进入死信队列以便人工处理。
// 拒绝并禁止重新入队 → 进入死信队列(若队列配置了 x-dead-letter-exchange)
channel.basicNack(deliveryTag, false, false);
5.2.4 预取计数(Prefetch)
为避免一次性推送过多未确认消息导致消费者内存溢出或处理延迟,应限制消费者未确认消息的数量。
// 每次只推送 1 条未确认消息
channel.basicQos(1);
basicQos 设置的是预取阈值,当消费者未确认的消息数达到该值时,Broker 将停止推送新消息。
六、消息幂等性
6.1 什么是消息幂等性?
幂等性(Idempotence)是指:同一个操作执行一次与执行多次所产生的影响相同。在消息队列(RabbitMQ)场景中,消息幂等性意味着:同一条消息被消费者重复消费多次时,对业务系统造成的结果与只消费一次完全一致。
6.2 为什么会出现重复消费?
RabbitMQ 为保证消息不丢,采用手动 ACK + 重试机制,这是重复消费的根本原因:
- 消费者未及时 ACK:业务处理完成,但网络波动 / 消费者宕机,未发送 ACK,MQ 认为消息未消费,重新投递。
- 生产者重试:生产者未收到 Broker 的 Confirm 确认,重试发送同一条消息(消息 ID 不变)。
- MQ 故障转移:集群节点故障、队列主从切换,导致少量消息重复投递。
- 消费失败重试:业务异常,消息重回队列,多次重试导致重复消费。
- 网络分区 / 脑裂:集群网络隔离,恢复后消息重复同步。
后果:库存重复扣减、订单重复创建、资金重复转账、数据不一致。
6.3 幂等性设计核心原则
- 全局唯一 ID:每条消息必须携带全局唯一标识(Message ID / 幂等 Key),作为去重依据。
- 先校验后处理:消费前先校验 ID 是否已处理,已处理则直接 ACK 跳过。
- 原子性操作:校验 ID 与执行业务必须原子化,避免并发重复处理。
- 过期清理:已处理 ID 需设置过期时间,防止存储无限膨胀。
- 业务幂等优先:优先通过业务逻辑实现幂等(如状态机、唯一约束),技术方案兜底。
6.4 主流幂等性实现方案
方案 1:唯一 ID + Redis 分布式锁
原理:
- 生产者生成全局唯一 ID(UUID/Snowflake/ 业务唯一键),放入消息属性(messageId)。
- 消费者消费时,用 Redis SETNX(原子性)尝试以消息 ID 为 Key 加锁。
- 加锁成功:执行业务,处理完成后设置 Key 过期时间(避免内存泄漏)。
- 加锁失败:说明消息已处理,直接 ACK 跳过。
实现步骤:
-
生产者生成唯一 ID
// 发送消息时设置全局唯一ID MessageProperties props = new MessageProperties(); props.setMessageId(UUID.randomUUID().toString()); // 全局唯一ID Message message = new Message("order_content".getBytes(StandardCharsets.UTF_8), props); rabbitTemplate.send("order.exchange", "order.routing.key", message); -
消费者 Redis 去重
@RabbitListener(queues = "order.queue") public void consume(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); String messageId = message.getMessageProperties().getMessageId(); String redisKey = "mq:consumed:" + messageId; // 1. Redis SETNX 原子加锁,过期时间3天(覆盖重试周期) Boolean lock = redisTemplate.opsForValue().setIfAbsent(redisKey, "processed", 3, TimeUnit.DAYS); if (Boolean.FALSE.equals(lock)) { // 已处理,直接ACK channel.basicAck(deliveryTag, false); log.info("消息{}已处理,跳过", messageId); return; } try { // 2. 执行业务逻辑(如创建订单、扣减库存) orderService.createOrder(new String(message.getBody())); // 3. 业务成功,手动ACK channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("消息{}处理失败", messageId, e); // 4. 处理失败:不ACK,重回队列重试(或死信) channel.basicNack(deliveryTag, false, true); // 失败释放锁,允许重试 redisTemplate.delete(redisKey); } }
优缺点:
- 优点:性能极高(Redis 单线程 + 内存操作)、原子性强、支持高并发、实现简单。
- 缺点:依赖 Redis 可用性;需设置合理过期时间;极端情况下(Redis 宕机)可能短暂失效。
- 适用:高并发、短周期(24h-7 天)、核心业务(订单、支付)。
方案 2:数据库唯一约束
原理:
- 建立已消费消息表,message_id 设为主键 / 唯一索引。
- 消费者先尝试插入 message_id,唯一约束冲突则说明已处理,直接跳过。
- 插入成功:执行业务,业务与插入操作在同一事务中,保证原子性。
实现步骤:
-
创建已消费消息表
CREATE TABLE consumed_message ( message_id VARCHAR(64) PRIMARY KEY COMMENT '消息全局唯一ID', queue_name VARCHAR(100) NOT NULL COMMENT '队列名称', consume_time DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '消费时间', status TINYINT DEFAULT 1 COMMENT '1-成功 0-失败' ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; -
消费者代码(事务保证原子性)
@RabbitListener(queues = "order.queue") @Transactional // 事务保证插入与业务原子性 public void consume(Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); String messageId = message.getMessageProperties().getMessageId(); String queueName = message.getMessageProperties().getConsumerQueue(); try { // 1. 尝试插入已消费记录(唯一约束冲突则抛DuplicateKeyException) consumedMessageMapper.insert(new ConsumedMessage(messageId, queueName, new Date(), 1)); } catch (DuplicateKeyException e) { // 已处理,直接ACK channel.basicAck(deliveryTag, false); log.info("消息{}已处理,跳过", messageId); return; } try { // 2. 执行业务逻辑 orderService.createOrder(new String(message.getBody())); // 3. 手动ACK channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("消息{}处理失败", messageId, e); // 事务回滚,插入操作撤销,允许重试 channel.basicNack(deliveryTag, false, true); TransactionAspectSupport.currentTransactionStatus().setRollbackOnly(); } }
优缺点:
- 优点:不依赖第三方组件、数据库天然保证唯一性、事务原子性强、可审计。
- 缺点:高并发下数据库插入性能瓶颈;需定期清理历史数据。
- 适用:低频、长周期、需审计、无 Redis 环境的业务。
方案 3:业务状态机(订单 / 支付场景最优)
原理:
- 业务数据自带状态字段(如订单状态:待支付→已支付→已完成)。
- 消费者处理前先查询状态,仅在允许状态下执行,否则跳过。
- 状态变更原子性(数据库更新 + 条件判断),天然幂等。
实现示例(订单支付):
@RabbitListener(queues = "pay.queue")
@Transactional
public void consumePayMessage(Message message, Channel channel) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
PayMessage payMsg = JSON.parseObject(message.getBody(), PayMessage.class);
String orderId = payMsg.getOrderId();
// 1. 查询订单状态(仅待支付状态可处理)
Order order = orderMapper.selectById(orderId);
if (order == null || !OrderStatus.WAIT_PAY.equals(order.getStatus())) {
channel.basicAck(deliveryTag, false);
log.info("订单{}状态异常,跳过", orderId);
return;
}
try {
// 2. 原子性更新订单状态(条件:状态=待支付)
int updateCount = orderMapper.updateStatusById(orderId, OrderStatus.PAID, OrderStatus.WAIT_PAY);
if (updateCount == 0) {
// 并发下状态已变更,跳过
channel.basicAck(deliveryTag, false);
return;
}
// 3. 执行支付后续逻辑(如通知物流)
payService.afterPaySuccess(orderId);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("订单{}支付处理失败", orderId, e);
channel.basicNack(deliveryTag, false, true);
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
}
优缺点:
- 优点:最优雅、无额外存储、业务天然幂等、无性能瓶颈。
- 缺点:仅适用于有状态流转的业务;需设计合理状态机。
- 适用:订单、支付、物流等有明确状态的核心业务。
七、持久化机制
RabbitMQ 的持久化机制是为了防止 Broker 因重启、崩溃或关闭而导致元数据(交换机、队列、绑定)和消息丢失。它通过将数据写入磁盘,在重启后重新加载,从而实现高可用性和可靠性。
7.1 持久化的三层范围
RabbitMQ 持久化包含三个独立层面,需要分别配置:
| 持久化对象 | 是否持久化 | 效果 | 配置方式 |
|---|---|---|---|
| 交换机(Exchange) | 可选 | 重启后交换机是否还存在 | 创建时设置 durable=true |
| 队列(Queue) | 可选 | 重启后队列是否还存在 | 创建时设置 durable=true |
| 消息(Message) | 可选 | 重启后队列中的消息是否还在 | 发布时设置 delivery_mode=2(或 persistent=true) |
关键点:只有同时将队列和消息都设置为持久化,消息才真正在 Broker 重启后不丢失。交换机持久化保证了消息能够正确路由到持久化队列。
7.2 各层持久化详解
7.2.1 交换机持久化(Exchange Durable)
-
交换机持久化仅保存交换机的名称、类型、属性等元数据。
-
声明时指定 durable = true。
-
重启后交换机依然存在
-
非持久化交换机(durable=false)重启后消失
-
示例:
channel.exchangeDeclare("my.exchange", BuiltinExchangeType.DIRECT, true); // durable=true
7.2.2 队列持久化(Queue Durable)
-
队列持久化保存队列的名称、属性、绑定关系等。但队列中的消息是否持久化独立决定。
-
如果一个队列声明为持久化但消息非持久化,重启后队列存在但所有消息丢失。
-
声明队列时 durable = true。
-
重启后队列结构保留。
-
队列不持久化,重启后队列消失,消息自然也没了。
-
示例:
channel.queueDeclare("my.queue", true, false, false, null); // durable=true, exclusive=false, autoDelete=false
7.2.3 消息持久化(Message Persistent)
-
通过设置消息的 delivery_mode 为 2(或 AMQP 0-9-1 中的 persistent 标志)使消息写入磁盘。
-
生产者示例:
channel.basicPublish("my.exchange", "rk", MessageProperties.PERSISTENT_TEXT_PLAIN, // 设置持久化 "message body".getBytes());
只有:交换机持久化 + 队列持久化 + 消息持久化 三者同时满足,消息才会真正落盘,重启不丢失。
7.3 持久化消息的存储原理
RabbitMQ 将持久化消息存入磁盘,但为了性能不是每条消息立即 fsync。主要组件:
-
消息存储结构
- 每个队列是一个 Erlang 进程,由其自己的 消息索引(msg_store)管理。
- RabbitMQ 使用 两个消息存储文件(.idx 和 .dat):
- .dat 文件:实际消息内容的二进制数据(按顺序追加)。
- .idx 文件:消息在 .dat 中的位置索引。
- 另外有 msg_store_transients 用于暂存非持久化消息的高水位处理。
-
写入策略
- 当生产者发送持久化消息时:
- 消息首先写入内存缓冲(同时写入磁盘日志 rabbit@host.log)。
- 如果消息到达了一个持久化队列且设置了持久化标志,RabbitMQ 会尽可能快地将消息刷盘。
- 默认的刷盘间隔是 200 ms(可通过 queue_index_embed_msgs_below 控制小消息行为)。
- 为了保证真正持久化,生产者可以使用 Publisher Confirms 等待 Broker 确认消息已落盘。
- 当生产者发送持久化消息时:
-
消息在队列中的状态
- 持久化消息:内存 + 磁盘都有一份。
- 非持久化消息:仅留在内存,当内存压力大或 Broker 重启时丢失。
- 惰性队列(Lazy Queues):自 RabbitMQ 3.6.0 引入,强制将消息尽早写入磁盘,减少内存占用,适合大量消息堆积但消费者缓慢的场景。惰性队列中的消息也是持久化的(若消息本身为持久化)。
-
重启恢复过程
- 启动时从磁盘加载持久化交换机和队列定义。
- 加载持久化消息索引和 .dat 文件。
- 从磁盘重建队列中的消息(顺序读取 .dat → 重建内存索引)。
- 消息恢复完成后,队列可以正常投递。
当队列非常多(如数万个)且每个队列都有大量持久化消息时,启动恢复会很慢。
7.4 惰性队列(Lazy Queue)持久化
- 普通队列:消息优先放内存,快但堆积多时 OOM。
- 惰性队列:消息直接落磁盘,几乎不占内存。
开启方式:
Map<String, Object> args = new HashMap<>();
args.put("x-queue-mode", "lazy");
new Queue("queue.lazy", true, false, false, args);
特点:
- 消息一进来就落磁盘
- 消费时才加载到内存
- 适合海量消息堆积(百万、千万级)
- 不会 OOM
- 性能比内存队列低,但比数据库高很多
7.5 仲裁队列(Quorum Queue)持久化
RabbitMQ 3.8+ 推荐的高可靠队列,基于 Raft 强一致复制。
- 队列数据在多节点同步
- 自动主从切换
- 天然持久化
- 无脑裂、不丢消息
- 性能比普通持久化队列略低,但可靠性最高
Queue quorumQueue = QueueBuilder.durable("quorum.queue").quorum().build();
7.6 完整持久化流程
- 生产者发送 PERSISTENT 消息
- 交换机 durable、队列 durable
- Broker 收到消息 → 写入 PageCache
- 异步刷入磁盘
- 消息状态写入队列索引
- 消费者消费并 ACK
- 消息被标记为删除,等待 GC 回收
7.7 最佳实践
- 生产环境建议:同时开启交换机、队列、消息三层持久化,并且使用 Publisher Confirms 等待 Broker 确认落盘。
- 谨慎使用惰性队列:当消费者正常时,普通持久化队列性能更好;只有堆积严重时才适合惰性队列。
- 避免大量队列都堆积持久化消息:这会显著拖慢启动速度和内存效率,考虑使用单一队列 + 多个消费者。
- 监控磁盘空间:持久化会占用磁盘,RabbitMQ 有磁盘空间预警(默认低于 50MB 停止接收消息)。
- 性能测试:如果不需要消息持久化(如实时交易确认),使用非持久化。
八、事务机制
RabbitMQ 的事务机制是 AMQP 0-9-1 协议原生支持的一组命令,用于保证生产者在一次事务中发送的多条消息的原子性:要么全部被 Broker 成功接收,要么全部失败(被回滚)。它主要用于解决生产端消息确认场景下的原子性问题,防止部分消息发送成功、部分失败导致业务数据不一致。
8.1 事务的三个核心命令
txSelect():用于将当前channel设置成transaction模式,通过调用tx.select方法开启事务模式。txCommit():用于提交事务。txRollback():发生异常时,回滚事务。
8.2 事务机制完整流程
- 客户端发送 tx.Select 命令,Broker 回复 tx.Select-Ok → 事务开启
- 生产者发送 1/N 条消息
- 执行本地数据库操作
- 一切正常 → tx.Commit;
异常 → tx.Rollback - Broker 收到 Commit 才真正落盘 / 路由;Rollback 则丢弃消息
示例:
Channel channel = connection.createChannel();
try {
// 1. 开启事务
channel.txSelect();
// 2. 发送多条消息
channel.basicPublish("exchange1", "routingKey1", null, "msg1".getBytes());
channel.basicPublish("exchange2", "routingKey2", null, "msg2".getBytes());
// 可能还有队列声明等操作
channel.queueDeclare("my.queue", true, false, false, null);
// 3. 提交事务
channel.txCommit();
System.out.println("事务提交,两条消息被 Broker 接收");
} catch (Exception e) {
// 发生异常,回滚事务
channel.txRollback();
System.out.println("事务回滚,两条消息均未生效");
}
注意:事务提交时,Broker 会执行消息的持久化(如果消息或队列是持久化的)并返回确认。只有收到 txCommit-Ok 才算事务成功。
8.3 事务机制原理
- 同步阻塞:发送 txCommit 后必须同步等待 Broker 回复,才能继续执行。
- 单线程顺序执行:事务内部消息必须一条一条同步确认,不允许并发。
- Broker 内部日志记录:消息先暂存,等 Commit 才真正进入队列存储。
- 强一致但极慢:一次事务 = 至少 2 次网络 RTT(txSelect + txCommit),吞吐量大约是 普通发送的 1/1000。
8.4 事务的局限性
- 性能极差:每个事务至少增加两次网络往返(txSelect-Ok 和 txCommit-Ok),且事务期间会锁定通道,极高并发下会严重降低吞吐量。实测性能下降数倍到数十倍。
- 不支持跨通道事务:事务仅作用于单个通道,无法跨多个通道或连接实现分布式事务(如需跨队列、跨 Broker 需用到分布式事务方案如 JTA/XA,RabbitMQ 也支持 XA 接口,但更复杂)。
- 无法回滚已提交的消息:txCommit 后无法撤销。
- 不支持异步:必须阻塞等待 Broker 回应,严重拖慢业务。
- 只保证生产者→Broker 可靠,不保证消费:事务只管发,不管消费是否成功。
- 不支持批量消息事务:不能批量提交,只能逐条。
8.5 事务 vs. Publisher Confirm (发布确认)
RabbitMQ 从 3.x 开始引入了更轻量高效的 Publisher Confirm 机制。两者对比:
| 特性 | 事务机制 (tx) | Publisher Confirm |
|---|---|---|
| 模式 | 同步阻塞 | 异步非阻塞 |
| 原子性 | 多条消息作为一个整体(全有或全无) | 每条消息单独确认,但可通过批量或同步等待实现类事务 |
| 性能 | 极低(每个事务需要 2 次额外同步 RPC:txCommit + txSelect 各自同步) | 高(异步回调或批量等待,没有额外 RPC 开销) |
| 原理 | AMQP 事务同步 | 异步 ACK 机制 |
| 吞吐量 | 1/1000 | 接近原生 |
| 实现复杂度 | 使用简单,但需处理回滚逻辑 | 需管理确认序号和回调,批量模式稍复杂 |
| 适用场景 | 少量消息、低吞吐、严格原子性要求 | 高吞吐、需要可靠投递但可接受部分成功的场景 |
一句话结论:事务是 “强一致但残废性能”,Confirm 是 “高可靠 + 高性能”,生产统一用 Confirm。
九、死信队列
死信队列是 RabbitMQ 中处理无法被正常消费的消息的一种机制。当消息在一个队列中变成“死信”(Dead Letter)后,它会被重新发布到另一个交换机(称为死信交换机,Dead Letter Exchange,DLX),然后路由到一个或多个队列(称为死信队列,DLQ)。
其本质是一条正常的交换机 + 队列,只是用来接收 “死掉” 的消息。
9.1 消息成为死信的三个条件
满足以下任一条件,消息将变成死信:
| 条件 | 说明 | 示例 |
|---|---|---|
| 消息被消费者拒绝且不重新入队 | 消费者调用 basic.reject 或 basic.nack,并且 requeue 参数设置为 false | channel.basicReject(deliveryTag, false); |
| 消息过期(TTL) | 消息设置了 TTL(生存时间)且超时未被消费,或者队列设置了 x-message-ttl 且消息超时 | MessageProperties 设置 expiration 属性,或队列 x-message-ttl |
| 队列达到最大长度 | 队列设置了 x-max-length 或 x-max-length-bytes,当队列已满时,最早的消息(队首)被移除并变成死信 | 队列参数 x-max-length: 10,第 11 条消息到达时队首消息会死信 |
注意:
- 使用 basic.reject(requeue=true) 消息会重新入队,不会成为死信。
- 如果队列没有配置死信交换机,这些消息会被直接丢弃(或根据队列的 x-overflow 配置处理)。
9.2 死信队列完整工作流程
- 生产者发送消息到业务队列(正常队列)
- 消息变成死信(上述任一原因)
- Broker 根据业务队列配置,将消息转发到 DLX(死信交换机)
- DLX 根据绑定键路由到 死信队列(DLQ)
- 消费者监听死信队列,做人工排查、日志记录、重试、补偿
关键点:
- 死信交换机就是普通交换机,可以是 direct/topic/fanout
- 死信队列也是普通队列
- 只是绑定关系特殊
9.3 核心配置
声明业务队列时指定两个参数:
- x-dead-letter-exchange:死信交换机(DLX)
- x-dead-letter-routing-key:死信路由键
示例:
Channel channel = connection.createChannel();
// 1. 声明死信交换机(例如 fanout 类型,或者 direct)
channel.exchangeDeclare("dlx.exchange", BuiltinExchangeType.DIRECT, true);
// 2. 声明死信队列,并绑定到死信交换机
channel.queueDeclare("dlq.queue", true, false, false, null);
channel.queueBind("dlq.queue", "dlx.exchange", "dlx.routing.key");
// 3. 声明主业务队列,并设置死信相关参数
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx.exchange"); // 死信交换机名称
args.put("x-dead-letter-routing-key", "dlx.routing.key"); // 死信路由键(可选,默认使用原路由键)
args.put("x-message-ttl", 10000); // 队列级别 TTL 10秒(可选)
args.put("x-max-length", 100); // 队列最大长度(可选)
channel.queueDeclare("main.queue", true, false, false, args);
// 4. 将主队列绑定到业务交换机
channel.exchangeDeclare("main.exchange", BuiltinExchangeType.DIRECT, true);
channel.queueBind("main.queue", "main.exchange", "main.rk");
9.4 常见使用场景
| 场景 | 利用的死信条件 | 说明 |
|---|---|---|
| 延迟消息/定时任务 | 消息 TTL + 死信队列 | 设置消息 TTL,过期后进入死信队列,由消费者处理。注意:RabbitMQ 本身没有延迟队列插件时的替代方案(TTL+DLX 模拟延迟)。 |
| 消息重试与失败隔离 | 消费者拒绝(requeue=false) | 消费失败多次后,将消息发送到死信队列,避免无限重试阻塞队列。 |
| 队列长度保护 | 队列最大长度限制 | 当消息堆积超过限制时,将最早的消息移到死信队列,保留最新消息,可用作消息淘汰策略。 |
| 监控与日志 | 任何死信 | 将所有死信集中到一个队列,统一记录失败原因(利用 x-death 头),便于排查问题。 |
| 业务降级 | 消费者拒绝 | 当依赖服务不可用时,将消息直接转入死信队列,待恢复后再重新处理(使用死信队列消费者重新发布或人工操作)。 |
典型例子:TTL + DLX 实现延迟消息
// 发送一条 10 秒后处理的消息
channel.basicPublish("main.exchange", "main.rk",
new AMQP.BasicProperties.Builder()
.expiration("10000") // 10秒过期
.build(),
"delayed message".getBytes());
这条消息进入 main.queue 后,10 秒后变为死信,被转发到 dlx.exchange → dlq.queue,监听 dlq.queue 的消费者将收到消息并执行延迟任务。
注意:这种方案的缺点是消息过期时间不精确(每个队列级别的 TTL 按 FIFO 队首超时处理,可能被前面消息阻塞),且需要额外维护死信队列。官方推荐使用 RabbitMQ 的延迟消息插件(rabbitmq_delayed_message_exchange)来实现更精确的延迟。
9.5 最佳实践
- 独立声明:死信交换机和死信队列应该独立于业务组件,单独定义,方便统一管理。
- 监控死信队列长度:死信队列持续增长说明有系统性问题(如消费者 bug、依赖故障),需要告警。
- 死信消息持久化:为了避免死信丢失(如 Broker 重启),建议将死信交换机和死信队列都设为 durable=true,消息也设置为持久化。
- 合理设置 TTL 和队列长度:避免因 TTL 或长度限制导致重要消息意外进入死信队列。
- 消费死信时记录原因:利用 x-death 头部信息(包含死信原因、时间、源队列等)记录日志,便于定位。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)