RabbitMQ 5种核心消息模型


一、什么是 RabbitMQ?

官方定义

RabbitMQ 是一个开源的消息代理软件(Message Broker),它实现了 AMQP(高级消息队列协议)。它接收来自生产者的消息,将其路由并存储到队列中,然后由消费者从队列中获取并处理消息。

通俗理解

可以把 RabbitMQ 想象成一个邮局

  • 你(生产者)把信(消息)交给邮局。
  • 邮局内部有分拣中心(交换机 Exchange),根据信封上的地址(路由键 Routing Key)把信分到不同的信箱(队列 Queue)
  • 收信人(消费者)定期从自己的信箱里取信。
  • 如果信箱满了或收信人暂时不在,邮局会暂存信件,直到收信人取走。

二、应用场景与价值

1. 异步处理

  • 官方描述:将不需要同步返回结果的操作放到消息队列中,让主流程快速完成。
  • 通俗解释:比如用户注册后,需要发邮件和短信。如果同步处理,用户要等邮件和短信发完才能收到“注册成功”的响应,体验很差。改为异步:注册成功后立即返回成功,同时将发送邮件/短信的任务放入队列,由后台消费者慢慢处理。

2. 服务解耦

  • 官方描述:通过消息队列作为中间层,消除服务间的直接依赖。
  • 通俗解释:订单系统下单后,需要通知库存系统减库存。如果订单系统直接调用库存系统的接口,一旦库存系统挂了,订单也会失败。引入 MQ:订单系统只把“订单创建”消息发给队列,库存系统自己去队列里取消息处理,两者互不影响。

3. 流量削峰

  • 官方描述:将突发的海量请求暂存在消息队列中,后端服务按自身能力匀速处理,防止系统被冲垮。
  • 通俗解释:秒杀活动开始时,瞬间有 10 万请求。如果直接打到数据库,数据库会崩。把请求先放入队列,后端每秒只处理 1000 个,慢慢消化,保证系统稳定。

4. 柔性事务(最终一致性)

  • 官方描述:在分布式系统中,通过消息队列保证多个服务的数据最终一致,而不依赖强一致性事务。
  • 通俗解释:支付成功后,需要更新订单状态、增加积分、出库。这些操作可能分布在多个系统中。支付成功后发送一条“支付成功”消息,各个系统消费后各自更新本地数据,最终所有系统状态一致。

三、AMQP 与 JMS

对比项 AMQP JMS
全称 Advanced Message Queuing Protocol Java Message Service
性质 协议(定义了网络层的数据格式) API 规范(仅 Java 领域)
跨语言 是,任何语言只要实现该协议就能通信 否,只适用于 Java 应用
消息模型 提供五种交换机模型,非常灵活 只有点对点(Queue)和发布订阅(Topic)
消息类型 二进制(支持任意格式) 支持 Java 对象、文本、字节等
典型实现 RabbitMQ(最流行)、ActiveMQ(支持 AMQP) ActiveMQ、HornetQ

通俗总结:AMQP 是通用语言,不同语言写的系统都能用 RabbitMQ 通信;JMS 是Java 内部的规范,只能 Java 应用之间用。


四、同类产品对比

特性 RabbitMQ Kafka RocketMQ ActiveMQ
单机吞吐量 万级 百万级 十万级 万级
消息延迟 微秒级 毫秒级 毫秒级 毫秒级
可靠性 高(配置灵活) 极高(分布式持久化) 高(金融级)
协议支持 AMQP, MQTT, STOMP 自定义协议 自定义协议 JMS, AMQP, MQTT
社区活跃度 极高 极高
适用场景 企业级应用、实时性要求高 日志收集、流式计算、大数据 业务交易、金融场景 传统 Java 项目

选型建议

  • 一般业务系统(订单、消息通知)→ RabbitMQ
  • 海量日志收集、用户行为追踪 → Kafka
  • 金融级、高可靠、需事务支持 → RocketMQ

五、RabbitMQ 核心概念

RabbitMQ架构

1. Broker(消息代理)

  • 官方:RabbitMQ 服务器实例,负责接收、存储、转发消息。
  • 通俗:就是整个邮局大楼。

