一、RabbitMQ的体系结构

RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现的开源消息中间件,主要用于在分布式系统中存储和转发消息。它由Erlang语言编写,以高性能、高可用性以及高扩展性而著称。

官网地址:RabbitMQ: One broker to queue them all | RabbitMQ

1.1 基本架构设计

 ①生产者(Producer):消息的发送方,负责产生并发送消息到 rabbitMQ 服务端

②交换机(Exchange):消息的分发中心,负责将接收到的消息路由到一个或多个队列

  • 直连交换机(Direct Exchange):将消息路由到与消息中的路由键完全匹配的队列
  • 主题交换机(Topic Exchange):根据通配符匹配路由键,将消息路由到相应队列
  • 扇出交换机(Fanout Exchange):将消息广播到与交换机绑定的所有队列
  • 头部交换机(Headers Exchange):根据消息头中的属性匹配到相应队列

③队列:消息的存储区,用于存储生产者发送的消息

④消费者:消息的接收方,负责从队列中获取消息并进行处理

⑤绑定:交换机和队列之间的关联关系。生产者将消息发送给交换机,队列通过绑定交换机,从                  而接收消息

⑥虚拟主机:RabbitMQ 的基本工作单元,每个虚拟主机拥有独立的用户、队列、交换机等资源

⑦连接:连接是指生产者、消费者与 RabbitMQ 之间的网络连接。每个连接可以包含多个信道                      (Channel),每个信道是一个独立的会话通道,可以进行独立的消息传递

1.2 消息中间件的作用

  • 解耦:降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力
  • 异步:可以将长时间的处理任务放入消息队列中异步处理,从而提升响应速度
  • 削峰:通过平衡系统负载来减轻峰值压力和填充低谷时的资源利用率

1.3 常用消息队列对比

二、基于 docker 安装 RabbitMQ

2.1 拉取镜像

docker pull rabbitmq

// 如果需要包含管理插件的镜像,可以拉取带有-management标签的镜像
docker pull rabbitmq:3.8-management

2.2 运行镜像

docker run -d --name rabbitmq --restart=always \
-p 5672:5672 -p 15672:15672 \
rabbitmq:management

2.3 查看安装结果

浏览器中输入 http:///127.0.0.1:15672,进入rabbitMQ 管理界面,默认用户名和密码都是guest

注意:如果是在阿里云环境中运行,需要将5672和15672端口添加到安全组

三、rabbitMQ 工作模式

官网介绍了7中工作模式,如下:

3.1 Work Queues 

 

生产者将消息发送到默认交换机,再推送到自定义队列,多个消费者监听同一个队列时,谁先抢到消息算谁的。

当生产者只有一个,且消费者只有一个时,就是“hello world”模式,即简单模式。

3.2 发布订阅模式(publish/Subscribe) 

需要创建一个fanout类型的交换机,无需设置路由键,交换机将消息广播到与其绑定的所有队列中,再由消费者进行消费。

3.3 路由模式(routing)

需要创建一个 direct 类型的交换机,并设置路由键,将消息发送到特定的队列中,然后由消费者消费消息。

3.4 主题模式(topics)

需要创建一个 topic 类型的交换机,并设置用通配符表示的路由键,将消息发送到匹配的队列中。

匹配规则:

* 表示匹配一个词

# 表示匹配0个或多个词

四、基于 golang 使用 RabbitMQ

在go语言中使用 RabbitMQ,需引用第三方包:github.com/rabbitmq/amqp091-go

go get https://github.com/rabbitmq/amqp091-go

4.1 连接 rabbitMQ 服务器

// 连接 rabbitMQ 服务器(延迟关闭)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
	panic(err)
}
defer conn.Close()

4.2  创建连接管道 channel

// 创建channel
ch, err := conn.Channel()
if err != nil {
	panic(err)
}
defer ch.Close()

4.3 创建交换机 exchange

