RabbitMQ 5种核心消息模型
RabbitMQ 5种核心消息模型
一、什么是 RabbitMQ?
官方定义
RabbitMQ 是一个开源的消息代理软件(Message Broker),它实现了 AMQP(高级消息队列协议)。它接收来自生产者的消息,将其路由并存储到队列中,然后由消费者从队列中获取并处理消息。
通俗理解
可以把 RabbitMQ 想象成一个邮局:
- 你(生产者)把信(消息)交给邮局。
- 邮局内部有分拣中心(交换机 Exchange),根据信封上的地址(路由键 Routing Key)把信分到不同的信箱(队列 Queue)。
- 收信人(消费者)定期从自己的信箱里取信。
- 如果信箱满了或收信人暂时不在,邮局会暂存信件,直到收信人取走。
二、应用场景与价值
1. 异步处理
- 官方描述:将不需要同步返回结果的操作放到消息队列中,让主流程快速完成。
- 通俗解释:比如用户注册后,需要发邮件和短信。如果同步处理,用户要等邮件和短信发完才能收到“注册成功”的响应,体验很差。改为异步:注册成功后立即返回成功,同时将发送邮件/短信的任务放入队列,由后台消费者慢慢处理。
2. 服务解耦
- 官方描述:通过消息队列作为中间层,消除服务间的直接依赖。
- 通俗解释:订单系统下单后,需要通知库存系统减库存。如果订单系统直接调用库存系统的接口,一旦库存系统挂了,订单也会失败。引入 MQ:订单系统只把“订单创建”消息发给队列,库存系统自己去队列里取消息处理,两者互不影响。
3. 流量削峰
- 官方描述:将突发的海量请求暂存在消息队列中,后端服务按自身能力匀速处理,防止系统被冲垮。
- 通俗解释:秒杀活动开始时,瞬间有 10 万请求。如果直接打到数据库,数据库会崩。把请求先放入队列,后端每秒只处理 1000 个,慢慢消化,保证系统稳定。
4. 柔性事务(最终一致性)
- 官方描述:在分布式系统中,通过消息队列保证多个服务的数据最终一致,而不依赖强一致性事务。
- 通俗解释:支付成功后,需要更新订单状态、增加积分、出库。这些操作可能分布在多个系统中。支付成功后发送一条“支付成功”消息,各个系统消费后各自更新本地数据,最终所有系统状态一致。
三、AMQP 与 JMS
| 对比项 | AMQP | JMS |
|---|---|---|
| 全称 | Advanced Message Queuing Protocol | Java Message Service |
| 性质 | 协议(定义了网络层的数据格式) | API 规范(仅 Java 领域) |
| 跨语言 | 是,任何语言只要实现该协议就能通信 | 否,只适用于 Java 应用 |
| 消息模型 | 提供五种交换机模型,非常灵活 | 只有点对点(Queue)和发布订阅(Topic) |
| 消息类型 | 二进制(支持任意格式) | 支持 Java 对象、文本、字节等 |
| 典型实现 | RabbitMQ(最流行)、ActiveMQ(支持 AMQP) | ActiveMQ、HornetQ |
通俗总结:AMQP 是通用语言,不同语言写的系统都能用 RabbitMQ 通信;JMS 是Java 内部的规范,只能 Java 应用之间用。
四、同类产品对比
| 特性 | RabbitMQ | Kafka | RocketMQ | ActiveMQ |
|---|---|---|---|---|
| 单机吞吐量 | 万级 | 百万级 | 十万级 | 万级 |
| 消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒级 |
| 可靠性 | 高(配置灵活) | 极高(分布式持久化) | 高(金融级) | 中 |
| 协议支持 | AMQP, MQTT, STOMP | 自定义协议 | 自定义协议 | JMS, AMQP, MQTT |
| 社区活跃度 | 极高 | 极高 | 高 | 中 |
| 适用场景 | 企业级应用、实时性要求高 | 日志收集、流式计算、大数据 | 业务交易、金融场景 | 传统 Java 项目 |
选型建议:
- 一般业务系统(订单、消息通知)→ RabbitMQ
- 海量日志收集、用户行为追踪 → Kafka
- 金融级、高可靠、需事务支持 → RocketMQ
五、RabbitMQ 核心概念