2. Virtual Host(虚拟主机)

  • 官方:一个逻辑隔离的命名空间,内含独立的交换机、队列、绑定关系,类似数据库中的 database。
  • 通俗:邮局大楼里的不同楼层,1 楼处理个人信件,2 楼处理商业包裹,互不干扰。
  • 权限管理-账户管理-靠虚拟主机来完成的。

3. Connection(连接)

  • 官方:客户端与 Broker 之间的 TCP 长连接。
  • 通俗:从你家到邮局的专用通道。

4. Channel(信道)

  • 官方:在 Connection 内创建的虚拟通道,多数 AMQP 命令都在 Channel 上执行,一个连接可以创建多个 Channel。
  • 通俗:通道上的多个窗口,每个窗口可以独立办理业务,不用重新排队。

5. Exchange(交换机)

  • 官方:接收生产者发送的消息,并根据路由规则将消息分发到一个或多个队列。
  • 通俗:邮局的分拣中心,根据信封上的地址决定把信投到哪个信箱。

6. Queue(队列)

  • 官方:存储消息的容器,消息在队列中等待被消费者取走,具有 FIFO 特性。
  • 通俗:你家门口的信箱,信件到了就放里面,等你回家取。

7. Binding(绑定)

  • 官方:交换机与队列之间的关联关系,定义了如何将消息从交换机路由到队列。
  • 通俗:分拣规则,比如“所有地址带‘北京’的信,都投到 1 号信箱”。

8. Routing Key(路由键)

  • 官方:生产者发送消息时附带的一个字符串,供交换机判断消息应该发给哪些队列。
  • 通俗:信封上的地址。

9. Binding Key(绑定键)

  • 官方:在绑定时设置的字符串,与 Routing Key 进行匹配(匹配规则取决于交换机类型)。
  • 通俗:信箱上写的收件规则,比如“只收挂号信”。

六、RabbitMq 创建交换机和队列

  • 管理界面:重点关注交换机和队列

创建队列

RabbitMq-创建队列.png

  • 队列名称:lqb.queue.simple项目简写.队列.简单模式
  • 持久化:Durable,D表示支持持久化
  • 属性参数:队列超时时间、队列参数等

创建广播交换机(fanout)

1. 创建交换机,执行类型和持久化。

创建广播类型fanout交换机

  • 定义交换机名称
  • 交换机类型选择:广播
  • 消息持久化:持久化
  • 是否自动删除交换机,否NO
  • 是否是内部交换机,不是NO。(内部交换机是Rabbitmq自己要用的交换机)
2. 新增两个消息队列

创建两个消息队列与广播交换机进行绑定

3. 交换机绑定消息队列
  • 点击交换机名字
    广播交换机与消息队列绑定

  • 广播类型的交换机不需要写路由名称

创建路由交换机(direct)

路由交换机

1. 创建路由交换机,类型选择为direct

在这里插入图片描述

2. 绑定路由交换机

在这里插入图片描述

  • 交换机绑定消息队列时,必须指定路由名称。

创建主题交换机(tipic)

在这里插入图片描述

1. 创建主题交换机

在这里插入图片描述

2. 创建消息队列

在这里插入图片描述

3. 绑定消息队列
  • 消息队列lqb.queue.topic01:绑定路由key:*.a.*

  • 消息队列lqb.queue.topic02:绑定路由key:*.*.b

  • 消息队列lqb.queue.topic02:绑定路由key:c.#

  • 虽然消息队列2绑定了两个路由key,如果一条路由keyc.*.b,同时匹配了两条路由规则,实际只有一条数据进入消息队列。
    在这里插入图片描述

七、5 种核心消息模式详解

在 RabbitMQ 的体系中,根据消息的流转目标(是一个人收到还是多个人收到),可以将这 5 种模式清晰地划分为 “点对点 (Point-to-Point)”“一对多 (Publish/Subscribe)” 两大类。

消息模式分类

点对点模式 (Point-to-Point)

核心定义:发送者发送的每一条消息,最终有且仅有一个消费者能收到并处理。

  • 简单模式和工作模式,不需要创建交换机,使用RabbitMQ默认创建的交换机
  1. 简单模式 (Simple)
  • 结构P -> Q -> C
  • 场景:最基础的单发单收。
  • 逻辑:生产者将消息放入队列,消费者从队列监听。一旦消息被消费,立即从队列删除。
  1. 工作队列模式 (Work Queues)
  • 结构P -> Q -> 多C
  • 场景:任务分发。当生产速度大于单个消费者的处理速度时使用。
  • 逻辑
    • 竞争关系:多个消费者共同监听同一个队列,但一条消息只会被其中一个消费者抢到。
    • 能者多劳:配合 prefetchCount=1,处理快的机器会承担更多任务。
    • 默认公平:如果不配置负载均衡,RabbitMQ 会采用轮询(Round-Robin)平均分配。

