消息队列RabbitMQ的配置操作及使用
一、RabbitMQ的体系结构
RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现的开源消息中间件,主要用于在分布式系统中存储和转发消息。它由Erlang语言编写,以高性能、高可用性以及高扩展性而著称。
官网地址:RabbitMQ: One broker to queue them all | RabbitMQ
1.1 基本架构设计

①生产者(Producer):消息的发送方,负责产生并发送消息到 rabbitMQ 服务端
②交换机(Exchange):消息的分发中心,负责将接收到的消息路由到一个或多个队列
- 直连交换机(Direct Exchange):将消息路由到与消息中的路由键完全匹配的队列
- 主题交换机(Topic Exchange):根据通配符匹配路由键,将消息路由到相应队列
- 扇出交换机(Fanout Exchange):将消息广播到与交换机绑定的所有队列
- 头部交换机(Headers Exchange):根据消息头中的属性匹配到相应队列
③队列:消息的存储区,用于存储生产者发送的消息
④消费者:消息的接收方,负责从队列中获取消息并进行处理
⑤绑定:交换机和队列之间的关联关系。生产者将消息发送给交换机,队列通过绑定交换机,从 而接收消息
⑥虚拟主机:RabbitMQ 的基本工作单元,每个虚拟主机拥有独立的用户、队列、交换机等资源
⑦连接:连接是指生产者、消费者与 RabbitMQ 之间的网络连接。每个连接可以包含多个信道 (Channel),每个信道是一个独立的会话通道,可以进行独立的消息传递
1.2 消息中间件的作用
- 解耦:降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力
- 异步:可以将长时间的处理任务放入消息队列中异步处理,从而提升响应速度
- 削峰:通过平衡系统负载来减轻峰值压力和填充低谷时的资源利用率

1.3 常用消息队列对比

二、基于 docker 安装 RabbitMQ
2.1 拉取镜像
docker pull rabbitmq
// 如果需要包含管理插件的镜像,可以拉取带有-management标签的镜像
docker pull rabbitmq:3.8-management
2.2 运行镜像
docker run -d --name rabbitmq --restart=always \
-p 5672:5672 -p 15672:15672 \
rabbitmq:management

2.3 查看安装结果
浏览器中输入 http:///127.0.0.1:15672,进入rabbitMQ 管理界面,默认用户名和密码都是guest
注意:如果是在阿里云环境中运行,需要将5672和15672端口添加到安全组

三、rabbitMQ 工作模式
官网介绍了7中工作模式,如下:



3.1 Work Queues

生产者将消息发送到默认交换机,再推送到自定义队列,多个消费者监听同一个队列时,谁先抢到消息算谁的。
当生产者只有一个,且消费者只有一个时,就是“hello world”模式,即简单模式。
3.2 发布订阅模式(publish/Subscribe)

需要创建一个fanout类型的交换机,无需设置路由键,交换机将消息广播到与其绑定的所有队列中,再由消费者进行消费。

3.3 路由模式(routing)

需要创建一个 direct 类型的交换机,并设置路由键,将消息发送到特定的队列中,然后由消费者消费消息。

3.4 主题模式(topics)

需要创建一个 topic 类型的交换机,并设置用通配符表示的路由键,将消息发送到匹配的队列中。
匹配规则:
* 表示匹配一个词
# 表示匹配0个或多个词