1. Broker(消息代理)
- 官方:RabbitMQ 服务器实例,负责接收、存储、转发消息。
- 通俗:就是整个邮局大楼。
2. Virtual Host(虚拟主机)
- 官方:一个逻辑隔离的命名空间,内含独立的交换机、队列、绑定关系,类似数据库中的 database。
- 通俗:邮局大楼里的不同楼层,1 楼处理个人信件,2 楼处理商业包裹,互不干扰。
- 权限管理-账户管理-靠虚拟主机来完成的。
3. Connection(连接)
- 官方:客户端与 Broker 之间的 TCP 长连接。
- 通俗:从你家到邮局的专用通道。
4. Channel(信道)
- 官方:在 Connection 内创建的虚拟通道,多数 AMQP 命令都在 Channel 上执行,一个连接可以创建多个 Channel。
- 通俗:通道上的多个窗口,每个窗口可以独立办理业务,不用重新排队。
5. Exchange(交换机)
- 官方:接收生产者发送的消息,并根据路由规则将消息分发到一个或多个队列。
- 通俗:邮局的分拣中心,根据信封上的地址决定把信投到哪个信箱。
6. Queue(队列)
- 官方:存储消息的容器,消息在队列中等待被消费者取走,具有 FIFO 特性。
- 通俗:你家门口的信箱,信件到了就放里面,等你回家取。
7. Binding(绑定)
- 官方:交换机与队列之间的关联关系,定义了如何将消息从交换机路由到队列。
- 通俗:分拣规则,比如“所有地址带‘北京’的信,都投到 1 号信箱”。
8. Routing Key(路由键)
- 官方:生产者发送消息时附带的一个字符串,供交换机判断消息应该发给哪些队列。
- 通俗:信封上的地址。
9. Binding Key(绑定键)
- 官方:在绑定时设置的字符串,与 Routing Key 进行匹配(匹配规则取决于交换机类型)。
- 通俗:信箱上写的收件规则,比如“只收挂号信”。
六、RabbitMq 创建交换机和队列
- 管理界面:重点关注交换机和队列
创建队列

- 队列名称:
lqb.queue.simple(项目简写.队列.简单模式) - 持久化:Durable,D表示支持持久化
- 属性参数:队列超时时间、队列参数等
创建广播交换机(fanout)
1. 创建交换机,执行类型和持久化。

- 定义交换机名称
- 交换机类型选择:广播
- 消息持久化:持久化
- 是否自动删除交换机,否NO
- 是否是内部交换机,不是NO。(内部交换机是Rabbitmq自己要用的交换机)
2. 新增两个消息队列

3. 交换机绑定消息队列
-
点击交换机名字

-
广播类型的交换机不需要写路由名称
创建路由交换机(direct)

1. 创建路由交换机,类型选择为direct

2. 绑定路由交换机

- 交换机绑定消息队列时,必须指定路由名称。
创建主题交换机(tipic)

1. 创建主题交换机

2. 创建消息队列