一对多模式 (Publish/Subscribe)

核心定义:发送者发送的一条消息,可以被多个不同的消费者同时收到(每个消费者拥有自己的独立队列)。这个分类下必须使用 Exchange(交换机)

  1. 广播模式 (Fanout)
  • 交换机amq.fanout
  • 逻辑:不看路由键。生产者发给交换机,交换机瞬间复制 N 份,转发给所有绑定到它的队列。
  • 场景:全量通知(如:系统广播、实时天气更新)。
  1. 路由模式 (Direct)
  • 交换机amq.direct
  • 逻辑精确匹配。队列绑定时指定一个 RoutingKey。只有消息的 Key 与队列的 Key 完全一致时,消息才进入该队列。
  • 场景:错误日志分级。例如:Error 队列只收 error 消息,而 Log 队列收 info/warn/error 所有消息。
  1. 主题模式 (Topic)
  • 交换机amq.topic
  • 逻辑模糊匹配(路由模式的加强版)。
    • *:匹配一个单词(必须有一个)。
    • #:匹配零个或多个单词。
  • 场景:按类别订阅。例如:stock.us.# 订阅所有美国股票信息,#.news 订阅所有分类的新闻。

特性 点对点 (Simple/Work) 一对多 (Fanout/Direct/Topic)
队列数量 通常只有一个队列 每个消费者/业务方拥有独立队列
消息副本 只有 1 份消息 消息会被 Exchange 复制多份分发
消费者关系 竞争关系(你拿我就没) 协作/独立关系(人手一份)
路由中介 默认交换机 (隐式) 显式声明 Exchange
应用目标 提高任务处理效率 实现系统解耦、多方通知

问:为什么“路由模式”绑定多个名字后感觉像广播?
答: 广播是“强制性”的,只要连上就得听;而路由是“选择性”的。虽然一个队列可以绑定多个 Key(比如 A 队列绑定了 orderpay),但它依然只拿它感兴趣的内容。

  • 广播:像村里的大喇叭,所有人听的内容都一样。
  • 路由/主题:像订阅报纸,你可以订《晨报》和《晚报》,别人订《体育报》,大家互不干扰,各取所需。

1. 简单模式(Simple)

  • 模型:生产者 → 队列 → 单个消费者
  • 官方:最简单的点对点通信,一个队列只被一个消费者监听。
  • 通俗:你写一封信,直接投到收信人的私人信箱,只有他本人能取。
配套配置类(声明队列)
@Configuration
public class SimpleConfig {

    /**
     * 声明一个队列
     * @return Queue 对象
     * 
     * 构造参数详解:
     * - String name: 队列名称,必须唯一
     * - boolean durable: 是否持久化(true:重启后队列依然存在,默认false)
     * - boolean exclusive: 是否排他(true:只被当前连接使用,连接关闭后队列自动删除,默认false)
     * - boolean autoDelete: 是否自动删除(true:最后一个消费者断开后自动删除,默认false)
     */
    @Bean
    public Queue simpleQueue() {
        return new Queue("queue.simple", true); // 持久化队列
    }
}
生产者完整类
@Component
public class SimpleSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送简单消息
     * 
     * convertAndSend 方法参数:
     * - String exchange: 交换机名称,空串表示默认交换机(AMQP default)
     * - String routingKey: 路由键,在默认交换机下,routingKey 就是队列名称
     * - Object message: 消息内容,会自动序列化
     */
    public void send() {
        String message = "Hello Simple!";
        rabbitTemplate.convertAndSend("", "queue.simple", message);
        System.out.println("已发送: " + message);
    }
}
消费者完整类
package com.demo.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SimpleReceiver {

    /**
     * 监听指定队列
     * @RabbitListener 注解属性:
     * - queues: 指定要监听的队列名称(可多个)
     * 方法参数自动注入消息体(默认反序列化为String)
     */
    @RabbitListener(queues = "queue.simple")
    public void receive(String message) {
        System.out.println("收到消息: " + message);
    }
}