// 创建交换机
err = ch.ExchangeDeclare(
	"direct.test.go", // 交换机名称
	"direct",         // 交换机类型
	true,             // 是否持久化
	false,            // 是否自动删除
	false,            // 是否内部交换机
	false,            // 是否等待
	nil,              // 其他参数
)

4.4 创建队列 queue

// 创建队列
q, err := ch.QueueDeclare(
	"queue.direct.test.go", // 队列名称
	true,                   // 是否持久化
	false,                  // 是否自动删除
	false,                  // 是否排他
	false,                  // 是否等待
	nil,                    // 其他参数
)
if err != nil {
	panic(err)
}

4.5 绑定交换机与队列

// 绑定交换机与队列
ch.QueueBind(
	q.Name,           // 队列名称
	"test",           // 路由键
	"direct.test.go", // 交换机名称
	false,            // 是否等待
	nil,              // 其他参数
)

4.6 发布消息

// 创建超时context
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// 推送消息
body := "Hello World!88888"
ch.PublishWithContext(ctx,
	"direct.test.go", // 交换机名称
	"test",           // 路由键
	false,            // 是否等待
	false,            // 是否立即
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
	},
)

4.7 消费消息

// 消费消息
msgs, err := ch.Consume(
	"queue.direct.test.go", // 队列名称
	"",                     // 消费者标识
	false,                  // 是否自动确认
	false,                  // 是否排他
	false,                  // 是否等待
	false,                  // 其他参数
	nil,                    // 其他参数
)

// 取出管道中的消息并打印
for msg := range msgs {
	fmt.Printf("消费到消息内容:%s \n", string(msg.Body))
	msg.Ack(false)
}

  • 生产者发送消息完整代码:
package main

import (
	"context"
	"fmt"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// 连接 rabbitMQ 服务器(延迟关闭)
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// 创建channel
	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	// 创建交换机
	err = ch.ExchangeDeclare(
		"direct.test.go", // 交换机名称
		"direct",         // 交换机类型
		true,             // 是否持久化
		false,            // 是否自动删除
		false,            // 是否内部交换机
		false,            // 是否等待
		nil,              // 其他参数
	)

	// 创建队列
	q, err := ch.QueueDeclare(
		"queue.direct.test.go", // 队列名称
		true,                   // 是否持久化
		false,                  // 是否自动删除
		false,                  // 是否排他
		false,                  // 是否等待
		nil,                    // 其他参数
	)
	if err != nil {
		panic(err)
	}

	// 绑定交换机与队列
	ch.QueueBind(
		q.Name,           // 队列名称
		"test",           // 路由键
		"direct.test.go", // 交换机名称
		false,            // 是否等待
		nil,              // 其他参数
	)

	// 创建超时context
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 推送消息
	body := "Hello World!88888"
	ch.PublishWithContext(ctx,
		"direct.test.go", // 交换机名称
		"test",           // 路由键
		false,            // 是否等待
		false,            // 是否立即
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)

	fmt.Printf("推送消息:%s", body)
}
  • 消费者端接收消息完整代码:
package main

import (
	"fmt"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	// 连接 rabbitMQ 服务器(延迟关闭)
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// 创建channel
	ch, err := conn.Channel()
	if err != nil {
		panic(err)
	}
	defer ch.Close()

	// 消费消息 如果生产者端或手动建立了交换机和队列,及绑定关系,则消费者端可直接根据队列名消费消息,而无需再创建交换机、队列等
	msgs, err := ch.Consume(
		"queue.direct.test.go", // 队列名称
		"",                     // 消费者标识
		false,                  // 是否自动确认
		false,                  // 是否排他
		false,                  // 是否等待
		false,                  // 其他参数
		nil,                    // 其他参数
	)

	// 取出管道中的消息并打印
	for msg := range msgs {
		fmt.Printf("消费到消息内容:%s \n", string(msg.Body))
		msg.Ack(false)
	}
}

在队列 queque.direct.test.go 中,可查看到消息内容: 

五、RabbitMQ 进阶篇

5.1 消息的可靠性投递

  • 三种消息丢失场景