四、基于 golang 使用 RabbitMQ
在go语言中使用 RabbitMQ,需引用第三方包:github.com/rabbitmq/amqp091-go
go get https://github.com/rabbitmq/amqp091-go
4.1 连接 rabbitMQ 服务器
// 连接 rabbitMQ 服务器(延迟关闭)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
4.2 创建连接管道 channel
// 创建channel
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
4.3 创建交换机 exchange
// 创建交换机
err = ch.ExchangeDeclare(
"direct.test.go", // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部交换机
false, // 是否等待
nil, // 其他参数
)
4.4 创建队列 queue
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.go", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 其他参数
)
if err != nil {
panic(err)
}
4.5 绑定交换机与队列
// 绑定交换机与队列
ch.QueueBind(
q.Name, // 队列名称
"test", // 路由键
"direct.test.go", // 交换机名称
false, // 是否等待
nil, // 其他参数
)
4.6 发布消息
// 创建超时context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 推送消息
body := "Hello World!88888"
ch.PublishWithContext(ctx,
"direct.test.go", // 交换机名称
"test", // 路由键
false, // 是否等待
false, // 是否立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
4.7 消费消息
// 消费消息
msgs, err := ch.Consume(
"queue.direct.test.go", // 队列名称
"", // 消费者标识
false, // 是否自动确认
false, // 是否排他
false, // 是否等待
false, // 其他参数
nil, // 其他参数
)
// 取出管道中的消息并打印
for msg := range msgs {
fmt.Printf("消费到消息内容:%s \n", string(msg.Body))
msg.Ack(false)
}
- 生产者发送消息完整代码:
package main
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接 rabbitMQ 服务器(延迟关闭)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
// 创建channel
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
// 创建交换机
err = ch.ExchangeDeclare(
"direct.test.go", // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部交换机
false, // 是否等待
nil, // 其他参数
)
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.go", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
nil, // 其他参数
)
if err != nil {
panic(err)
}
// 绑定交换机与队列
ch.QueueBind(
q.Name, // 队列名称
"test", // 路由键
"direct.test.go", // 交换机名称
false, // 是否等待
nil, // 其他参数
)
// 创建超时context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 推送消息
body := "Hello World!88888"
ch.PublishWithContext(ctx,
"direct.test.go", // 交换机名称
"test", // 路由键
false, // 是否等待
false, // 是否立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
fmt.Printf("推送消息:%s", body)
}
- 消费者端接收消息完整代码:
package main
import (
"fmt"
amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
// 连接 rabbitMQ 服务器(延迟关闭)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}
defer conn.Close()
// 创建channel
ch, err := conn.Channel()
if err != nil {
panic(err)
}
defer ch.Close()
// 消费消息 如果生产者端或手动建立了交换机和队列,及绑定关系,则消费者端可直接根据队列名消费消息,而无需再创建交换机、队列等
msgs, err := ch.Consume(
"queue.direct.test.go", // 队列名称
"", // 消费者标识
false, // 是否自动确认
false, // 是否排他
false, // 是否等待
false, // 其他参数
nil, // 其他参数
)
// 取出管道中的消息并打印
for msg := range msgs {
fmt.Printf("消费到消息内容:%s \n", string(msg.Body))
msg.Ack(false)
}
}
在队列 queque.direct.test.go 中,可查看到消息内容:

五、RabbitMQ 进阶篇
5.1 消息的可靠性投递
- 三种消息丢失场景
①生产者:发送消息到rabbitMQ服务的过程中出现丢失
②rabbitMQ服务器:进行消息持久化的过程中出现丢失,比如服务宕机重启
③消费者:拉取信息时存在网络波动等导致消息丢失,或消息者处理消息异常导致丢失
- 消息丢失的解决方案
①消息确认机制(针对生产者)
Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时它会等待 RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。
消息正确投递到 queue 时,会返回 ack。
消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失使用方法:
生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式
发送消息后,通过添加 add confirm listener 方法,监听消息的确认状态。②消息持久化机制(针对rabbitMQ服务器)
持久化机制是指将消息存储到磁盘,以保证在 RabbitMQ 服务器宕机或重启时,消息不会丢失。
使用方法:
生产者通过将消息的 delivery mode 属性设置为 2,将消息标记为持久化。
队列也需要进行持久化设置,确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。注意事项:
持久化机制会影响性能,因此在需要确保消息不丢失的场景下使用③ACK事务机制(针对消费者)
ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认(ACK)给RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭进行手工发送 ACK。
使用方法:
在 RabbitMQ 中,ACK 机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常。
可以手动开启 ACK 机制,通过将 auto_ack 参数设置为 False,手动控制消息的 ACK注意事项:
ACK 机制可以确保消息不会被重复处理,但如果消费者发生异常或者未发送 ACK,消息可能会被重复投递
- 三种重复消费场景
①生产者:重复发送同一条消息
②rabbit服务器:消费者消费消息后,没来得及发送ACK消息,rabbit服务器挂掉,MQ认为消息还未被消费,当MQ重启后,会继续推送这条消息
③消费者:消费者处理完消息,没来得及发送ACK确认消息,消费者挂掉。MQ认为消息还未被消费,当消费者重启后,会再次接收到这条消息
- 重复消费的解决方案
①使用数据库唯一约束(局限性大)
②插入消费记录:根据消息ID,将消息先插入数据库,插入成功后给rabbitMQ返回ACK确认消息。消费者处理完业务,则增加标记,表示消息已处理成功;如果处理失败,则记录失败次数及原因,已提醒管理人员进行手动处理。
- 消息堆积的原因

