[RabbitMQ]应用知识点总结
🌟 导言
本文围绕 RabbitMQ 在实际开发中的核心应用方式 展开,系统梳理了消息通信模型、交换机路由机制、常见工作模式、可靠性保障手段以及 Spring Boot 整合 RabbitMQ 的基本写法。读完后,可以快速建立一套从 概念理解 到 代码落地 的 RabbitMQ 知识框架。
🗺️ 知识脑图/架构
- RabbitMQ 核心角色
- Producer:生产者,负责发送消息
- Consumer:消费者,负责处理消息
- Queue:队列,负责存储消息
- Exchange:交换机,负责路由消息
- RoutingKey:生产者发送消息时携带的路由键
- BindingKey:队列绑定交换机时指定的匹配规则
- RabbitMQ 常见工作模式
- Simple:简单模式
- Work Queues:工作队列模式
- Publish/Subscribe:发布订阅模式
- Routing:路由模式
- Topics:通配符模式
- RPC:远程调用模式
- Publisher Confirms:发布确认模式
- 路由与交换机机制
- Fanout Exchange:广播
- Direct Exchange:精确匹配
- Topic Exchange:通配匹配
- Headers Exchange:基于消息头匹配
- 消息可靠性保障
- 持久化:队列/交换机/消息持久化
- 消费者确认:自动 ACK 与手动 ACK
- QoS / prefetch:限制未确认消息数量
- mandatory:处理不可路由消息
- Publisher Confirms:保证生产者发送侧可靠
- Spring Boot 整合 RabbitMQ
- RabbitTemplate 发送消息
- @RabbitListener 监听消息
- QueueBuilder / ExchangeBuilder / BindingBuilder 声明组件
🚀 核心知识点详解
1. RabbitMQ 到底解决了什么问题
1.1 什么是 RabbitMQ
RabbitMQ 是一个实现了 AMQP 协议的消息中间件,主要作用是在系统之间进行 异步通信、应用解耦、流量削峰、任务分发。
它的本质不是“存数据”,而是“在系统之间传递消息并按规则分发消息”。
1.2 为什么要用 RabbitMQ
在业务系统中,如果所有模块都采用同步调用,会出现几个典型问题:
- 系统耦合度高,一个服务挂掉会影响整条调用链。
- 峰值流量时容易把下游服务压垮。
- 某些任务并不需要立刻完成,更适合异步处理。
RabbitMQ 的价值正体现在这些地方:
| 痛点 | RabbitMQ 的解决方式 |
|---|---|
| 服务强耦合 | 通过消息队列让系统解耦 |
| 突发高并发 | 通过队列缓存消息,起到削峰作用 |
| 耗时任务阻塞主流程 | 改为异步消费,提高响应速度 |
| 一条消息需要多方处理 | 通过交换机广播或路由分发 |
核心理解:RabbitMQ 不是为了替代数据库,而是为了让“消息的发送”和“消息的处理”解耦。
2. RabbitMQ 的核心组成与消息流转
2.1 核心角色
| 角色 | 作用 | 关键点 |
|---|---|---|
| Producer | 发送消息 | 只负责投递,不负责处理 |
| Consumer | 消费消息 | 负责业务逻辑执行 |
| Queue | 存储消息 | 消息真正落地的地方 |
| Exchange | 路由消息 | 决定消息发往哪些队列 |
| RoutingKey | 发送路由键 | 生产者告诉交换机如何分发 |
| BindingKey | 绑定规则 | 队列告诉交换机接收哪些消息 |
2.2 消息流转总过程
- Producer 将消息发送给 Exchange。
- Exchange 根据交换机类型和 RoutingKey 进行路由。
- 满足规则的队列通过 BindingKey 接收消息。
- Consumer 从队列中拉取或监听消息并处理。
关键结论:在 RabbitMQ 中,生产者通常不是直接把消息发给队列,而是先发给 Exchange,再由交换机决定消息进入哪个队列。
2.3 RoutingKey 与 BindingKey 的区别
| 对比项 | RoutingKey | BindingKey |
|---|---|---|
| 出现阶段 | 发送消息时 | 绑定队列时 |
| 谁来指定 | 生产者 | 队列与交换机的绑定关系 |
| 作用 | 告诉交换机如何路由 | 告诉交换机队列接收什么消息 |
可以这样记:生产者发消息时用 RoutingKey,队列绑定交换机时用 BindingKey。
3. 七种工作模式总览
3.1 模式对比表
| 模式 | 是否使用交换机 | 消息分发特点 | 典型场景 |
|---|---|---|---|
| Simple | 默认交换机 | 一个生产者对应一个消费者 | 最基础的点对点通信 |
| Work Queues | 默认交换机 | 多个消费者竞争同一队列消息 | 异步任务、短信发送 |
| Publish/Subscribe | Fanout | 广播到所有绑定队列 | 通知、订阅、广播 |
| Routing | Direct | 按精确路由键分发 | 按日志级别分类 |
| Topics | Topic | 按通配符规则匹配 | 复杂业务分类分发 |
| RPC | 队列 + 回调队列 | 请求响应式通信 | 远程调用 |
| Publisher Confirms | 任意 | 保证生产者发送可靠 | 金融、订单等高可靠场景 |
4. Simple 与 Work Queues
4.1 Simple 简单模式
Simple 是最基础的消息模型:一个生产者、一个队列、一个消费者。
适用场景:
- 入门学习。
- 单一消费者处理消息。
- 对消息分发要求不复杂的简单业务。
4.2 Work Queues 工作队列模式
Work Queues 可以理解为 Simple 模式的增强版:多个消费者共同消费一个队列中的消息。
其核心价值是把大量任务分摊给多个消费者处理,提升吞吐量。
4.3 为什么 Work Queues 很重要
它解决的是“任务堆积”和“单消费者处理不过来”的问题。
比如下单成功后,需要发送短信、邮件、推送等任务,这些任务可以先进入队列,由多个消费者竞争消费。
4.4 怎么用
- 创建连接和信道。
- 声明队列
queueDeclare。 - 生产者通过
basicPublish发消息。 - 消费者通过
basicConsume监听消息。
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("work_queues", true, false, false, null);
for (int i = 0; i < 10; i++) {
String msg = "Hello World" + i;
channel.basicPublish("", "work_queues", null, msg.getBytes());
}
核心特点:同一个队列中的同一条消息,只会被其中一个消费者消费,不会被多个消费者重复消费。
5. Publish/Subscribe:发布订阅模式
5.1 是什么
发布订阅模式的本质是:生产者发送一条消息,多个消费者都能收到同一份消息。
它依赖 Fanout Exchange 实现广播。
5.2 为什么需要交换机
如果没有交换机,生产者只能把消息发到某个具体队列,无法灵活地让多方同时收到消息。
而 Fanout Exchange 会把消息复制后分发给所有绑定的队列。
5.3 怎么用
核心步骤如下:
- 声明 Fanout 类型交换机。
- 声明多个队列。
- 把多个队列绑定到同一个交换机。
- 发送消息时
routingKey通常传空字符串。
channel.exchangeDeclare("test_fanout", BuiltinExchangeType.FANOUT, true, false, false, null);
channel.queueDeclare("fanout_queue1", true, false, false, null);
channel.queueDeclare("fanout_queue2", true, false, false, null);
channel.queueBind("fanout_queue1", "test_fanout", "");
channel.queueBind("fanout_queue2", "test_fanout", "");
channel.basicPublish("test_fanout", "", null, "hello fanout".getBytes());
适用场景:通知广播、实时消息推送、事件同步给多个系统。
6. Routing:路由模式
6.1 是什么
Routing 模式 是发布订阅模式的升级版,使用 Direct Exchange 按照 RoutingKey 精确匹配 把消息发送到指定队列。
6.2 为什么需要 Routing
Fanout 是“无脑广播”,所有绑定队列都会收到消息;但有些业务只希望消息分给特定消费者。
例如日志系统中:
error日志进入错误处理队列。info日志进入信息归档队列。warning日志进入监控告警队列。
6.3 路由机制图
6.4 怎么用
channel.exchangeDeclare("test_direct", BuiltinExchangeType.DIRECT, true, false, false, null);
channel.queueDeclare("direct_queue1", true, false, false, null);
channel.queueDeclare("direct_queue2", true, false, false, null);
channel.queueBind("direct_queue1", "test_direct", "orange");
channel.queueBind("direct_queue2", "test_direct", "black");
channel.queueBind("direct_queue2", "test_direct", "green");
channel.basicPublish("test_direct", "orange", null, "hello direct, I am orange".getBytes());
channel.basicPublish("test_direct", "black", null, "hello direct, I am black".getBytes());
channel.basicPublish("test_direct", "green", null, "hello direct, I am green".getBytes());
核心规则:Direct 模式下,只有 RoutingKey 与 BindingKey 完全一致,消息才会进入对应队列。
7. Topics:通配符模式
7.1 是什么
Topics 模式 可以理解为比 Routing 更灵活的路由模型,使用 Topic Exchange,支持通配符匹配。
7.2 通配符规则
| 通配符 | 含义 |
|---|---|
* |
匹配一个单词 |
# |
匹配零个或多个单词 |
RoutingKey 与 BindingKey 一般都使用 . 分隔单词,例如:
order.errororder.pay.infopay.error
7.3 路由机制图
7.4 怎么用
channel.exchangeDeclare("test_topic", BuiltinExchangeType.TOPIC, true, false, false, null);
channel.queueDeclare("topic_queue1", true, false, false, null);
channel.queueDeclare("topic_queue2", true, false, false, null);
channel.queueBind("topic_queue1", "test_topic", "*.error");
channel.queueBind("topic_queue2", "test_topic", "#.info");
channel.queueBind("topic_queue2", "test_topic", "*.error");
channel.basicPublish("test_topic", "order.error", null, "hello topic, I'm order.error".getBytes());
channel.basicPublish("test_topic", "order.pay.info", null, "hello topic, I'm order.pay.info".getBytes());
channel.basicPublish("test_topic", "pay.error", null, "hello topic, I'm pay.error".getBytes());
7.5 Topic 与 Direct 的区别
| 对比项 | Direct | Topic |
|---|---|---|
| 匹配方式 | 精确匹配 | 通配符匹配 |
| 灵活性 | 较低 | 较高 |
| 适用场景 | 路由规则固定 | 路由规则复杂、多级分类 |
重点结论:如果消息的 RoutingKey 没有匹配到任何 BindingKey,消息会被丢弃;如果设置了
mandatory,则有机会返回给生产者处理。
8. RPC:基于 RabbitMQ 的请求响应
8.1 是什么
RPC 模式 并不是传统意义上的单向消息投递,而是借助 RabbitMQ 实现“请求 - 响应”式通信。
它通常会用到两个关键属性:
- replyTo:指定回调队列。
- correlationId:标记请求与响应的对应关系。
8.2 工作流程
- 客户端把请求消息发到请求队列。
- 客户端在消息属性中设置 replyTo,告诉服务端把响应发到哪个回调队列。
- 服务端消费请求后,处理业务逻辑。
- 服务端把结果发送到 replyTo 指定的队列。
- 客户端监听回调队列,并根据 correlationId 匹配响应。
在实际开发中,一个客户端通常会同时发送很多个异步请求,但它们都复用同一个回调队列. 客户端通过监听回调队列中的correlationId就可以通过查询本地的内存HashMap, 找出和correlationId对应的业务数据,然后把业务数据交给那个等在原地的业务线程
8.3 为什么 RPC 里常配合 QoS 和手动 ACK
RPC 场景下通常希望“一个请求对应一个可靠响应”,因此服务端使用了:
channel.basicQos(1):限制同一时间只取一个未确认消息。autoAck = false:关闭自动确认。basicAck(...):业务处理完后再确认消息。
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId())
.build();
String message = new String(body);
String response = "request:" + message + ", response: 处理成功";
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes());
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume("rpc_request_queue", false, consumer);
关键理解:RPC 虽然也基于消息队列,但它的目标不是解耦异步,而是实现带返回值的消息通信。
9. 消息可靠性:确认机制、QoS 与持久化
9.1 消费者确认机制:autoAck 与 manualAck
9.1.1 自动确认
当 autoAck=true 时,RabbitMQ 在消息投递给消费者后,就会立即认为消息已经消费成功。
优点:
- 编码简单。
- 吞吐量高。
缺点:
- 如果消费者处理失败,消息可能已经丢失。
9.1.2 手动确认
当 autoAck=false 时,消费者处理完业务后必须显式调用 basicAck。
优点:
- 更安全。
- 更适合重要消息。
缺点:
- 编程稍复杂。
- 如果忘记 ACK,消息会长期处于未确认状态。
| 方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| autoAck=true | 简单、吞吐高 | 容易丢消息 | 日志、低价值消息 |
| autoAck=false | 更可靠 | 代码复杂度更高 | 订单、支付、重要任务 |
建议:凡是业务上不能丢的消息,优先考虑 手动 ACK。
9.2 QoS / prefetchCount
课程中提到,如果不设置 basicQos,默认 prefetchCount=0,Broker 可能一次给消费者推送多条消息。
这会导致:
- 某个消费者拿到过多消息。
- 消费不均衡。
- 处理慢的消费者积压未确认消息。
因此可以使用:
channel.basicQos(1);
它表示:在当前消费者未确认前,不再继续分发新的消息给它。
这就是公平分发的核心手段之一,尤其适合 Work Queues 和 RPC 场景。
9.3 持久化
这里强调了 队列和交换机的 durable 参数。
例如:
queueDeclare(..., true, ...):队列持久化。exchangeDeclare(..., true, ...):交换机持久化。
其意义在于:RabbitMQ 重启后,相关元数据不会丢失。
仅仅队列持久化还不够,真正要做到高可靠,通常还需要配合 消息持久化 与 发布确认 一起使用。
10. Publisher Confirms:发布确认机制
10.1 是什么
Publisher Confirms 用于解决“生产者把消息发出去后,到底 RabbitMQ 收没收到”的问题。
它是 发送端可靠性 的核心机制。
10.2 核心原理
- 生产者调用
channel.confirmSelect()开启确认模式。 - 之后发送的每条消息都会分配一个递增序号。
- Broker 成功接收后,异步返回 ACK。
- 如果 Broker 因内部错误导致失败,则可能返回 NACK。
10.3 三种确认策略
| 策略 | 做法 | 优点 | 缺点 |
|---|---|---|---|
| 单条同步确认 | 每发一条就 waitForConfirmsOrDie |
最直观,最安全 | 性能最差 |
| 批量确认 | 每发一批后统一确认 | 性能更高 | 出错后不易定位具体哪条失败 |
| 异步确认 | addConfirmListener 注册回调 |
性能最好 | 编程复杂度最高 |
10.4 单条确认示例
channel.confirmSelect();
channel.basicPublish("", "publisher_confirms_queue1", null, "msg".getBytes());
channel.waitForConfirmsOrDie(5000);
10.5 异步确认思路
- 使用
addConfirmListener监听 ACK / NACK。 - 用一个有序集合保存未确认消息序号。
- 收到 ACK 后删除对应序号。
- 收到 NACK 后进行重发、记录日志或补偿处理。
channel.confirmSelect();
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
confirmSet.headSet(deliveryTag + 1).clear();
} else {
confirmSet.remove(deliveryTag);
}
}
});
核心结论:发布确认解决的是“消息有没有到 Broker”,而消费者 ACK 解决的是“消息有没有被业务成功处理”。两者不是一回事。
11. Spring Boot 整合 RabbitMQ
11.1 为什么 Spring Boot 开发更高效
相比直接使用 amqp-client,Spring Boot 整合后有几个明显优势:
- 自动装配连接工厂与模板对象。
- 可以用 RabbitTemplate 统一发送消息。
- 可以用 @RabbitListener 直接监听队列。
- 可以通过 QueueBuilder / ExchangeBuilder / BindingBuilder 声明基础设施。
11.2 工作队列模式整合思路
声明队列
@Configuration
public class RabbitMQConfig {
@Bean("workQueue")
public Queue workQueue() {
return QueueBuilder.durable("work_queue").build();
}
}
使用durable可以创建一个持久化的工作队列,这样一来即使 RabbitMQ 服务器重启、崩溃或断电,队列和其中的消息仍然会被保留。
反之,noDurable可以创建一个非持久化的队列
发送消息
@RestController
@RequestMapping("/producer")
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/work")
public String work() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("", "work_queue", "hello spring amqp: work...");
}
return "发送成功";
}
}
此时我们直接在浏览器中访问127.0.0.1:8080/producer/work对应的url,
然后输入http://47.116.96.217:15672访问 RabbitMQ控制台,可以看到相应的工作队列已经被创建,且已经有了十条消息
先前的队列是我之前做项目创建的,这里可以先忽略掉