①生产者:发送消息到rabbitMQ服务的过程中出现丢失

②rabbitMQ服务器:进行消息持久化的过程中出现丢失,比如服务宕机重启

③消费者:拉取信息时存在网络波动等导致消息丢失,或消息者处理消息异常导致丢失

  • 消息丢失的解决方案

 ①消息确认机制(针对生产者)

Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时它会等待 RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。
消息正确投递到 queue 时,会返回 ack。
消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失

使用方法:
生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式
发送消息后,通过添加 add confirm listener 方法,监听消息的确认状态。

 ②消息持久化机制(针对rabbitMQ服务器)

持久化机制是指将消息存储到磁盘,以保证在 RabbitMQ 服务器宕机或重启时,消息不会丢失。

使用方法:
生产者通过将消息的 delivery mode 属性设置为 2,将消息标记为持久化。
队列也需要进行持久化设置,确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。

注意事项:
持久化机制会影响性能,因此在需要确保消息不丢失的场景下使用

③ACK事务机制(针对消费者)

ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认(ACK)给RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭进行手工发送 ACK。

使用方法:
在 RabbitMQ 中,ACK 机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常。
可以手动开启 ACK 机制,通过将 auto_ack 参数设置为 False,手动控制消息的 ACK

注意事项:
ACK 机制可以确保消息不会被重复处理,但如果消费者发生异常或者未发送 ACK,消息可能会被重复投递

  • 三种重复消费场景

①生产者:重复发送同一条消息

②rabbit服务器:消费者消费消息后,没来得及发送ACK消息,rabbit服务器挂掉,MQ认为消息还未被消费,当MQ重启后,会继续推送这条消息

③消费者:消费者处理完消息,没来得及发送ACK确认消息,消费者挂掉。MQ认为消息还未被消费,当消费者重启后,会再次接收到这条消息

  • 重复消费的解决方案 

 ①使用数据库唯一约束(局限性大)

 ②插入消费记录:根据消息ID,将消息先插入数据库,插入成功后给rabbitMQ返回ACK确认消息。消费者处理完业务,则增加标记,表示消息已处理成功;如果处理失败,则记录失败次数及原因,已提醒管理人员进行手动处理。

  • 消息堆积的原因

  •  消息堆积的解决方案

①优化消费者性能,增加消费者数量

②增加队列的容量,以存储更多的消息

③将无法处理的消息转移到死信队列

④将大消息分割为小消息,提高处理速度

⑤简化消费端业务处理逻辑

⑥控制生产者发送消息的速度

⑦设置消息优先级,优先处理高优先级的消息

5.2 消费端限流

通过设置rabbitMQ的prefetch count参数,可以控制服务器一次投递给消费者的消息数量,以适应消费者处理消息的速率(避免大量消息都投递到消费者)。

// 消费者端代码

//设置消费端限流  rabbitmq未收到ack消息时,只投递1条消息到消费者
err = ch.Qos(
	1,     // prefetch count
	0,     // prefetch size
	false, // global
)

5.3 消息超时

在 RabbitMQ 中,设置消息或队列的“超时”(即自动过期/消失)主要有两种方式,都通过 x-message-ttl 参数来实现。当消息在队列中停留的时间超过这个值,且未被消费者确认(ACK),RabbitMQ 会将该消息标记为“死信”(Dead Letter)或直接丢弃。

优先级:如果队列设置了 TTL,消息也设置了 TTL,取两者中较短的那个

  • 队列级 TTL

该队列中的所有消息都需要相同的过期时间,在声明队列 (QueueDeclare) 时,通过 Arguments 参数设置。

// 创建队列
q, err := ch.QueueDeclare(
	"queue.direct.test.ttl2", // 队列名称
	true,                     // 是否持久化
	false,                    // 是否自动删除
	false,                    // 是否排他
	false,                    // 是否等待
	map[string]any{           // 其他参数
		amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间,单位是毫秒
	},
)
  • 消息级 TTL        