3. 绑定消息队列
-
消息队列
lqb.queue.topic01:绑定路由key:*.a.* -
消息队列
lqb.queue.topic02:绑定路由key:*.*.b -
消息队列
lqb.queue.topic02:绑定路由key:c.# -
虽然消息队列2绑定了两个路由key,如果一条路由key
c.*.b,同时匹配了两条路由规则,实际只有一条数据进入消息队列。
七、5 种核心消息模式详解
在 RabbitMQ 的体系中,根据消息的流转目标(是一个人收到还是多个人收到),可以将这 5 种模式清晰地划分为 “点对点 (Point-to-Point)” 和 “一对多 (Publish/Subscribe)” 两大类。
消息模式分类
点对点模式 (Point-to-Point)
核心定义:发送者发送的每一条消息,最终有且仅有一个消费者能收到并处理。
- 简单模式和工作模式,不需要创建交换机,使用RabbitMQ默认创建的交换机。
- 简单模式 (Simple)
- 结构:
P -> Q -> C - 场景:最基础的单发单收。
- 逻辑:生产者将消息放入队列,消费者从队列监听。一旦消息被消费,立即从队列删除。
- 工作队列模式 (Work Queues)
- 结构:
P -> Q -> 多C - 场景:任务分发。当生产速度大于单个消费者的处理速度时使用。
- 逻辑:
- 竞争关系:多个消费者共同监听同一个队列,但一条消息只会被其中一个消费者抢到。
- 能者多劳:配合
prefetchCount=1,处理快的机器会承担更多任务。 - 默认公平:如果不配置负载均衡,RabbitMQ 会采用轮询(Round-Robin)平均分配。
一对多模式 (Publish/Subscribe)
核心定义:发送者发送的一条消息,可以被多个不同的消费者同时收到(每个消费者拥有自己的独立队列)。这个分类下必须使用 Exchange(交换机)。
- 广播模式 (Fanout)
- 交换机:
amq.fanout - 逻辑:不看路由键。生产者发给交换机,交换机瞬间复制 N 份,转发给所有绑定到它的队列。
- 场景:全量通知(如:系统广播、实时天气更新)。
- 路由模式 (Direct)
- 交换机:
amq.direct - 逻辑:精确匹配。队列绑定时指定一个
RoutingKey。只有消息的 Key 与队列的 Key 完全一致时,消息才进入该队列。 - 场景:错误日志分级。例如:
Error队列只收error消息,而Log队列收info/warn/error所有消息。
- 主题模式 (Topic)
- 交换机:
amq.topic - 逻辑:模糊匹配(路由模式的加强版)。
*:匹配一个单词(必须有一个)。#:匹配零个或多个单词。
- 场景:按类别订阅。例如:
stock.us.#订阅所有美国股票信息,#.news订阅所有分类的新闻。
| 特性 | 点对点 (Simple/Work) | 一对多 (Fanout/Direct/Topic) |
|---|---|---|
| 队列数量 | 通常只有一个队列 | 每个消费者/业务方拥有独立队列 |
| 消息副本 | 只有 1 份消息 | 消息会被 Exchange 复制多份分发 |
| 消费者关系 | 竞争关系(你拿我就没) | 协作/独立关系(人手一份) |
| 路由中介 | 默认交换机 (隐式) | 显式声明 Exchange |
| 应用目标 | 提高任务处理效率 | 实现系统解耦、多方通知 |
问:为什么“路由模式”绑定多个名字后感觉像广播?
答: 广播是“强制性”的,只要连上就得听;而路由是“选择性”的。虽然一个队列可以绑定多个 Key(比如 A 队列绑定了 order 和 pay),但它依然只拿它感兴趣的内容。
- 广播:像村里的大喇叭,所有人听的内容都一样。
- 路由/主题:像订阅报纸,你可以订《晨报》和《晚报》,别人订《体育报》,大家互不干扰,各取所需。
1. 简单模式(Simple)
- 模型:生产者 → 队列 → 单个消费者
- 官方:最简单的点对点通信,一个队列只被一个消费者监听。
- 通俗:你写一封信,直接投到收信人的私人信箱,只有他本人能取。
配套配置类(声明队列)
@Configuration
public class SimpleConfig {
/**
* 声明一个队列
* @return Queue 对象
*
* 构造参数详解:
* - String name: 队列名称,必须唯一
* - boolean durable: 是否持久化(true:重启后队列依然存在,默认false)
* - boolean exclusive: 是否排他(true:只被当前连接使用,连接关闭后队列自动删除,默认false)
* - boolean autoDelete: 是否自动删除(true:最后一个消费者断开后自动删除,默认false)
*/
@Bean
public Queue simpleQueue() {
return new Queue("queue.simple", true); // 持久化队列
}
}
生产者完整类
@Component
public class SimpleSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送简单消息
*
* convertAndSend 方法参数:
* - String exchange: 交换机名称,空串表示默认交换机(AMQP default)
* - String routingKey: 路由键,在默认交换机下,routingKey 就是队列名称
* - Object message: 消息内容,会自动序列化
*/
public void send() {
String message = "Hello Simple!";
rabbitTemplate.convertAndSend("", "queue.simple", message);
System.out.println("已发送: " + message);
}
}
消费者完整类
package com.demo.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleReceiver {
/**
* 监听指定队列
* @RabbitListener 注解属性:
* - queues: 指定要监听的队列名称(可多个)
* 方法参数自动注入消息体(默认反序列化为String)
*/
@RabbitListener(queues = "queue.simple")
public void receive(String message) {
System.out.println("收到消息: " + message);
}
}
2. 工作队列模式(Worker)
模型:生产者 → 队列 → 多个消费者(竞争消费)
- 官方:多个消费者监听同一个队列,每条消息只被一个消费者处理,默认轮询分发。
- 通俗:一个信箱前站了好几个邮递员,信到了只有一个邮递员拿走,其他人拿不到。
配置类(声明队列)
@Configuration
public class WorkerConfig {
@Bean
public Queue workQueue() {
// 参数含义同简单模式
return new Queue("queue.work", true);
}
}
生产者完整类
@Component
public class WorkerSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送多个工作消息
* @param count 发送数量
*/
public void send(int count) {
for (int i = 1; i <= count; i++) {
String message = "任务 " + i;
// 使用默认交换机,routingKey 为队列名
rabbitTemplate.convertAndSend("queue.work", message);
}
}
}
两个消费者(竞争消费)
@Component
public class WorkerReceiver1 {
/**
* 监听工作队列
* 默认轮询分发,每条消息只被一个消费者处理
*/
@RabbitListener(queues = "queue.work")
public void receive(String message) throws InterruptedException {
System.out.println("Worker1 开始处理: " + message);
Thread.sleep(1000); // 模拟耗时
System.out.println("Worker1 完成: " + message);
}
}
@Component
public class WorkerReceiver2 {
@RabbitListener(queues = "queue.work")
public void receive(String message) throws InterruptedException {
System.out.println("Worker2 开始处理: " + message);
Thread.sleep(500); // 模拟耗时较短
System.out.println("Worker2 完成: " + message);
}
}
3. 广播模式(Fanout)
- 模型:生产者 → Fanout Exchange → 多个队列 → 多个消费者
- 官方:交换机将收到的消息广播给所有绑定的队列,忽略 Routing Key。
- 通俗:广播站大喇叭喊话,所有装了喇叭的屋子都能听到,不管门牌号。
完整配置类(声明交换机、队列、绑定)
@Configuration
public class FanoutConfig {
/**
* 1. 定义 Fanout 交换机
*
* 交换机构造函数参数:
* - String name: 交换机名称
* - boolean durable: 是否持久化(true:重启后交换机依然存在,默认false)
* - boolean autoDelete: 是否自动删除(true:最后一个队列解绑后自动删除,默认false)
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("exchange.fanout", true, false);
}
/**
* 2. 定义两个队列
* 参数含义同前
*/
@Bean
public Queue queueA() {
return new Queue("queue.fanout.a", true);
}
@Bean
public Queue queueB() {
return new Queue("queue.fanout.b", true);
}
/**
* 3. 绑定队列到交换机(Fanout 不需要 routing key)
*
* BindingBuilder 用法:
* - bind(queue).to(exchange) 返回一个无路由键的绑定
* - 也可用 with(routingKey) 指定路由键,但 Fanout 会忽略
*/
@Bean
public Binding bindingA(Queue queueA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueA).to(fanoutExchange);
}
@Bean
public Binding bindingB(Queue queueB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queueB).to(fanoutExchange);
}
}
生产者完整类
@Component
public class FanoutSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送广播消息
* @param message 消息内容
*
* 由于 Fanout 忽略 routing key,通常传空串
*/
public void send(String message) {
rabbitTemplate.convertAndSend("exchange.fanout", "", message);
System.out.println("广播消息已发送: " + message);
}
}
消费者A(监听 queue.fanout.a)
package com.demo.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutReceiverA {
@RabbitListener(queues = "queue.fanout.a")
public void receive(String message) {
System.out.println("队列A 收到广播: " + message);
}
}
消费者B(监听 queue.fanout.b)
package com.demo.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class FanoutReceiverB {
@RabbitListener(queues = "queue.fanout.b")
public void receive(String message) {
System.out.println("队列B 收到广播: " + message);
}
}
4. 路由模式(Direct)
- 模型:生产者 → Direct Exchange → 根据 Routing Key 精确匹配 → 特定队列
- 官方:消息的 Routing Key 必须与队列绑定的 Binding Key 完全相等,才能路由到该队列。
- 通俗:挂号信,信封上写了“李四收”,只有名字叫“李四”的信箱才能收到。
完整配置类
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
// 持久化交换机
return new DirectExchange("exchange.direct", true, false);
}
@Bean
public Queue errorQueue() {
return new Queue("queue.direct.error", true);
}
@Bean
public Queue infoQueue() {
return new Queue("queue.direct.info", true);
}
/**
* 绑定 error 队列,使用 routing key "error"
* with(routingKey) 指定绑定的路由键
*/
@Bean
public Binding bindingError(Queue errorQueue, DirectExchange directExchange) {
return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
}
@Bean
public Binding bindingInfo(Queue infoQueue, DirectExchange directExchange) {
return BindingBuilder.bind(infoQueue).to(directExchange).with("info");
}
}
生产者完整类
@Component
public class DirectSender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送错误日志
* @param message 日志内容
*/
public void sendError(String message) {
rabbitTemplate.convertAndSend("exchange.direct", "error", "[错误] " + message);
}
/**
* 发送普通日志
*/
public void sendInfo(String message) {
rabbitTemplate.convertAndSend("exchange.direct", "info", "[信息] " + message);
}
}
消费者(错误队列)
@Component
public class DirectErrorReceiver {
@RabbitListener(queues = "queue.direct.error")
public void receive(String message) {
System.out.println("错误日志处理器: " + message);
}
}
消费者(信息队列)
package com.demo.consumer;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectInfoReceiver {
@RabbitListener(queues = "queue.direct.info")
public void receive(String message) {
System.out.println("普通日志处理器: " + message);
}
}
5. 主题模式(Topic)
- 模型:生产者 → Topic Exchange → 根据 Routing Key 模式匹配 → 队列
- 官方:Routing Key 由点号分隔的单词组成(如
log.error.app),绑定键可使用通配符:*:匹配一个单词#:匹配零个或多个单词
- 通俗:智能分拣,比如“所有北京发出的信件”走一个通道,无论收件人是谁。
完整配置类
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("exchange.topic", true, false);
}
@Bean
public Queue orderQueue() {
return new Queue("queue.topic.order", true);
}
@Bean
public Queue paymentQueue() {
return new Queue("queue.topic.payment", true);
}
/**
* 绑定 order 队列,匹配所有以 "order." 开头的 routing key
* 通配符:* 匹配一个单词,如 order.created, order.paid
*/
@Bean
public Binding orderBinding(Queue orderQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(orderQueue).to(topicExchange).with("order.*");
}
/**
* 绑定 payment 队列,匹配所有以 "payment." 开头的 routing key(可多层)
* 通配符:# 匹配零个或多个单词,如 payment.success, payment.success.alipay
*/
@Bean
public Binding paymentBinding(Queue paymentQueue, TopicExchange topicExchange) {
return BindingBuilder.bind(paymentQueue).to(topicExchange).with("payment.#");
}
}
生产者完整类
@Component
public class TopicSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrderCreated() {
rabbitTemplate.convertAndSend("exchange.topic", "order.created", "订单已创建");
}
public void sendOrderPaid() {
rabbitTemplate.convertAndSend("exchange.topic", "order.paid", "订单已支付");
}
public void sendPaymentSuccess() {
rabbitTemplate.convertAndSend("exchange.topic", "payment.success", "支付成功");
}
public void sendPaymentFailed() {
rabbitTemplate.convertAndSend("exchange.topic", "payment.failed", "支付失败");
}
}
消费者(订单队列)
@Component
public class TopicOrderReceiver {
@RabbitListener(queues = "queue.topic.order")
public void receive(String message) {
System.out.println("订单处理器收到: " + message);
}
}
消费者(支付队列)
@Component
public class TopicPaymentReceiver {
@RabbitListener(queues = "queue.topic.payment")
public void receive(String message) {
System.out.println("支付处理器收到: " + message);
}
}
八、RabbitMQ 内置交换机说明
RabbitMQ 默认创建了几个交换机,你可以在管理界面看到(名称以 amq. 开头):
| 交换机名称 | 类型 | 作用 |
|---|---|---|
(AMQP default) |
direct | 默认交换机,空 routing key 表示直接发给同名队列 |
amq.direct |
direct | 直连交换机(精确匹配) |
amq.fanout |
fanout | 广播交换机 |
amq.topic |
topic | 主题交换机 |
amq.headers |
headers | 头交换机(根据消息头属性匹配) |
amq.match |
headers | 是 amq.headers 的别名 |
amq.rabbitmq.log |
topic | 内部日志交换机(Internal) |
amq.rabbitmq.trace |
topic | 消息追踪交换机(Internal) |
注意:内置交换机一般仅供系统使用,生产环境中通常自己定义交换机。
九、消息监听器容器类型
| 容器类型 | 描述 | 特点 |
|---|---|---|
| SimpleMessageListenerContainer | 传统容器,每个消费者一个线程,共享 Channel | 支持事务、批量确认,配置灵活 |
| DirectMessageListenerContainer | 较新的容器,每个消费者独占一个 Channel,直接在 RabbitMQ 客户端线程上调用监听器 | 更轻量,高吞吐,不支持事务 |
配置方式(application.yml):
- 新容器:
spring:
rabbitmq:
listener:
type: direct # 可选 simple 或 direct,默认 simple
# 新的容器,二选一
direct:
acknowledge-mode: manual
- 传统容器
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
concurrency: 3 # 并发消费者数
max-concurrency: 10
十、代码创建交换机和队列的两种方式(详细注释)
方式一:注解方式(在 @RabbitListener 中声明)
package com.demo.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class AnnotationListener {
/**
* 使用 @RabbitListener 的 bindings 属性动态声明交换机、队列和绑定
*
* @Queue 注解属性:
* - name: 队列名称
* - durable: 是否持久化(默认 "",实际解析为 false,可填 "true"/"false")
* - exclusive: 是否排他(默认 "",false)
* - autoDelete: 是否自动删除(默认 "",false)
*
* @Exchange 注解属性:
* - name: 交换机名称
* - type: 交换机类型(direct/fanout/topic/headers,默认 direct)
* - durable: 是否持久化(默认 "",false)
* - autoDelete: 是否自动删除(默认 "",false)
* - internal: 是否内部交换机(默认 "",false)
*
* @QueueBinding 属性:
* - value: 队列(@Queue)
* - exchange: 交换机(@Exchange)
* - key: 绑定路由键(可多个)
*/
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(name = "queue.annotation", durable = "true"),
exchange = @Exchange(name = "exchange.annotation", type = "direct", durable = "true"),
key = {"routing.key"}
)
)
public void handleMessage(String msg, Message message, Channel channel) throws Exception {
long tag = message.getMessageProperties().getDeliveryTag();
try {
log.info("注解方式收到消息: {}", msg);
channel.basicAck(tag, false);
} catch (Exception e) {
channel.basicNack(tag, false, true);
}
}
}
方式二:配置类方式(推荐生产环境)
package com.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class AppConfig {
/**
* 声明交换机
*
* ExchangeBuilder 提供链式调用:
* - directExchange(name): 创建 Direct 交换机构建器
* - durable(boolean): 设置持久化
* - autoDelete(boolean): 设置自动删除
* - internal(boolean): 设置内部交换机
*/
@Bean
public DirectExchange myExchange() {
return ExchangeBuilder.directExchange("exchange.my")
.durable(true) // 持久化
.autoDelete(false) // 不自动删除
.build();
}
/**
* 声明队列
*
* QueueBuilder 提供链式调用:
* - durable(name): 持久化队列
* - exclusive(): 排他队列
* - autoDelete(): 自动删除
* - ttl(long): 消息 TTL(毫秒)
* - maxLength(int): 队列最大长度
* - deadLetterExchange(String): 死信交换机
* - deadLetterRoutingKey(String): 死信路由键
*/
@Bean
public Queue myQueue() {
return QueueBuilder.durable("queue.my")
.ttl(60000) // 消息存活时间 60秒
.maxLength(1000) // 最多存储 1000 条消息
.build();
}
/**
* 绑定队列到交换机
*
* BindingBuilder 用法:
* - bind(queue).to(exchange).with(routingKey)
*/
@Bean
public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
return BindingBuilder.bind(myQueue).to(myExchange).with("my.key");
}
}
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)