- 消息堆积的解决方案
①优化消费者性能,增加消费者数量
②增加队列的容量,以存储更多的消息
③将无法处理的消息转移到死信队列
④将大消息分割为小消息,提高处理速度
⑤简化消费端业务处理逻辑
⑥控制生产者发送消息的速度
⑦设置消息优先级,优先处理高优先级的消息
5.2 消费端限流
通过设置rabbitMQ的prefetch count参数,可以控制服务器一次投递给消费者的消息数量,以适应消费者处理消息的速率(避免大量消息都投递到消费者)。
// 消费者端代码
//设置消费端限流 rabbitmq未收到ack消息时,只投递1条消息到消费者
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)

5.3 消息超时

在 RabbitMQ 中,设置消息或队列的“超时”(即自动过期/消失)主要有两种方式,都通过 x-message-ttl 参数来实现。当消息在队列中停留的时间超过这个值,且未被消费者确认(ACK),RabbitMQ 会将该消息标记为“死信”(Dead Letter)或直接丢弃。
优先级:如果队列设置了 TTL,消息也设置了 TTL,取两者中较短的那个
- 队列级 TTL
该队列中的所有消息都需要相同的过期时间,在声明队列 (QueueDeclare) 时,通过 Arguments 参数设置。
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.ttl2", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
map[string]any{ // 其他参数
amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间,单位是毫秒
},
)
- 消息级 TTL
同一个队列中,不同消息需要不同的过期时间(例如:VIP 用户消息保留 1 小时,普通用户保留 5 分钟)。在发布消息 (Publish) 时,通过 Publishing 属性的 Expiration 字段设置。
body := fmt.Sprintf("Hello World! %d", i)
ch.PublishWithContext(ctx,
"direct.test.go", // 交换机名称
"test", // 路由键
false, // 是否等待
false, // 是否立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Expiration: "5000", // 发布消息时,可以设置该条消息的过期时间,与队列的消息过期时间参数对比,取较短的过期时间
},
)

5.4 死信队列
死信队列是 RabbitMQ 中一种特殊的机制,用于接收那些无法正常被消费的消息。
产生的原因:
- 消息被拒绝
- 消息过期
- 队列满了

使用方法:
x-dead-letter-exchange:指定死信消息要转发到的交换机名称
x-dead-letter-routing-key:指定死信消息转发时的路由键
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.ttl2", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
map[string]any{ // 其他参数
amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间
"x-dead-letter-exchange": "exchange.direct.dead.leter", // 过期后发往死信队列绑定的交换机
"x-dead-letter-routing-key": "routing.key.dead.leter", // 指定路由键
},
)


应用场景:
- 订单超时自动取消
- 异常消息隔离与报警
- 消息重试机制
5.5 延迟队列
延迟队列是消息中间件中一种特殊的场景,指消息被发送后,不会立即被消费者消费,而是需要在指定的时间之后才能被消费。
实现方式:
- 方案一:正常队列 TTL + 死信队列
- 方案二:rabbitmq_delayed_message_exchange 插件
应用场景:

5.6 惰性队列
尽可能将消息直接存储在磁盘上,只在消费者请求时才将少量消息加载到 RAM 中。内存占用极低且稳定,能轻松处理百万级甚至亿级的消息堆积,不会因内存爆炸而宕机。
使用方法:创建队列时,配置 x-queue-mode 参数为 lazy。
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.ttl3", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
map[string]any{ // 其他参数
"x-queue-mode": "lazy", // 关键配置:开启惰性模式
},
)
使用场景:

5.7 优先级队列
优先级队列允许你在发送消息时给每条消息分配一个优先级(0-255)。当消费者从队列中取消息时,RabbitMQ 会优先投递优先级高的消息,而不是严格按照“先进先出”(FIFO)的顺序。
使用方法:
在 RabbitMQ 中,优先级队列不是默认开启的,需要在声明队列时设置最大优先级参数 x-max-priority。数值越大,优先级越高。
创建队列时,配置 x-max-priority 参数;发送消息时设置消息的优先级 Priority。
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.ttl3", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
map[string]any{ // 其他参数
"x-max-priority": int32(10), // 设置最大优先级为 10
},
)
// 发送消息
ch.PublishWithContext(ctx,
"direct.test.go", // 交换机名称
"test", // 路由键
false, // 是否等待
false, // 是否立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
Priority: 9, // 设置消息优先级,值越大优先级越高
},
)
应该场景:

优先级队列缺点:
- 内存消耗剧增
- 只有“相对”优先,没有“绝对”插队
六、RabbitMQ 集群篇
6.1 集群搭建
环境说明: windows wsl 系统中运用 docker compose 搭建 rabbitmq 集群
6.1.1 创建相关目录
目录结构如下:
rabbitmq-cluster/
├── docker-compose.yml
├── entrypoint.sh
└── data/ # (可选) 用于持久化数据,脚本运行后会自动生成子文件夹
6.1.2 创建自动加入集群的 shell 脚本 entrypoint.sh
创建 entrypoint.sh 文件,用于将 rabbitmq 节点自动加入到集群,创建完该文件后需要确保脚本有执行权限(执行命令 chmod +x entrypoint.sh)
#!/bin/bash
set -e
# 配置变量
COOKIE_VAL="${RABBITMQ_ERLANG_COOKIE:-secret_cookie_123}"
COOKIE_FILE="/var/lib/rabbitmq/.erlang.cookie"
MY_HOST="$(hostname)"
MASTER_NODE="rabbit@rabbitmq1"
echo "[$(date)] 启动节点: $MY_HOST"
# 1. 统一 Erlang Cookie (集群通信的关键)
if [ ! -f "$COOKIE_FILE" ]; then
echo "写入 Erlang Cookie..."
echo "$COOKIE_VAL" > "$COOKIE_FILE"
chmod 600 "$COOKIE_FILE"
chown rabbitmq:rabbitmq "$COOKIE_FILE"
fi
# 2. 判断是否为主节点 (rabbitmq1)
if [ "$MY_HOST" == "rabbitmq1" ]; then
echo "[$(date)] 检测到是主节点 (rabbitmq1),直接启动服务..."
exec rabbitmq-server
fi
# 3. 非主节点逻辑:后台启动 -> 等待主节点 -> 加入集群 -> 前台运行
echo "[$(date)] 检测到是从节点,准备加入集群 $MASTER_NODE ..."
# 启动 RabbitMQ 为后台守护进程
rabbitmq-server -detached
# 等待本地服务完全就绪
echo "[$(date)] 等待本地服务就绪..."
until rabbitmqctl status > /dev/null 2>&1; do
sleep 2
done
# 等待主节点可连接 (防止主节点还没起好就尝试加入)
echo "[$(date)] 等待主节点 $MASTER_NODE 响应..."
while ! rabbitmqctl ping -n "$MASTER_NODE" > /dev/null 2>&1; do
echo "主节点未就绪,等待 2 秒..."
sleep 2
done
# 执行集群加入操作
echo "[$(date)] 执行加入集群操作..."
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster "$MASTER_NODE"
rabbitmqctl start_app
echo "[$(date)] 成功加入集群!重启进程以前台模式运行..."
# 停止后台进程,以便 exec 接管容器保持存活
rabbitmqctl stop
# 4. 以前台模式正式运行 (此时已属于集群)
exec rabbitmq-server
6.1.3 创建 docker-compose.yml
services:
# --- 节点 1 (主节点/种子节点) ---
rabbitmq1:
image: rabbitmq:3.13-management-alpine
hostname: rabbitmq1
container_name: rabbitmq1
environment:
# 基础认证
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
# 集群通信密钥 (所有节点必须一致)
- RABBITMQ_ERLANG_COOKIE=secret_cookie_123
ports:
- "15673:15672" # 管理界面
- "5673:5672" # AMQP 端口
volumes:
- ./data/rabbitmq1:/var/lib/rabbitmq
# 挂载脚本到容器内
- ./entrypoint.sh:/entrypoint.sh:ro
entrypoint: ["bash", "/entrypoint.sh"]
networks:
- rabbitmq_net
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: always
# --- 节点 2 ---
rabbitmq2:
image: rabbitmq:3.13-management-alpine
hostname: rabbitmq2
container_name: rabbitmq2
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_ERLANG_COOKIE=secret_cookie_123
ports:
- "15674:15672"
- "5674:5672"
volumes:
- ./data/rabbitmq2:/var/lib/rabbitmq
# 挂载脚本到容器内
- ./entrypoint.sh:/entrypoint.sh:ro
entrypoint: ["bash", "/entrypoint.sh"]
depends_on:
rabbitmq1:
condition: service_healthy # 确保节点 1 健康后再启动节点 2
networks:
- rabbitmq_net
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: always
# --- 节点 3 ---
rabbitmq3:
image: rabbitmq:3.13-management-alpine
hostname: rabbitmq3
container_name: rabbitmq3
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_ERLANG_COOKIE=secret_cookie_123
ports:
- "15675:15672"
- "5675:5672"
volumes:
- ./data/rabbitmq3:/var/lib/rabbitmq
# 挂载脚本到容器内
- ./entrypoint.sh:/entrypoint.sh:ro
entrypoint: ["bash", "/entrypoint.sh"]
depends_on:
rabbitmq1:
condition: service_healthy
rabbitmq2:
condition: service_healthy
networks:
- rabbitmq_net
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: always
networks:
rabbitmq_net:
driver: bridgeservices:
# --- 节点 1 (主节点/种子节点) ---
rabbitmq1:
image: rabbitmq:3.13-management-alpine
hostname: rabbitmq1
container_name: rabbitmq1
environment:
# 基础认证
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
# 集群通信密钥 (所有节点必须一致)
- RABBITMQ_ERLANG_COOKIE=secret_cookie_123
ports:
- "15673:15672" # 管理界面
- "5673:5672" # AMQP 端口
volumes:
- ./data/rabbitmq1:/var/lib/rabbitmq
# 挂载脚本到容器内
- ./entrypoint.sh:/entrypoint.sh:ro
entrypoint: ["bash", "/entrypoint.sh"]
networks:
- rabbitmq_net
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: always
# --- 节点 2 ---
rabbitmq2:
image: rabbitmq:3.13-management-alpine
hostname: rabbitmq2
container_name: rabbitmq2
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_ERLANG_COOKIE=secret_cookie_123
ports:
- "15674:15672"
- "5674:5672"
volumes:
- ./data/rabbitmq2:/var/lib/rabbitmq
# 挂载脚本到容器内
- ./entrypoint.sh:/entrypoint.sh:ro
entrypoint: ["bash", "/entrypoint.sh"]
depends_on:
rabbitmq1:
condition: service_healthy # 确保节点 1 健康后再启动节点 2
networks:
- rabbitmq_net
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: always
# --- 节点 3 ---
rabbitmq3:
image: rabbitmq:3.13-management-alpine
hostname: rabbitmq3
container_name: rabbitmq3
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_ERLANG_COOKIE=secret_cookie_123
ports:
- "15675:15672"
- "5675:5672"
volumes:
- ./data/rabbitmq3:/var/lib/rabbitmq
# 挂载脚本到容器内
- ./entrypoint.sh:/entrypoint.sh:ro
entrypoint: ["bash", "/entrypoint.sh"]
depends_on:
rabbitmq1:
condition: service_healthy
rabbitmq2:
condition: service_healthy
networks:
- rabbitmq_net
healthcheck:
test: ["CMD", "rabbitmq-diagnostics", "ping"]
interval: 10s
timeout: 5s
retries: 5
restart: always
networks:
rabbitmq_net:
driver: bridge
6.1.4 启动集群并查看相关日志
# 启动集群
docker compose up -d
# 查看日志
docker logs rabbitmq2