同一个队列中,不同消息需要不同的过期时间(例如:VIP 用户消息保留 1 小时,普通用户保留 5 分钟)。在发布消息 (Publish) 时,通过 Publishing 属性的 Expiration 字段设置。

body := fmt.Sprintf("Hello World! %d", i)
ch.PublishWithContext(ctx,
	"direct.test.go", // 交换机名称
	"test",           // 路由键
	false,            // 是否等待
	false,            // 是否立即
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
		Expiration:  "5000", // 发布消息时,可以设置该条消息的过期时间,与队列的消息过期时间参数对比,取较短的过期时间
	},
)

5.4 死信队列

死信队列是 RabbitMQ 中一种特殊的机制,用于接收那些无法正常被消费的消息。

产生的原因:

  • 消息被拒绝
  • 消息过期
  • 队列满了

使用方法:

x-dead-letter-exchange:指定死信消息要转发到的交换机名称

x-dead-letter-routing-key:指定死信消息转发时的路由键

// 创建队列
q, err := ch.QueueDeclare(
	"queue.direct.test.ttl2", // 队列名称
	true,                     // 是否持久化
	false,                    // 是否自动删除
	false,                    // 是否排他
	false,                    // 是否等待
	map[string]any{ // 其他参数
		amqp.QueueMessageTTLArg:     30000,                        // 队列中消息过期时间
		"x-dead-letter-exchange":    "exchange.direct.dead.leter", // 过期后发往死信队列绑定的交换机
		"x-dead-letter-routing-key": "routing.key.dead.leter",     // 指定路由键  
	},
)

应用场景:

  • 订单超时自动取消
  • 异常消息隔离与报警
  • 消息重试机制

5.5 延迟队列

延迟队列是消息中间件中一种特殊的场景,指消息被发送后,不会立即被消费者消费,而是需要在指定的时间之后才能被消费。

实现方式:

  • 方案一:正常队列 TTL + 死信队列
  • 方案二:rabbitmq_delayed_message_exchange 插件

应用场景:

5.6 惰性队列

尽可能将消息直接存储在磁盘上,只在消费者请求时才将少量消息加载到 RAM 中。内存占用极低且稳定,能轻松处理百万级甚至亿级的消息堆积,不会因内存爆炸而宕机。

使用方法:创建队列时,配置 x-queue-mode 参数为 lazy。

// 创建队列
q, err := ch.QueueDeclare(
	"queue.direct.test.ttl3", // 队列名称
	true,                     // 是否持久化
	false,                    // 是否自动删除
	false,                    // 是否排他
	false,                    // 是否等待
	map[string]any{ // 其他参数
		"x-queue-mode": "lazy", // 关键配置:开启惰性模式
	},
)

使用场景:

5.7 优先级队列

优先级队列允许你在发送消息时给每条消息分配一个优先级(0-255)。当消费者从队列中取消息时,RabbitMQ 会优先投递优先级高的消息,而不是严格按照“先进先出”(FIFO)的顺序。

使用方法:

在 RabbitMQ 中,优先级队列不是默认开启的,需要在声明队列时设置最大优先级参数 x-max-priority。数值越大,优先级越高。

创建队列时,配置 x-max-priority 参数;发送消息时设置消息的优先级 Priority。

// 创建队列
q, err := ch.QueueDeclare(
	"queue.direct.test.ttl3", // 队列名称
	true,                     // 是否持久化
	false,                    // 是否自动删除
	false,                    // 是否排他
	false,                    // 是否等待
	map[string]any{ // 其他参数
		"x-max-priority": int32(10), // 设置最大优先级为 10
	},
)


// 发送消息
ch.PublishWithContext(ctx,
	"direct.test.go", // 交换机名称
	"test",           // 路由键
	false,            // 是否等待
	false,            // 是否立即
	amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(body),
		Priority: 9, // 设置消息优先级,值越大优先级越高
	},
)

应该场景:

优先级队列缺点:

  • 内存消耗剧增
  • 只有“相对”优先,没有“绝对”插队