2. 工作队列模式(Worker)

模型:生产者 → 队列 → 多个消费者(竞争消费)

  • 官方:多个消费者监听同一个队列,每条消息只被一个消费者处理,默认轮询分发。
  • 通俗:一个信箱前站了好几个邮递员,信到了只有一个邮递员拿走,其他人拿不到。
配置类(声明队列)
@Configuration
public class WorkerConfig {

    @Bean
    public Queue workQueue() {
        // 参数含义同简单模式
        return new Queue("queue.work", true);
    }
}
生产者完整类
@Component
public class WorkerSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送多个工作消息
     * @param count 发送数量
     */
    public void send(int count) {
        for (int i = 1; i <= count; i++) {
            String message = "任务 " + i;
            // 使用默认交换机,routingKey 为队列名
            rabbitTemplate.convertAndSend("queue.work", message);
        }
    }
}
两个消费者(竞争消费)
@Component
public class WorkerReceiver1 {

    /**
     * 监听工作队列
     * 默认轮询分发,每条消息只被一个消费者处理
     */
    @RabbitListener(queues = "queue.work")
    public void receive(String message) throws InterruptedException {
        System.out.println("Worker1 开始处理: " + message);
        Thread.sleep(1000); // 模拟耗时
        System.out.println("Worker1 完成: " + message);
    }
}
@Component
public class WorkerReceiver2 {

    @RabbitListener(queues = "queue.work")
    public void receive(String message) throws InterruptedException {
        System.out.println("Worker2 开始处理: " + message);
        Thread.sleep(500); // 模拟耗时较短
        System.out.println("Worker2 完成: " + message);
    }
}

3. 广播模式(Fanout)

  • 模型:生产者 → Fanout Exchange → 多个队列 → 多个消费者
  • 官方:交换机将收到的消息广播给所有绑定的队列,忽略 Routing Key。
  • 通俗:广播站大喇叭喊话,所有装了喇叭的屋子都能听到,不管门牌号。
完整配置类(声明交换机、队列、绑定)
@Configuration
public class FanoutConfig {

    /**
     * 1. 定义 Fanout 交换机
     * 
     * 交换机构造函数参数:
     * - String name: 交换机名称
     * - boolean durable: 是否持久化(true:重启后交换机依然存在,默认false)
     * - boolean autoDelete: 是否自动删除(true:最后一个队列解绑后自动删除,默认false)
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("exchange.fanout", true, false);
    }

    /**
     * 2. 定义两个队列
     * 参数含义同前
     */
    @Bean
    public Queue queueA() {
        return new Queue("queue.fanout.a", true);
    }

    @Bean
    public Queue queueB() {
        return new Queue("queue.fanout.b", true);
    }

    /**
     * 3. 绑定队列到交换机(Fanout 不需要 routing key)
     * 
     * BindingBuilder 用法:
     * - bind(queue).to(exchange) 返回一个无路由键的绑定
     * - 也可用 with(routingKey) 指定路由键,但 Fanout 会忽略
     */
    @Bean
    public Binding bindingA(Queue queueA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }

    @Bean
    public Binding bindingB(Queue queueB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queueB).to(fanoutExchange);
    }
}
生产者完整类
@Component
public class FanoutSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送广播消息
     * @param message 消息内容
     * 
     * 由于 Fanout 忽略 routing key,通常传空串
     */
    public void send(String message) {
        rabbitTemplate.convertAndSend("exchange.fanout", "", message);
        System.out.println("广播消息已发送: " + message);
    }
}
消费者A(监听 queue.fanout.a)
package com.demo.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutReceiverA {

    @RabbitListener(queues = "queue.fanout.a")
    public void receive(String message) {
        System.out.println("队列A 收到广播: " + message);
    }
}
消费者B(监听 queue.fanout.b)
package com.demo.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutReceiverB {

    @RabbitListener(queues = "queue.fanout.b")
    public void receive(String message) {
        System.out.println("队列B 收到广播: " + message);
    }
}