或者查看节点的集群状态,Running Nodes 显示三个节点,表示创建集群成功
# 查看集群状态
docker exec -it rabbitmq1 rabbitmqctl cluster_status
6.1.5 访问网页管理端查看集群状态
访问任意节点的管理端,可以查看到三个节点,如图所示:
# 访问管理端,用户名及密码都是admin(docker-compose.yml中配置的)
http://127.0.0.1:15673

6.2 Quorum 队列
6.2.1 概念
Quorum Queue(仲裁队列)只能在 RabbitMQ 集群(Cluster)环境中创建和使用,创建队列后,会在集群中的所有节点都创建副本,其中一个节点被选举为 Leader(负责处理读写请求), 另外的节点作为 Follower(同步存储数据副本)。
采用 “多数派写入” 机制,当消息写入时,必须由集群中超过半数的节点确认成功后,才视为写入成功并返回给生产者。
当消费者成功处理消息并发送 ACK 确认后,该消息会从集群中所有节点(Leader 和所有 Follower)上同时删除。
6.2.2 特点

6.3.3 使用方法
在任意一个节点上创建队列,type 选择 Quorum,即可创建仲裁队列。


6.3 Stream 队列
RabbitMQ Stream Queue(流队列)是 RabbitMQ 从 3.9 版本引入的一种全新队列类型,专为高吞吐、大数据量、日志类场景设计。 它的核心设计理念借鉴了 Apache Kafka,将 RabbitMQ 从一个传统的“即时消费”消息代理,扩展为支持“日志回放”和“无限存储”的流处理平台。
机制:
消息像写日志一样,按顺序追加到磁盘文件中。
区别:
经典/仲裁队列:消息被消费并 ACK 后,立即删除。
流队列:消息被消费后不会删除,而是永久保留(直到达到配置的保留策略,如时间或大小限制)。
优势:
支持历史消息回放。新消费者加入时,可以从头开始读取,或者从任意时间点(Offset)开始读取。
6.4 基于 go 语言使用集群消息队列的方法
6.4.1 创建连接
func connectToCluster(nodes []string) (*amqp.Connection, error) {
var lastErr error
// 遍历所有节点尝试连接
for _, nodeUrl := range nodes {
fmt.Printf("正在尝试连接节点: %s ...\n", nodeUrl)
conn, err := amqp.Dial(nodeUrl)
if err == nil {
// 连接成功!
fmt.Printf("✅ 成功连接到节点: %s\n", nodeUrl)
return conn, nil
}
lastErr = err
log.Printf("❌ 连接节点 %s 失败: %v", nodeUrl, err)
// 可选:可以在这里加一个极短的延时,避免瞬间风暴
time.Sleep(100 * time.Millisecond)
}
// 如果所有节点都试过了还是失败,返回最后一个错误
return nil, fmt.Errorf("无法连接到集群中的所有节点,最后错误: %w", lastErr)
}
6.4.2 创建仲裁队列
需要指定队列的类型为仲裁队列,x-queue-type = "quorum"
// 创建仲裁队列
q, err := ch.QueueDeclare(
"queue.direct.test.cluster", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点:集群使用仲裁队列
},
)
6.4.3 完整代码示例
package main
import (
"context"
"fmt"
"log"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// 配置集群地址:用逗号分隔多个节点
// 格式:amqp://用户名:密码@节点1:端口,节点2:端口,节点3:端口/虚拟主机
var clusterUrls = []string{
"amqp://admin:admin@127.0.0.1:5673/",
"amqp://admin:admin@127.0.0.1:5674/",
"amqp://admin:admin@127.0.0.1:5675/",
}
func main() {
// 启动重连循环
reconnectLoop()
}
func connectToCluster(nodes []string) (*amqp.Connection, error) {
var lastErr error
// 遍历所有节点尝试连接
for _, nodeUrl := range nodes {
fmt.Printf("正在尝试连接节点: %s ...\n", nodeUrl)
conn, err := amqp.Dial(nodeUrl)
if err == nil {
// 连接成功!
fmt.Printf("✅ 成功连接到节点: %s\n", nodeUrl)
return conn, nil
}
lastErr = err
log.Printf("❌ 连接节点 %s 失败: %v", nodeUrl, err)
// 可选:可以在这里加一个极短的延时,避免瞬间风暴
time.Sleep(100 * time.Millisecond)
}
// 如果所有节点都试过了还是失败,返回最后一个错误
return nil, fmt.Errorf("无法连接到集群中的所有节点,最后错误: %w", lastErr)
}
func reconnectLoop() {
var conn *amqp.Connection
var ch *amqp.Channel
var notifyClose chan *amqp.Error
// 重连间隔
reconnectInterval := 5 * time.Second
for {
// 1. 尝试连接集群
// Dial 会依次尝试 urls 中的地址,直到成功或全部失败
// 注意:amqp091-go 的 Dial 通常只接受一个 URL 字符串
fmt.Printf("正在尝试连接 RabbitMQ 集群: %s\n", clusterUrls)
c, err := connectToCluster(clusterUrls)
if err != nil {
log.Printf("连接集群失败: %v, %s 后重试...", err, reconnectInterval)
time.Sleep(reconnectInterval)
continue
}
conn = c
notifyClose = make(chan *amqp.Error)
conn.NotifyClose(notifyClose)
// 2. 创建 Channel
channel, err := conn.Channel()
if err != nil {
log.Printf("创建 Channel 失败: %v", err)
conn.Close()
time.Sleep(reconnectInterval)
continue
}
ch = channel
fmt.Println("✅ 成功连接到 RabbitMQ 集群并创建 Channel")
// 3. 在这里执行业务逻辑 (发布/消费)
// 建议将 ch 传递给具体的业务协程
doWork(ch)
// 4. 等待连接关闭通知 (阻塞)
err = <-notifyClose
log.Printf("连接断开: %v", err)
// 清理资源
if ch != nil {
ch.Close()
}
if conn != nil {
conn.Close()
}
// 5. 等待一段时间后重试
time.Sleep(reconnectInterval)
}
}
func doWork(ch *amqp.Channel) {
// 模拟业务运行
// 在实际应用中,这里会启动消费者协程或生产者循环
// 它们会使用传入的 ch
// 创建交换机
err := ch.ExchangeDeclare(
"direct.test.cluster", // 交换机名称
"direct", // 交换机类型
true, // 是否持久化
false, // 是否自动删除
false, // 是否内部交换机
false, // 是否等待
nil, // 其他参数
)
if err != nil {
fmt.Println(err)
}
// 创建队列
q, err := ch.QueueDeclare(
"queue.direct.test.cluster", // 队列名称
true, // 是否持久化
false, // 是否自动删除
false, // 是否排他
false, // 是否等待
amqp.Table{
amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点:集群使用仲裁队列
},
)
if err != nil {
panic(err)
}
// 绑定交换机与队列
ch.QueueBind(
q.Name, // 队列名称
"test", // 路由键
"direct.test.cluster", // 交换机名称
false, // 是否等待
nil, // 其他参数
)
// 创建超时context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// 推送消息
for i := range 30 {
body := fmt.Sprintf("Hello World! rabbitmq cluster %d", i)
ch.PublishWithContext(ctx,
"direct.test.cluster", // 交换机名称
"test", // 路由键
false, // 是否等待
false, // 是否立即
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
},
)
fmt.Printf("推送消息:%s\n", body)
time.Sleep(1 * time.Second)
}
}


参考视频:消息中间件夺命连环18问,一口气刷完面试必问的消息中间件面试内容,让你面试少走99%的弯路!_哔哩哔哩_bilibili
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐




所有评论(0)