六、RabbitMQ 集群篇

6.1 集群搭建

环境说明: windows wsl 系统中运用 docker compose 搭建 rabbitmq 集群

6.1.1 创建相关目录

目录结构如下:
rabbitmq-cluster/
├── docker-compose.yml
├── entrypoint.sh
└── data/           # (可选) 用于持久化数据,脚本运行后会自动生成子文件夹

6.1.2 创建自动加入集群的 shell 脚本 entrypoint.sh

创建 entrypoint.sh 文件,用于将 rabbitmq 节点自动加入到集群,创建完该文件后需要确保脚本有执行权限(执行命令 chmod +x entrypoint.sh)

#!/bin/bash
set -e

# 配置变量
COOKIE_VAL="${RABBITMQ_ERLANG_COOKIE:-secret_cookie_123}"
COOKIE_FILE="/var/lib/rabbitmq/.erlang.cookie"
MY_HOST="$(hostname)"
MASTER_NODE="rabbit@rabbitmq1"

echo "[$(date)] 启动节点: $MY_HOST"

# 1. 统一 Erlang Cookie (集群通信的关键)
if [ ! -f "$COOKIE_FILE" ]; then
    echo "写入 Erlang Cookie..."
    echo "$COOKIE_VAL" > "$COOKIE_FILE"
    chmod 600 "$COOKIE_FILE"
    chown rabbitmq:rabbitmq "$COOKIE_FILE"
fi

# 2. 判断是否为主节点 (rabbitmq1)
if [ "$MY_HOST" == "rabbitmq1" ]; then
    echo "[$(date)] 检测到是主节点 (rabbitmq1),直接启动服务..."
    exec rabbitmq-server
fi

# 3. 非主节点逻辑:后台启动 -> 等待主节点 -> 加入集群 -> 前台运行
echo "[$(date)] 检测到是从节点,准备加入集群 $MASTER_NODE ..."

# 启动 RabbitMQ 为后台守护进程
rabbitmq-server -detached

# 等待本地服务完全就绪
echo "[$(date)] 等待本地服务就绪..."
until rabbitmqctl status > /dev/null 2>&1; do
    sleep 2
done

# 等待主节点可连接 (防止主节点还没起好就尝试加入)
echo "[$(date)] 等待主节点 $MASTER_NODE 响应..."
while ! rabbitmqctl ping -n "$MASTER_NODE" > /dev/null 2>&1; do
    echo "主节点未就绪,等待 2 秒..."
    sleep 2
done

# 执行集群加入操作
echo "[$(date)] 执行加入集群操作..."
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster "$MASTER_NODE"
rabbitmqctl start_app

echo "[$(date)] 成功加入集群!重启进程以前台模式运行..."

# 停止后台进程,以便 exec 接管容器保持存活
rabbitmqctl stop

# 4. 以前台模式正式运行 (此时已属于集群)
exec rabbitmq-server

6.1.3 创建 docker-compose.yml 