监听消息
使用@RabbitListener 可以实现消息的消费, 在注解内可以配置绑定的队列名,队列名这类常量我们也可以写一个constants包来统一管理
@Component
public class WorkListener {
@RabbitListener(queues = Constants.WORK_QUEUE)
public void work(Message message, Channel channel){
System.out.println("listener 1 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message + ",channel:"+channel);
}
@RabbitListener(queues = Constants.WORK_QUEUE)
public void queueListener2(String message){
System.out.println("listener 2 ["+Constants.WORK_QUEUE+"] 接收到消息:" +message);
}
}
此时再次重启项目,调用producer/work接口来发送消息,观察控制台的日志结果

以上展示了 RabbitMQ 最经典的工作队列模式:
- 10 条消息被两个消费者均分
2.每条消息只被消费一次- 自动负载均衡(轮询)
11.3 Fanout / Direct / Topic 在 Spring Boot 中的声明方式
Fanout (广播模式)
配置广播模式交换机和队列,并建立绑定关系:
@Bean("fanoutQueue1")
public Queue fanoutQueue1() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();
}
@Bean("fanoutQueue2")
public Queue fanoutQueue2() {
return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();
}
//声明交换机
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();
}
//队列和交换机绑定
@Bean
public Binding fanoutBinding(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
@Bean
public Binding fanoutBinding2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {
return BindingBuilder.bind(queue).to(exchange);
}
设置消息生产者:
@RequestMapping("/fanout")
public String fanoutProduct(@RequestParam(defaultValue = "10") int count){
for (int i = 0; i < count; i++) {
rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", "hello spring amqp: fanout..." + i);
}
return "ok";
}
配置FanoutListener
@Component
public class FanoutListener {
//指定监听队列的名称
@RabbitListener(queues = Constants.FANOUT_QUEUE1)
public void ListenerQueue(String message){
System.out.println("["+Constants.FANOUT_QUEUE1+ "]接收到消息:"+ message);
}
@RabbitListener(queues = Constants.FANOUT_QUEUE2)
public void ListenerQueue2(String message){
System.out.println("["+Constants.FANOUT_QUEUE2+ "]接收到消息:"+ message);
}
}
访问对应接口,传入count参数
Direct 示例(路由模式)
配置路由模式交换机,队列,绑定关系
//路由模式
@Bean("directQueue1")
public Queue directQueue1(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();
}
@Bean("directQueue2")
public Queue directQueue2(){
return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();
}
@Bean("directExchange")
public DirectExchange directExchange(){
return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();
}
@Bean("directQueueBinding1")
public Binding directQueueBinding1(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue1") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
@Bean("directQueueBinding2")
public Binding directQueueBinding2(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("black");
}
@Bean("directQueueBinding3")
public Binding directQueueBinding3(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("directQueue2") Queue queue){
return BindingBuilder.bind(queue).to(directExchange).with("orange");
}
接口设计
@RequestMapping("/direct")
public String directProduct(@RequestParam(defaultValue = "10") String routingKey){
rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello spring amqp:direct, my routing key is "+routingKey);
return "发送成功";
}

Topic (通配符模式)
//通配符模式
@Bean("topicQueue1")
public Queue topicQueue1(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();
}
@Bean("topicQueue2")
public Queue topicQueue2(){
return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();
}
@Bean("topicExchange")
public TopicExchange topicExchange(){
return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();
}
@Bean("topicQueueBinding1")
public Binding topicQueueBinding1(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue1") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.orange.*");
}
@Bean("topicQueueBinding2")
public Binding topicQueueBinding2(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.*.rabbit");
}
@Bean("topicQueueBinding3")
public Binding topicQueueBinding3(@Qualifier("topicExchange") TopicExchange topicExchange, @Qualifier("topicQueue2") Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("lazy.#");
}
@RequestMapping("/topic/{routingKey}")
public String topic(@PathVariable("routingKey") String routingKey){
rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello spring amqp:topic, my routing key is "+routingKey);
return "发送成功";
}
@Component
public class DirectListener {
@RabbitListener(queues = Constants.DIRECT_QUEUE1)
public void queueListener1(String message){
System.out.println("队列["+Constants.DIRECT_QUEUE1+"] 接收到消息:" +message);
}
@RabbitListener(queues = Constants.DIRECT_QUEUE2)
public void queueListener2(String message){
System.out.println("队列["+Constants.DIRECT_QUEUE2+"] 接收到消息:" +message);
}
}
访问 http://127.0.0.1:8080/producer/topic/lazy.orange.cheems
此时通过路径传参,rountingKey为lazy.orange.cheems ,topic.queue1和topic.queue2都能接收到消息
11.4 @RabbitListener 的理解
@RabbitListener 可以接收多种类型参数,例如:
String:直接拿到消息内容。Message:拿到原始消息体和属性。Channel:便于手动确认消息等高级操作。
如果只是快速开发,
String够用;如果要处理消息头、投递信息或手动 ACK,优先使用 Message 或 Channel。
这里要使用rabbit提供的原生api接口,注意不到导包错误, 否则会报错~~
⚠️ 踩坑指南 / 最佳实践
- 不要把 Exchange 当成存储组件。 交换机只负责路由,不负责保存消息;没有匹配队列时,消息可能直接丢失。
- 生产者通常发给交换机,而不是直接发给业务队列。 直接发队列虽然简单,但扩展性差。
- Direct 和 Topic 最容易混淆。 Direct 是完全匹配,Topic 才支持
*与#通配符。 - Work Queues 场景下建议配合
basicQos(1)。 否则可能出现某个消费者拿到过多消息,负载不均衡。 - 重要业务不要默认开启
autoAck=true。 自动确认会在业务未真正处理完成时就删除消息,存在丢消息风险。 - 开启持久化不等于绝对可靠。 想要完整保障链路,通常要同时考虑队列持久化、消息持久化、消费者 ACK、发布确认。
- 未匹配路由的消息要重点关注。 课程中特别提到可以通过
mandatory让不可路由消息返回给生产者,而不是悄悄丢失。 - RPC 场景必须注意
replyTo与correlationId。 少了这两个字段,请求与响应很难正确对应。 - Spring Boot 整合时,建议把队列、交换机、绑定关系统一放到配置类中管理。 这样结构更清晰,也更方便维护。
✍️ 复习与自测
- RoutingKey 和 BindingKey 有什么区别?它们分别在哪个阶段发挥作用?
- 为什么说 Fanout、Direct、Topic 三种交换机分别适合不同复杂度的业务路由?
- Work Queues 模式下,如果不设置
basicQos(1),可能出现什么问题? - Publisher Confirms 和消费者 ACK 分别解决的是哪一段链路的可靠性问题?
- 在 Spring Boot 中,为什么推荐使用 QueueBuilder / ExchangeBuilder / BindingBuilder 配置 RabbitMQ 基础设施?
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)