4. 路由模式(Direct)

  • 模型:生产者 → Direct Exchange → 根据 Routing Key 精确匹配 → 特定队列
  • 官方:消息的 Routing Key 必须与队列绑定的 Binding Key 完全相等,才能路由到该队列。
  • 通俗:挂号信,信封上写了“李四收”,只有名字叫“李四”的信箱才能收到。
完整配置类
@Configuration
public class DirectConfig {

    @Bean
    public DirectExchange directExchange() {
        // 持久化交换机
        return new DirectExchange("exchange.direct", true, false);
    }

    @Bean
    public Queue errorQueue() {
        return new Queue("queue.direct.error", true);
    }

    @Bean
    public Queue infoQueue() {
        return new Queue("queue.direct.info", true);
    }

    /**
     * 绑定 error 队列,使用 routing key "error"
     * with(routingKey) 指定绑定的路由键
     */
    @Bean
    public Binding bindingError(Queue errorQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(errorQueue).to(directExchange).with("error");
    }

    @Bean
    public Binding bindingInfo(Queue infoQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(infoQueue).to(directExchange).with("info");
    }
}
生产者完整类
@Component
public class DirectSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送错误日志
     * @param message 日志内容
     */
    public void sendError(String message) {
        rabbitTemplate.convertAndSend("exchange.direct", "error", "[错误] " + message);
    }

    /**
     * 发送普通日志
     */
    public void sendInfo(String message) {
        rabbitTemplate.convertAndSend("exchange.direct", "info", "[信息] " + message);
    }
}
消费者(错误队列)
@Component
public class DirectErrorReceiver {

    @RabbitListener(queues = "queue.direct.error")
    public void receive(String message) {
        System.out.println("错误日志处理器: " + message);
    }
}
消费者(信息队列)
package com.demo.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DirectInfoReceiver {

    @RabbitListener(queues = "queue.direct.info")
    public void receive(String message) {
        System.out.println("普通日志处理器: " + message);
    }
}

5. 主题模式(Topic)

  • 模型:生产者 → Topic Exchange → 根据 Routing Key 模式匹配 → 队列
  • 官方:Routing Key 由点号分隔的单词组成(如 log.error.app),绑定键可使用通配符:
    • *:匹配一个单词
    • #:匹配零个或多个单词
  • 通俗:智能分拣,比如“所有北京发出的信件”走一个通道,无论收件人是谁。
完整配置类
@Configuration
public class TopicConfig {

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("exchange.topic", true, false);
    }

    @Bean
    public Queue orderQueue() {
        return new Queue("queue.topic.order", true);
    }

    @Bean
    public Queue paymentQueue() {
        return new Queue("queue.topic.payment", true);
    }

    /**
     * 绑定 order 队列,匹配所有以 "order." 开头的 routing key
     * 通配符:* 匹配一个单词,如 order.created, order.paid
     */
    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(orderQueue).to(topicExchange).with("order.*");
    }

    /**
     * 绑定 payment 队列,匹配所有以 "payment." 开头的 routing key(可多层)
     * 通配符:# 匹配零个或多个单词,如 payment.success, payment.success.alipay
     */
    @Bean
    public Binding paymentBinding(Queue paymentQueue, TopicExchange topicExchange) {
        return BindingBuilder.bind(paymentQueue).to(topicExchange).with("payment.#");
    }
}
生产者完整类
@Component
public class TopicSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderCreated() {
        rabbitTemplate.convertAndSend("exchange.topic", "order.created", "订单已创建");
    }

    public void sendOrderPaid() {
        rabbitTemplate.convertAndSend("exchange.topic", "order.paid", "订单已支付");
    }

    public void sendPaymentSuccess() {
        rabbitTemplate.convertAndSend("exchange.topic", "payment.success", "支付成功");
    }

    public void sendPaymentFailed() {
        rabbitTemplate.convertAndSend("exchange.topic", "payment.failed", "支付失败");
    }
}
消费者(订单队列)
@Component
public class TopicOrderReceiver {

    @RabbitListener(queues = "queue.topic.order")
    public void receive(String message) {
        System.out.println("订单处理器收到: " + message);
    }
}
消费者(支付队列)
@Component
public class TopicPaymentReceiver {

    @RabbitListener(queues = "queue.topic.payment")
    public void receive(String message) {
        System.out.println("支付处理器收到: " + message);
    }
}

八、RabbitMQ 内置交换机说明