services:
  # --- 节点 1 (主节点/种子节点) ---
  rabbitmq1:
    image: rabbitmq:3.13-management-alpine
    hostname: rabbitmq1
    container_name: rabbitmq1
    environment:
      # 基础认证
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      # 集群通信密钥 (所有节点必须一致)
      - RABBITMQ_ERLANG_COOKIE=secret_cookie_123
    ports:
      - "15673:15672"  # 管理界面
      - "5673:5672"    # AMQP 端口
    volumes:
      - ./data/rabbitmq1:/var/lib/rabbitmq
      # 挂载脚本到容器内
      - ./entrypoint.sh:/entrypoint.sh:ro
    entrypoint: ["bash", "/entrypoint.sh"]
    networks:
      - rabbitmq_net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

  # --- 节点 2 ---
  rabbitmq2:
    image: rabbitmq:3.13-management-alpine
    hostname: rabbitmq2
    container_name: rabbitmq2
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_ERLANG_COOKIE=secret_cookie_123
    ports:
      - "15674:15672"
      - "5674:5672"
    volumes:
      - ./data/rabbitmq2:/var/lib/rabbitmq
      # 挂载脚本到容器内
      - ./entrypoint.sh:/entrypoint.sh:ro
    entrypoint: ["bash", "/entrypoint.sh"]
    depends_on:
      rabbitmq1:
        condition: service_healthy  # 确保节点 1 健康后再启动节点 2
    networks:
      - rabbitmq_net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

  # --- 节点 3 ---
  rabbitmq3:
    image: rabbitmq:3.13-management-alpine
    hostname: rabbitmq3
    container_name: rabbitmq3
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_ERLANG_COOKIE=secret_cookie_123
    ports:
      - "15675:15672"
      - "5675:5672"
    volumes:
      - ./data/rabbitmq3:/var/lib/rabbitmq
      # 挂载脚本到容器内
      - ./entrypoint.sh:/entrypoint.sh:ro
    entrypoint: ["bash", "/entrypoint.sh"]
    depends_on:
      rabbitmq1:
        condition: service_healthy
      rabbitmq2:
        condition: service_healthy
    networks:
      - rabbitmq_net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

networks:
  rabbitmq_net:
    driver: bridgeservices:
  # --- 节点 1 (主节点/种子节点) ---
  rabbitmq1:
    image: rabbitmq:3.13-management-alpine
    hostname: rabbitmq1
    container_name: rabbitmq1
    environment:
      # 基础认证
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      # 集群通信密钥 (所有节点必须一致)
      - RABBITMQ_ERLANG_COOKIE=secret_cookie_123
    ports:
      - "15673:15672"  # 管理界面
      - "5673:5672"    # AMQP 端口
    volumes:
      - ./data/rabbitmq1:/var/lib/rabbitmq
      # 挂载脚本到容器内
      - ./entrypoint.sh:/entrypoint.sh:ro
    entrypoint: ["bash", "/entrypoint.sh"]
    networks:
      - rabbitmq_net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

  # --- 节点 2 ---
  rabbitmq2:
    image: rabbitmq:3.13-management-alpine
    hostname: rabbitmq2
    container_name: rabbitmq2
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_ERLANG_COOKIE=secret_cookie_123
    ports:
      - "15674:15672"
      - "5674:5672"
    volumes:
      - ./data/rabbitmq2:/var/lib/rabbitmq
      # 挂载脚本到容器内
      - ./entrypoint.sh:/entrypoint.sh:ro
    entrypoint: ["bash", "/entrypoint.sh"]
    depends_on:
      rabbitmq1:
        condition: service_healthy  # 确保节点 1 健康后再启动节点 2
    networks:
      - rabbitmq_net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

  # --- 节点 3 ---
  rabbitmq3:
    image: rabbitmq:3.13-management-alpine
    hostname: rabbitmq3
    container_name: rabbitmq3
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin
      - RABBITMQ_ERLANG_COOKIE=secret_cookie_123
    ports:
      - "15675:15672"
      - "5675:5672"
    volumes:
      - ./data/rabbitmq3:/var/lib/rabbitmq
      # 挂载脚本到容器内
      - ./entrypoint.sh:/entrypoint.sh:ro
    entrypoint: ["bash", "/entrypoint.sh"]
    depends_on:
      rabbitmq1:
        condition: service_healthy
      rabbitmq2:
        condition: service_healthy
    networks:
      - rabbitmq_net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: always

networks:
  rabbitmq_net:
    driver: bridge

6.1.4 启动集群并查看相关日志

# 启动集群
docker compose up -d

# 查看日志
docker logs rabbitmq2

或者查看节点的集群状态,Running Nodes 显示三个节点,表示创建集群成功

# 查看集群状态
docker exec -it rabbitmq1 rabbitmqctl cluster_status

6.1.5 访问网页管理端查看集群状态

访问任意节点的管理端,可以查看到三个节点,如图所示:

# 访问管理端,用户名及密码都是admin(docker-compose.yml中配置的)
http://127.0.0.1:15673

6.2 Quorum 队列

6.2.1 概念

Quorum Queue(仲裁队列)只能在 RabbitMQ 集群(Cluster)环境中创建和使用,创建队列后,会在集群中的所有节点都创建副本,其中一个节点被选举为 Leader(负责处理读写请求), 另外的节点作为 Follower(同步存储数据副本)。

采用 “多数派写入” 机制,当消息写入时,必须由集群中超过半数的节点确认成功后,才视为写入成功并返回给生产者。

当消费者成功处理消息并发送 ACK 确认后,该消息会从集群中所有节点(Leader 和所有 Follower)上同时删除。

6.2.2 特点

6.3.3 使用方法

在任意一个节点上创建队列,type 选择 Quorum,即可创建仲裁队列。

6.3 Stream 队列

RabbitMQ Stream Queue(流队列)是 RabbitMQ 从 3.9 版本引入的一种全新队列类型,专为高吞吐、大数据量、日志类场景设计。 它的核心设计理念借鉴了 Apache Kafka,将 RabbitMQ 从一个传统的“即时消费”消息代理,扩展为支持“日志回放”和“无限存储”的流处理平台。

机制:

消息像写日志一样,按顺序追加到磁盘文件中。

区别:

经典/仲裁队列:消息被消费并 ACK 后,立即删除。

流队列:消息被消费后不会删除,而是永久保留(直到达到配置的保留策略,如时间或大小限制)。

优势:

支持历史消息回放。新消费者加入时,可以从头开始读取,或者从任意时间点(Offset)开始读取。

6.4 基于 go 语言使用集群消息队列的方法

6.4.1 创建连接

func connectToCluster(nodes []string) (*amqp.Connection, error) {
	var lastErr error

	// 遍历所有节点尝试连接
	for _, nodeUrl := range nodes {
		fmt.Printf("正在尝试连接节点: %s ...\n", nodeUrl)

		conn, err := amqp.Dial(nodeUrl)
		if err == nil {
			// 连接成功!
			fmt.Printf("✅ 成功连接到节点: %s\n", nodeUrl)
			return conn, nil
		}

		lastErr = err
		log.Printf("❌ 连接节点 %s 失败: %v", nodeUrl, err)
		// 可选:可以在这里加一个极短的延时,避免瞬间风暴
		time.Sleep(100 * time.Millisecond)
	}

	// 如果所有节点都试过了还是失败,返回最后一个错误
	return nil, fmt.Errorf("无法连接到集群中的所有节点,最后错误: %w", lastErr)
}

6.4.2 创建仲裁队列

需要指定队列的类型为仲裁队列,x-queue-type = "quorum"

// 创建仲裁队列
q, err := ch.QueueDeclare(
	"queue.direct.test.cluster", // 队列名称
	true,                        // 是否持久化
	false,                       // 是否自动删除
	false,                       // 是否排他
	false,                       // 是否等待
	amqp.Table{
		amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点:集群使用仲裁队列
	},
)

6.4.3 完整代码示例

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// 配置集群地址:用逗号分隔多个节点
// 格式:amqp://用户名:密码@节点1:端口,节点2:端口,节点3:端口/虚拟主机
var clusterUrls = []string{
	"amqp://admin:admin@127.0.0.1:5673/",
	"amqp://admin:admin@127.0.0.1:5674/",
	"amqp://admin:admin@127.0.0.1:5675/",
}

func main() {
	// 启动重连循环
	reconnectLoop()
}

func connectToCluster(nodes []string) (*amqp.Connection, error) {
	var lastErr error

	// 遍历所有节点尝试连接
	for _, nodeUrl := range nodes {
		fmt.Printf("正在尝试连接节点: %s ...\n", nodeUrl)

		conn, err := amqp.Dial(nodeUrl)
		if err == nil {
			// 连接成功!
			fmt.Printf("✅ 成功连接到节点: %s\n", nodeUrl)
			return conn, nil
		}

		lastErr = err
		log.Printf("❌ 连接节点 %s 失败: %v", nodeUrl, err)
		// 可选:可以在这里加一个极短的延时,避免瞬间风暴
		time.Sleep(100 * time.Millisecond)
	}

	// 如果所有节点都试过了还是失败,返回最后一个错误
	return nil, fmt.Errorf("无法连接到集群中的所有节点,最后错误: %w", lastErr)
}