RabbitMQ 默认创建了几个交换机,你可以在管理界面看到(名称以 amq. 开头):

交换机名称 类型 作用
(AMQP default) direct 默认交换机,空 routing key 表示直接发给同名队列
amq.direct direct 直连交换机(精确匹配)
amq.fanout fanout 广播交换机
amq.topic topic 主题交换机
amq.headers headers 头交换机(根据消息头属性匹配)
amq.match headers amq.headers 的别名
amq.rabbitmq.log topic 内部日志交换机(Internal)
amq.rabbitmq.trace topic 消息追踪交换机(Internal)

注意:内置交换机一般仅供系统使用,生产环境中通常自己定义交换机。


九、消息监听器容器类型

容器类型 描述 特点
SimpleMessageListenerContainer 传统容器,每个消费者一个线程,共享 Channel 支持事务、批量确认,配置灵活
DirectMessageListenerContainer 较新的容器,每个消费者独占一个 Channel,直接在 RabbitMQ 客户端线程上调用监听器 更轻量,高吞吐,不支持事务

配置方式application.yml):

  • 新容器:
spring:
  rabbitmq:
    listener:
      type: direct   # 可选 simple 或 direct,默认 simple
      # 新的容器,二选一
      direct:
        acknowledge-mode: manual
  • 传统容器
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 3   # 并发消费者数
        max-concurrency: 10

十、代码创建交换机和队列的两种方式(详细注释)

方式一:注解方式(在 @RabbitListener 中声明)

package com.demo.consumer;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class AnnotationListener {

    /**
     * 使用 @RabbitListener 的 bindings 属性动态声明交换机、队列和绑定
     * 
     * @Queue 注解属性:
     * - name: 队列名称
     * - durable: 是否持久化(默认 "",实际解析为 false,可填 "true"/"false")
     * - exclusive: 是否排他(默认 "",false)
     * - autoDelete: 是否自动删除(默认 "",false)
     * 
     * @Exchange 注解属性:
     * - name: 交换机名称
     * - type: 交换机类型(direct/fanout/topic/headers,默认 direct)
     * - durable: 是否持久化(默认 "",false)
     * - autoDelete: 是否自动删除(默认 "",false)
     * - internal: 是否内部交换机(默认 "",false)
     * 
     * @QueueBinding 属性:
     * - value: 队列(@Queue)
     * - exchange: 交换机(@Exchange)
     * - key: 绑定路由键(可多个)
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = "queue.annotation", durable = "true"),
                    exchange = @Exchange(name = "exchange.annotation", type = "direct", durable = "true"),
                    key = {"routing.key"}
            )
    )
    public void handleMessage(String msg, Message message, Channel channel) throws Exception {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("注解方式收到消息: {}", msg);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, true);
        }
    }
}

方式二:配置类方式(推荐生产环境)

package com.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AppConfig {

    /**
     * 声明交换机
     * 
     * ExchangeBuilder 提供链式调用:
     * - directExchange(name): 创建 Direct 交换机构建器
     * - durable(boolean): 设置持久化
     * - autoDelete(boolean): 设置自动删除
     * - internal(boolean): 设置内部交换机
     */
    @Bean
    public DirectExchange myExchange() {
        return ExchangeBuilder.directExchange("exchange.my")
                .durable(true)      // 持久化
                .autoDelete(false)  // 不自动删除
                .build();
    }

    /**
     * 声明队列
     * 
     * QueueBuilder 提供链式调用:
     * - durable(name): 持久化队列
     * - exclusive(): 排他队列
     * - autoDelete(): 自动删除
     * - ttl(long): 消息 TTL(毫秒)
     * - maxLength(int): 队列最大长度
     * - deadLetterExchange(String): 死信交换机
     * - deadLetterRoutingKey(String): 死信路由键
     */
    @Bean
    public Queue myQueue() {
        return QueueBuilder.durable("queue.my")
                .ttl(60000)          // 消息存活时间 60秒
                .maxLength(1000)      // 最多存储 1000 条消息
                .build();
    }

    /**
     * 绑定队列到交换机
     * 
     * BindingBuilder 用法:
     * - bind(queue).to(exchange).with(routingKey)
     */
    @Bean
    public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
        return BindingBuilder.bind(myQueue).to(myExchange).with("my.key");
    }
}

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