func reconnectLoop() {
	var conn *amqp.Connection
	var ch *amqp.Channel
	var notifyClose chan *amqp.Error

	// 重连间隔
	reconnectInterval := 5 * time.Second

	for {
		// 1. 尝试连接集群
		// Dial 会依次尝试 urls 中的地址,直到成功或全部失败
		// 注意:amqp091-go 的 Dial 通常只接受一个 URL 字符串

		fmt.Printf("正在尝试连接 RabbitMQ 集群: %s\n", clusterUrls)

		c, err := connectToCluster(clusterUrls)
		if err != nil {
			log.Printf("连接集群失败: %v, %s 后重试...", err, reconnectInterval)
			time.Sleep(reconnectInterval)
			continue
		}

		conn = c
		notifyClose = make(chan *amqp.Error)
		conn.NotifyClose(notifyClose)

		// 2. 创建 Channel
		channel, err := conn.Channel()
		if err != nil {
			log.Printf("创建 Channel 失败: %v", err)
			conn.Close()
			time.Sleep(reconnectInterval)
			continue
		}
		ch = channel

		fmt.Println("✅ 成功连接到 RabbitMQ 集群并创建 Channel")

		// 3. 在这里执行业务逻辑 (发布/消费)
		// 建议将 ch 传递给具体的业务协程
		doWork(ch)

		// 4. 等待连接关闭通知 (阻塞)
		err = <-notifyClose
		log.Printf("连接断开: %v", err)

		// 清理资源
		if ch != nil {
			ch.Close()
		}
		if conn != nil {
			conn.Close()
		}

		// 5. 等待一段时间后重试
		time.Sleep(reconnectInterval)
	}
}

func doWork(ch *amqp.Channel) {
	// 模拟业务运行
	// 在实际应用中,这里会启动消费者协程或生产者循环
	// 它们会使用传入的 ch
	// 创建交换机
	err := ch.ExchangeDeclare(
		"direct.test.cluster", // 交换机名称
		"direct",              // 交换机类型
		true,                  // 是否持久化
		false,                 // 是否自动删除
		false,                 // 是否内部交换机
		false,                 // 是否等待
		nil,                   // 其他参数
	)
	if err != nil {
		fmt.Println(err)
	}

	// 创建队列
	q, err := ch.QueueDeclare(
		"queue.direct.test.cluster", // 队列名称
		true,                        // 是否持久化
		false,                       // 是否自动删除
		false,                       // 是否排他
		false,                       // 是否等待
		amqp.Table{
			amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点:集群使用仲裁队列
		},
	)
	if err != nil {
		panic(err)
	}

	// 绑定交换机与队列
	ch.QueueBind(
		q.Name,                // 队列名称
		"test",                // 路由键
		"direct.test.cluster", // 交换机名称
		false,                 // 是否等待
		nil,                   // 其他参数
	)

	// 创建超时context
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// 推送消息
	for i := range 30 {
		body := fmt.Sprintf("Hello World! rabbitmq cluster %d", i)
		ch.PublishWithContext(ctx,
			"direct.test.cluster", // 交换机名称
			"test",                // 路由键
			false,                 // 是否等待
			false,                 // 是否立即
			amqp.Publishing{
				ContentType: "text/plain",
				Body:        []byte(body),
			},
		)

		fmt.Printf("推送消息:%s\n", body)
		time.Sleep(1 * time.Second)
	}
}

参考视频:消息中间件夺命连环18问,一口气刷完面试必问的消息中间件面试内容,让你面试少走99%的弯路!_哔哩哔哩_bilibili

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