1. RabbitMQ核心概念、环境搭建

一、RabbitMQ核心概念

1. 基本概念
  • RabbitMQ :基于AMQP协议的开源消息代理软件
  • AMQP :高级消息队列协议,定义了消息中间件的统一标准
2. 核心组件
生产者 (Producer)
  • 发送消息的应用程序
  • 连接到RabbitMQ服务器,将消息发布到交换机
消费者 (Consumer)
  • 接收消息的应用程序
  • 订阅队列并处理消息
交换机 (Exchange)

接收生产者发送的消息,并根据路由规则将消息分发到队列

四种类型:

  • Direct Exchange :精确匹配routing key
  • Fanout Exchange :广播到所有绑定队列
  • Topic Exchange :模式匹配routing key
  • Headers Exchange :基于消息头匹配
队列 (Queue)
  • 存储消息的缓冲区
  • 消费者从队列获取消息
绑定 (Binding)
  • 连接交换机和队列的规则
  • 包含路由键(routing key)信息
路由键 (Routing Key)
  • 生产者发送消息时指定的键
  • 交换机根据此键决定消息路由
虚拟主机 (Virtual Host)
  • 逻辑隔离,类似命名空间
  • 每个vhost有独立的交换机、队列和权限
3. 消息确认机制
  • 消费者确认 (ACK) :消费者处理成功后确认
  • 生产者确认 :消息成功到达交换机或队列的确认
  • 持久化 :消息和队列的持久化存储

二、环境搭建

1. Docker方式(推荐)
# 拉取RabbitMQ镜像(带管理界面)
docker pull rabbitmq:3-management

# 运行容器
docker run -d \
  --name rabbitmq \
  -p 5672:5672 \      # AMQP协议端口
  -p 15672:15672 \    # 管理界面端口
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=admin123 \
  rabbitmq:3-management
2. 本地安装(Ubuntu/Debian)
# 安装Erlang(RabbitMQ依赖)
sudo apt-get update
sudo apt-get install -y erlang

# 安装RabbitMQ
sudo apt-get install -y rabbitmq-server

# 启动服务
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management

# 创建管理员用户
sudo rabbitmqctl add_user admin admin123
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
3. Windows安装
  1. 安装Erlang:从官网下载安装包
  2. 安装RabbitMQ:从官网下载安装包
  3. 启用管理插件:
rabbitmq-plugins enable rabbitmq_management
  1. 启动服务:
net start RabbitMQ
4. 基本配置
配置文件位置
  • Linux: /etc/rabbitmq/rabbitmq.conf
  • Docker: 挂载配置文件到容器
示例配置文件
# 监听端口
listeners.tcp.default = 5672

# 管理界面
management.tcp.port = 15672

# 默认虚拟主机
default_vhost = /

# 默认用户(仅开发环境)
default_user = guest
default_pass = guest

# 持久化设置
queue_master_locator = min-masters
5. 验证安装

访问管理界面:​​http://localhost:15672​​

  • 用户名:admin
  • 密码:admin123
6. 常用管理命令
# 查看状态
sudo rabbitmqctl status

# 查看用户
sudo rabbitmqctl list_users

# 查看队列
sudo rabbitmqctl list_queues

# 查看交换机
sudo rabbitmqctl list_exchanges

# 查看绑定
sudo rabbitmqctl list_bindings

三、生产环境建议

  1. 集群部署 :至少3个节点保证高可用
  2. 持久化设置 :消息和队列都启用持久化
  3. 监控告警 :使用Prometheus + Grafana监控
  4. 资源限制 :设置内存和磁盘使用限制
  5. SSL/TLS :启用加密传输
  6. 权限控制 :严格管理用户权限

这样你就有了一个完整的RabbitMQ环境,可以开始进行消息队列的开发了!

2. SpringBoot整合RabbitMQ、交换机/队列配置

添加依赖
<!-- pom.xml -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
基础配置
# application.yml
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    
    # 生产者确认机制
    publisher-confirm-type: correlated
    publisher-returns: true
    
    # 消费者配置
    listener:
      simple:
        acknowledge-mode: manual  # 手动ACK
        prefetch: 10  # 每次处理10条消息
        retry:
          enabled: true  # 开启重试
          max-attempts: 3  # 最大重试次数
交换机/队列配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    
    // ==================== 直连交换机配置 ====================
    
    /**
     * 直连交换机
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct.exchange", true, false);
    }
    
    /**
     * 直连队列
     */
    @Bean
    public Queue directQueue() {
        return QueueBuilder.durable("direct.queue")
                .withArgument("x-dead-letter-exchange", "dlx.exchange")  // 死信交换机
                .withArgument("x-dead-letter-routing-key", "dlx.routing.key")
                .withArgument("x-message-ttl", 10000)  // TTL 10秒
                .build();
    }
    
    /**
     * 绑定直连队列到交换机
     */
    @Bean
    public Binding directBinding() {
        return BindingBuilder.bind(directQueue())
                .to(directExchange())
                .with("direct.routing.key");
    }
    
    // ==================== 主题交换机配置 ====================
    
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange", true, false);
    }
    
    @Bean
    public Queue topicQueue1() {
        return QueueBuilder.durable("topic.queue1").build();
    }
    
    @Bean
    public Queue topicQueue2() {
        return QueueBuilder.durable("topic.queue2").build();
    }
    
    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with("topic.*.key");
    }
    
    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(topicQueue2())
                .to(topicExchange())
                .with("topic.#");
    }
    
    // ==================== 扇形交换机配置 ====================
    
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange", true, false);
    }
    
    @Bean
    public Queue fanoutQueue1() {
        return QueueBuilder.durable("fanout.queue1").build();
    }
    
    @Bean
    public Queue fanoutQueue2() {
        return QueueBuilder.durable("fanout.queue2").build();
    }
    
    @Bean
    public Binding fanoutBinding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }
    
    @Bean
    public Binding fanoutBinding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
    
    // ==================== 死信队列配置 ====================
    
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange("dlx.exchange", true, false);
    }
    
    @Bean
    public Queue dlxQueue() {
        return QueueBuilder.durable("dlx.queue").build();
    }
    
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with("dlx.routing.key");
    }
    
    // ==================== 延迟队列配置 ====================
    
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
    }
    
    @Bean
    public Queue delayedQueue() {
        return QueueBuilder.durable("delayed.queue").build();
    }
    
    @Bean
    public Binding delayedBinding() {
        return BindingBuilder.bind(delayedQueue())
                .to(delayedExchange())
                .with("delayed.routing.key")
                .noargs();
    }
}
生产者配置
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 发送消息到直连交换机
     */
    public void sendDirectMessage(Object message) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        
        rabbitTemplate.convertAndSend(
            "direct.exchange",
            "direct.routing.key",
            message,
            msg -> {
                // 设置消息属性
                msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                msg.getMessageProperties().setContentType("application/json");
                msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
                return msg;
            },
            correlationData
        );
    }
    
    /**
     * 发送延迟消息
     */
    public void sendDelayedMessage(Object message, int delayMillis) {
        rabbitTemplate.convertAndSend(
            "delayed.exchange",
            "delayed.routing.key",
            message,
            msg -> {
                msg.getMessageProperties().setHeader("x-delay", delayMillis);
                return msg;
            }
        );
    }
    
    /**
     * 发送到主题交换机
     */
    public void sendTopicMessage(String routingKey, Object message) {
        rabbitTemplate.convertAndSend("topic.exchange", routingKey, message);
    }
    
    /**
     * 发送到扇形交换机
     */
    public void sendFanoutMessage(Object message) {
        rabbitTemplate.convertAndSend("fanout.exchange", "", message);
    }
}
消费者配置
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;

@Component
public class MessageConsumer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    /**
     * 监听直连队列 - 手动ACK
     */
    @RabbitListener(queues = "direct.queue")
    public void handleDirectMessage(Message message, Channel channel) throws IOException {
        try {
            String msg = new String(message.getBody());
            System.out.println("收到消息: " + msg);
            
            // 业务处理逻辑
            processMessage(msg);
            
            // 手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            // 处理失败,拒绝消息
            channel.basicNack(
                message.getMessageProperties().getDeliveryTag(),
                false,
                true  // 重新入队
            );
        }
    }
    
    /**
     * 监听主题队列1
     */
    @RabbitListener(queues = "topic.queue1")
    public void handleTopicMessage1(String message) {
        System.out.println("Topic Queue1 收到: " + message);
    }
    
    /**
     * 监听死信队列
     */
    @RabbitListener(queues = "dlx.queue")
    public void handleDlxMessage(String message) {
        System.out.println("死信队列收到: " + message);
        // 处理死信消息
    }
    
    /**
     * 监听延迟队列
     */
    @RabbitListener(queues = "delayed.queue")
    public void handleDelayedMessage(String message) {
        System.out.println("延迟消息到达: " + message);
    }
    
    private void processMessage(String message) {
        // 业务处理逻辑
    }
}
确认回调配置
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;

@Configuration
public class RabbitConfirmConfig {
    
    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @PostConstruct
    public void init() {
        // 消息发送到交换机的确认回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack) {
                    System.out.println("消息发送到交换机成功: " + correlationData);
                } else {
                    System.out.println("消息发送到交换机失败: " + cause);
                    // 消息补偿机制
                }
            }
        });
        
        // 消息从交换机路由到队列的失败回调
        rabbitTemplate.setReturnsCallback(returned -> {
            System.out.println("消息路由到队列失败: " + returned.getMessage());
            System.out.println("交换机: " + returned.getExchange());
            System.out.println("路由键: " + returned.getRoutingKey());
            // 处理路由失败的消息
        });
    }
}
工具类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.stereotype.Component;

@Component
public class RabbitMQUtils {
    
    /**
     * 构建消息
     */
    public static Message buildMessage(Object object, MessageProperties properties) {
        MessageConverter converter = new SimpleMessageConverter();
        return converter.toMessage(object, properties);
    }
    
    /**
     * 解析消息
     */
    public static Object parseMessage(Message message) {
        MessageConverter converter = new SimpleMessageConverter();
        return converter.fromMessage(message);
    }
}
高级配置
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQAdvancedConfig {
    
    /**
     * JSON消息转换器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    /**
     * 自定义监听容器工厂
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory,
            Jackson2JsonMessageConverter converter) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(converter);
        factory.setConcurrentConsumers(3);  // 最小消费者数量
        factory.setMaxConcurrentConsumers(10);  // 最大消费者数量
        factory.setPrefetchCount(5);  // 每次预取消息数量
        factory.setDefaultRequeueRejected(false);  // 拒绝时不重新入队
        return factory;
    }
}
测试类
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class RabbitMQTest {
    
    @Autowired
    private MessageProducer producer;
    
    @Test
    void testDirectExchange() {
        producer.sendDirectMessage("Hello RabbitMQ!");
    }
    
    @Test
    void testDelayedMessage() {
        producer.sendDelayedMessage("Delayed Message", 5000);  // 5秒延迟
    }
    
    @Test
    void testTopicExchange() {
        producer.sendTopicMessage("topic.order.key", "Order Message");
        producer.sendTopicMessage("topic.user.key", "User Message");
    }
}
配置说明
交换机类型:
  • DirectExchange : 直连交换机,完全匹配 routingKey
  • TopicExchange : 主题交换机,支持通配符匹配
  • FanoutExchange : 扇形交换机,广播到所有绑定的队列
  • CustomExchange : 自定义交换机,用于延迟队列
队列参数:
  • ​durable​​: 是否持久化
  • ​exclusive​​: 是否排他
  • ​autoDelete​​: 自动删除
  • ​x-dead-letter-exchange​​: 死信交换机
  • ​x-message-ttl​​: 消息存活时间
  • ​x-max-length​​: 队列最大长度
消息确认机制:
  • 自动ACK : 消息消费后自动确认
  • 手动ACK : 需要显式调用 basicAck/basicNack
  • 发布确认 : 生产者确认消息到达交换机
注意事项:
  1. 生产环境建议开启消息持久化
  2. 合理设置消息TTL,避免消息堆积
  3. 使用手动ACK确保消息可靠消费
  4. 配置死信队列处理异常消息
  5. 监控队列长度,防止内存溢出
  6. 使用JSON序列化提高可读性

这个配置提供了完整的RabbitMQ集成方案,包括多种交换机类型、消息确认机制、延迟队列、死信队列等高级功能。

3. 五种消息模型实战

RabbitMQ 提供了多种消息模型,以下是五种核心模型的 Java 实现示例。

环境准备

Maven 依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.16.0</version>
</dependency>
连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtil {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";
    private static final String VIRTUAL_HOST = "/";
    
    public static Connection getConnection() throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setVirtualHost(VIRTUAL_HOST);
        return factory.newConnection();
    }
}

简单队列模型 (Hello World)

最简单的模型,一个生产者对应一个消费者。

生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SimpleProducer {
    private static final String QUEUE_NAME = "simple_queue";
    
    public static void main(String[] args) throws Exception {
        // 1. 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();
        
        // 3. 声明队列
        // 参数:queue - 队列名称
        //      durable - 是否持久化
        //      exclusive - 是否独占(只能被一个连接使用)
        //      autoDelete - 是否自动删除(没有消费者时自动删除)
        //      arguments - 其他参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 4. 发送消息
        String message = "Hello RabbitMQ!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent: '" + message + "'");
        
        // 5. 关闭资源
        channel.close();
        connection.close();
    }
}
消费者
import com.rabbitmq.client.*;

public class SimpleConsumer {
    private static final String QUEUE_NAME = "simple_queue";
    
    public static void main(String[] args) throws Exception {
        // 1. 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();
        
        // 3. 声明队列(确保队列存在)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        
        // 4. 创建消费者回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received: '" + message + "'");
        };
        
        // 5. 监听队列
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    }
}

工作队列模型 (Work Queues)

一个生产者,多个消费者,消息被轮询分发。

生产者
public class WorkQueueProducer {
    private static final String QUEUE_NAME = "work_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 发送10条消息
        for (int i = 1; i <= 10; i++) {
            String message = "Task " + i;
            // 设置消息持久化
            AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
            channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
            System.out.println(" [x] Sent: '" + message + "'");
            Thread.sleep(500); // 模拟任务产生间隔
        }
        
        channel.close();
        connection.close();
    }
}
消费者1(慢速处理)
public class WorkQueueConsumer1 {
    private static final String QUEUE_NAME = "work_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 设置预取数量为1,实现公平分发
        channel.basicQos(1);
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Consumer1] Received: '" + message + "'");
            
            try {
                // 模拟耗时任务
                Thread.sleep(2000);
                System.out.println(" [Consumer1] Done: '" + message + "'");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 手动确认消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        
        // 关闭自动确认,改为手动确认
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Consumer1] Waiting for messages...");
    }
}
消费者2(快速处理)
public class WorkQueueConsumer2 {
    private static final String QUEUE_NAME = "work_queue";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        
        // 设置预取数量为1
        channel.basicQos(1);
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Consumer2] Received: '" + message + "'");
            
            try {
                // 模拟较快的任务处理
                Thread.sleep(1000);
                System.out.println(" [Consumer2] Done: '" + message + "'");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Consumer2] Waiting for messages...");
    }
}

发布/订阅模型 (Publish/Subscribe)

一个生产者发送消息到交换机,交换机将消息广播到所有绑定的队列。

生产者
public class PubSubProducer {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明fanout类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        for (int i = 1; i <= 5; i++) {
            String message = "Broadcast Message " + i;
            // 发送到交换机,而不是队列
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
            System.out.println(" [x] Sent to exchange: '" + message + "'");
            Thread.sleep(1000);
        }
        
        channel.close();
        connection.close();
    }
}
消费者1
public class PubSubConsumer1 {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        // 创建临时队列(非持久化,独占,自动删除)
        String queueName = channel.queueDeclare().getQueue();
        
        // 将队列绑定到交换机
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        
        System.out.println(" [Consumer1] Queue name: " + queueName);
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Consumer1] Received: '" + message + "'");
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Consumer1] Waiting for broadcast messages...");
    }
}
消费者2
public class PubSubConsumer2 {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
        
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        
        System.out.println(" [Consumer2] Queue name: " + queueName);
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Consumer2] Received: '" + message + "'");
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Consumer2] Waiting for broadcast messages...");
    }
}

路由模型 (Routing)

基于路由键进行消息筛选。

生产者
public class RoutingProducer {
    private static final String EXCHANGE_NAME = "direct_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明direct类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        // 定义路由键
        String[] routingKeys = {"info", "warning", "error"};
        
        for (String routingKey : routingKeys) {
            for (int i = 1; i <= 3; i++) {
                String message = routingKey + " message " + i;
                // 发送消息时指定路由键
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                Thread.sleep(500);
            }
        }
        
        channel.close();
        connection.close();
    }
}
消费者(接收所有级别日志)
public class RoutingConsumerAll {
    private static final String EXCHANGE_NAME = "direct_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        String queueName = channel.queueDeclare().getQueue();
        
        // 绑定多个路由键
        String[] routingKeys = {"info", "warning", "error"};
        for (String routingKey : routingKeys) {
            channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
            System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" + 
                             EXCHANGE_NAME + "' with routing key '" + routingKey + "'");
        }
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    }
}
消费者(只接收error级别日志)
public class RoutingConsumerError {
    private static final String EXCHANGE_NAME = "direct_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        
        String queueName = channel.queueDeclare().getQueue();
        
        // 只绑定error路由键
        channel.queueBind(queueName, EXCHANGE_NAME, "error");
        System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" + 
                         EXCHANGE_NAME + "' with routing key 'error'");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [Error Consumer] Received: '" + message + "'");
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Error Consumer] Waiting for error messages...");
    }
}

主题模型 (Topics)

基于模式匹配的路由,使用通配符。

生产者
public class TopicProducer {
    private static final String EXCHANGE_NAME = "topic_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        // 声明topic类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        
        // 定义路由键和消息
        String[][] messages = {
            {"order.create", "New order created"},
            {"order.update", "Order updated"},
            {"order.delete", "Order deleted"},
            {"payment.success", "Payment successful"},
            {"payment.failed", "Payment failed"},
            {"user.login", "User logged in"},
            {"user.logout", "User logged out"}
        };
        
        for (String[] msg : messages) {
            String routingKey = msg[0];
            String message = msg[1];
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            Thread.sleep(300);
        }
        
        channel.close();
        connection.close();
    }
}
消费者(接收所有订单相关消息)
public class TopicConsumerOrder {
    private static final String EXCHANGE_NAME = "topic_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        
        String queueName = channel.queueDeclare().getQueue();
        
        // 使用通配符绑定:order.* 匹配所有order开头的路由键
        String bindingKey = "order.*";
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" + 
                         EXCHANGE_NAME + "' with binding key '" + bindingKey + "'");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [Order Consumer] Received '" + routingKey + "':'" + message + "'");
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Order Consumer] Waiting for order messages...");
    }
}
消费者(接收所有支付相关消息)
public class TopicConsumerPayment {
    private static final String EXCHANGE_NAME = "topic_exchange";
    
    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        
        String queueName = channel.queueDeclare().getQueue();
        
        // 使用通配符绑定:payment.# 匹配所有payment开头的路由键
        String bindingKey = "payment.#";
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" + 
                         EXCHANGE_NAME + "' with binding key '" + bindingKey + "'");
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [Payment Consumer] Received '" + routingKey + "':'" + message + "'");
        };
        
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
        
        System.out.println(" [Payment Consumer] Waiting for payment messages...");
    }
}

4. 消息确认机制、重试机制、死信队列

消息确认机制

1.1 生产者确认(Publisher Confirm)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmCallback;

public class PublisherConfirmExample {
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 开启发布确认模式
            channel.confirmSelect();
            
            // 异步确认监听器
            ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
                System.out.println("消息确认成功,标签:" + deliveryTag);
                if (multiple) {
                    System.out.println("批量确认,到标签:" + deliveryTag);
                }
            };
            
            ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
                System.out.println("消息确认失败,标签:" + deliveryTag);
                // 处理失败逻辑
            };
            
            channel.addConfirmListener(ackCallback, nackCallback);
            
            // 同步确认(等待确认)
            // channel.waitForConfirms();
            
            // 异步确认(推荐)
            channel.basicPublish("exchange", "routingKey", null, "消息".getBytes());
            
            // 等待所有消息被确认
            channel.waitForConfirmsOrDie(5000);
        }
    }
}
1.2 消费者确认(Consumer Acknowledgement)
import com.rabbitmq.client.*;

public class ConsumerAckExample {
    
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // 关闭自动确认,改为手动确认
        boolean autoAck = false;
        
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            
            try {
                // 处理消息
                System.out.println("收到消息: " + message);
                
                // 处理成功,手动确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                
                // 拒绝消息(单个拒绝,不重新入队)
                // channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
                
                // 拒绝消息(批量拒绝)
                // channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                
            } catch (Exception e) {
                // 处理失败,重新入队
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
            }
        };
        
        channel.basicConsume("queueName", autoAck, deliverCallback, consumerTag -> {});
    }
}

重试机制

2.1 Spring Boot 集成重试
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class RabbitMQConfig {
    
    // 定义重试队列和死信交换器
    @Bean
    public Queue retryQueue() {
        return QueueBuilder.durable("retry.queue")
                .withArgument("x-dead-letter-exchange", "dlx.exchange")
                .withArgument("x-dead-letter-routing-key", "dlx.routing.key")
                .withArgument("x-message-ttl", 10000) // 10秒后进入死信队列
                .build();
    }
    
    @Bean
    public DirectExchange retryExchange() {
        return new DirectExchange("retry.exchange");
    }
    
    @Bean
    public Binding retryBinding() {
        return BindingBuilder.bind(retryQueue())
                .to(retryExchange())
                .with("retry.routing.key");
    }
    
    // 配置重试策略
    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        
        // 配置重试
        factory.setAdviceChain(retryOperationsInterceptor());
        
        return factory;
    }
    
    @Bean
    public RetryOperationsInterceptor retryOperationsInterceptor() {
        return RetryInterceptorBuilder.stateless()
                .retryPolicy(retryPolicy())
                .backOffPolicy(backOffPolicy())
                .recoverer(recoverer())
                .build();
    }
    
    @Bean
    public SimpleRetryPolicy retryPolicy() {
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
        retryableExceptions.put(Exception.class, true);
        
        return new SimpleRetryPolicy(3, retryableExceptions);
    }
    
    @Bean
    public ExponentialBackOffPolicy backOffPolicy() {
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        return backOffPolicy;
    }
    
    @Bean
    public MessageRecoverer recoverer() {
        return new RepublishMessageRecoverer(rabbitTemplate(), "retry.exchange", "retry.routing.key");
    }
}
2.2 手动重试实现
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    private static final int MAX_RETRY_COUNT = 3;
    
    @RabbitListener(queues = "main.queue")
    public void handleMessage(Message message, Channel channel) throws IOException {
        try {
            // 处理消息
            processMessage(message);
            
            // 确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            
        } catch (Exception e) {
            // 获取重试次数
            Integer retryCount = (Integer) message.getMessageProperties()
                    .getHeaders()
                    .getOrDefault("retry-count", 0);
            
            if (retryCount < MAX_RETRY_COUNT) {
                // 增加重试次数
                message.getMessageProperties().getHeaders().put("retry-count", retryCount + 1);
                
                // 延迟重试
                sendToRetryQueue(message, retryCount + 1);
                
                // 确认原消息(已转移到重试队列)
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } else {
                // 超过最大重试次数,发送到死信队列
                sendToDeadLetterQueue(message);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    }
    
    private void processMessage(Message message) {
        // 业务处理逻辑
        String content = new String(message.getBody());
        System.out.println("处理消息: " + content);
        
        // 模拟处理失败
        if (content.contains("error")) {
            throw new RuntimeException("处理消息失败");
        }
    }
    
    private void sendToRetryQueue(Message message, int retryCount) {
        // 计算延迟时间(指数退避)
        long delay = (long) (Math.pow(2, retryCount) * 1000);
        
        // 设置延迟属性
        message.getMessageProperties().setExpiration(String.valueOf(delay));
        
        // 发送到延迟队列
        rabbitTemplate.send("retry.exchange", "retry.routing.key", message);
    }
    
    private void sendToDeadLetterQueue(Message message) {
        rabbitTemplate.send("dlx.exchange", "dlx.routing.key", message);
    }
}

死信队列(DLQ)

3.1 死信队列配置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DeadLetterConfig {
    
    // 主队列(绑定死信交换器)
    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable("main.queue")
                .withArgument("x-dead-letter-exchange", "dlx.exchange")  // 死信交换器
                .withArgument("x-dead-letter-routing-key", "dlx.queue")  // 死信路由键
                .withArgument("x-max-length", 1000)  // 队列最大长度
                .withArgument("x-message-ttl", 60000)  // 消息TTL(60秒)
                .build();
    }
    
    // 死信交换器
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("dlx.exchange");
    }
    
    // 死信队列
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("dlx.queue").build();
    }
    
    // 死信队列绑定
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("dlx.queue");
    }
    
    // 主交换器
    @Bean
    public DirectExchange mainExchange() {
        return new DirectExchange("main.exchange");
    }
    
    // 主队列绑定
    @Bean
    public Binding mainBinding() {
        return BindingBuilder.bind(mainQueue())
                .to(mainExchange())
                .with("main.routing.key");
    }
}
3.2 死信队列消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class DeadLetterConsumer {
    
    @RabbitListener(queues = "dlx.queue")
    public void handleDeadLetter(Message message) {
        String content = new String(message.getBody());
        System.out.println("收到死信消息: " + content);
        
        // 获取死信原因
        String reason = message.getMessageProperties().getReceivedReason();
        System.out.println("死信原因: " + reason);
        
        // 处理死信(记录日志、人工干预、重试等)
        handleDeadLetterMessage(message);
    }
    
    private void handleDeadLetterMessage(Message message) {
        // 1. 记录到数据库
        // 2. 发送告警
        // 3. 人工处理
        // 4. 尝试重新处理
    }
}

完整示例:整合所有机制

4.1 配置类
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableRabbit
public class RabbitMQCompleteConfig {
    
    // ========== 主队列和交换器 ==========
    @Bean
    public Queue mainQueue() {
        return QueueBuilder.durable("order.queue")
                .withArgument("x-dead-letter-exchange", "order.dlx.exchange")
                .withArgument("x-dead-letter-routing-key", "order.dlx.queue")
                .build();
    }
    
    @Bean
    public DirectExchange mainExchange() {
        return new DirectExchange("order.exchange");
    }
    
    @Bean
    public Binding mainBinding() {
        return BindingBuilder.bind(mainQueue())
                .to(mainExchange())
                .with("order.create");
    }
    
    // ========== 重试队列 ==========
    @Bean
    public Queue retryQueue() {
        return QueueBuilder.durable("order.retry.queue")
                .withArgument("x-dead-letter-exchange", "order.exchange")
                .withArgument("x-dead-letter-routing-key", "order.create")
                .withArgument("x-message-ttl", 5000)  // 5秒后重试
                .build();
    }
    
    @Bean
    public DirectExchange retryExchange() {
        return new DirectExchange("order.retry.exchange");
    }
    
    @Bean
    public Binding retryBinding() {
        return BindingBuilder.bind(retryQueue())
                .to(retryExchange())
                .with("order.retry");
    }
    
    // ========== 死信队列 ==========
    @Bean
    public Queue deadLetterQueue() {
        return QueueBuilder.durable("order.dlx.queue").build();
    }
    
    @Bean
    public DirectExchange deadLetterExchange() {
        return new DirectExchange("order.dlx.exchange");
    }
    
    @Bean
    public Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue())
                .to(deadLetterExchange())
                .with("order.dlx.queue");
    }
    
    // ========== 消息转换器 ==========
    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(messageConverter());
        template.setMandatory(true);
        
        // 确认回调
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息发送成功: " + correlationData);
            } else {
                System.out.println("消息发送失败: " + cause);
            }
        });
        
        // 返回回调
        template.setReturnsCallback(returned -> {
            System.out.println("消息路由失败: " + returned.getMessage());
        });
        
        return template;
    }
}
4.2 生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendOrder(Order order) {
        CorrelationData correlationData = new CorrelationData(order.getOrderId());
        
        rabbitTemplate.convertAndSend(
            "order.exchange",
            "order.create",
            order,
            message -> {
                // 设置消息属性
                message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                message.getMessageProperties().setPriority(5);
                return message;
            },
            correlationData
        );
    }
    
    public void sendToRetry(Order order, int retryCount) {
        // 设置重试次数
        rabbitTemplate.convertAndSend(
            "order.retry.exchange",
            "order.retry",
            order,
            message -> {
                message.getMessageProperties().setHeader("retry-count", retryCount);
                return message;
            }
        );
    }
}
4.3 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {
    
    private static final int MAX_RETRY_COUNT = 3;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @RabbitListener(queues = "order.queue")
    public void processOrder(Order order, Message message) {
        try {
            System.out.println("处理订单: " + order.getOrderId());
            
            // 业务处理逻辑
            boolean success = processBusiness(order);
            
            if (!success) {
                throw new RuntimeException("处理订单失败");
            }
            
        } catch (Exception e) {
            handleFailure(order, message, e);
        }
    }
    
    private boolean processBusiness(Order order) {
        // 模拟业务处理
        return !order.getOrderId().contains("FAIL");
    }
    
    private void handleFailure(Order order, Message message, Exception e) {
        Integer retryCount = (Integer) message.getMessageProperties()
                .getHeaders()
                .getOrDefault("retry-count", 0);
        
        if (retryCount < MAX_RETRY_COUNT) {
            System.out.println("第" + (retryCount + 1) + "次重试,订单: " + order.getOrderId());
            
            // 发送到重试队列
            rabbitTemplate.convertAndSend(
                "order.retry.exchange",
                "order.retry",
                order,
                msg -> {
                    msg.getMessageProperties().setHeader("retry-count", retryCount + 1);
                    return msg;
                }
            );
        } else {
            System.out.println("达到最大重试次数,发送到死信队列,订单: " + order.getOrderId());
            
            // 发送到死信队列
            rabbitTemplate.convertAndSend(
                "order.dlx.exchange",
                "order.dlx.queue",
                order
            );
        }
    }
    
    @RabbitListener(queues = "order.dlx.queue")
    public void handleDeadLetter(Order order) {
        System.out.println("处理死信订单: " + order.getOrderId());
        // 记录日志、告警、人工处理等
    }
}

5. 消息丢失、消息重复消费解决方案

1. 幂等性设计(最常用)

1.1 数据库唯一约束

@Component
public class MessageConsumer {
    
    @Autowired
    private MessageIdRepository messageIdRepository;
    
    @RabbitListener(queues = "order.queue")
    public void processOrder(OrderMessage message, Channel channel, 
                           @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
        // 1. 检查消息是否已处理
        if (messageIdRepository.existsById(message.getMessageId())) {
            // 已处理,直接确认
            channel.basicAck(deliveryTag, false);
            return;
        }
        
        try {
            // 2. 业务处理
            orderService.createOrder(message);
            
            // 3. 记录已处理的消息ID
            messageIdRepository.save(new ProcessedMessage(
                message.getMessageId(),
                LocalDateTime.now()
            ));
            
            // 4. 手动确认
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 处理失败,拒绝消息
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

1.2 Redis 实现幂等性

@Component
public class IdempotentConsumer {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    private static final String MESSAGE_PREFIX = "msg:processed:";
    private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时
    
    @RabbitListener(queues = "payment.queue")
    public void processPayment(PaymentMessage message, Channel channel,
                              @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
        String messageId = message.getMessageId();
        String redisKey = MESSAGE_PREFIX + messageId;
        
        // 使用 SETNX 命令保证原子性
        Boolean success = redisTemplate.opsForValue()
            .setIfAbsent(redisKey, "1", EXPIRE_TIME, TimeUnit.SECONDS);
        
        if (Boolean.FALSE.equals(success)) {
            // 消息已处理
            channel.basicAck(deliveryTag, false);
            return;
        }
        
        try {
            // 业务处理
            paymentService.processPayment(message);
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 删除Redis记录,允许重试
            redisTemplate.delete(redisKey);
            channel.basicNack(deliveryTag, false, true);
        }
    }
}

2. 消息去重表设计

@Entity
@Table(name = "processed_messages")
public class ProcessedMessage {
    @Id
    private String messageId;
    
    @Column(nullable = false)
    private String businessType; // 业务类型,如: ORDER_CREATE
    
    @Column(nullable = false)
    private LocalDateTime processedTime;
    
    @Column(nullable = false)
    private String status; // PROCESSED, FAILED
}

3. 基于版本号的乐观锁

@Service
public class OrderService {
    
    @Transactional
    public void createOrderWithVersion(OrderMessage message) {
        // 先查询订单是否存在
        Order order = orderRepository.findByOrderNo(message.getOrderNo());
        
        if (order != null) {
            // 使用版本号判断是否已处理
            if (order.getVersion() >= message.getVersion()) {
                return; // 已处理更新的版本
            }
        }
        
        // 创建或更新订单
        order = createOrUpdateOrder(message);
        
        // 设置版本号
        order.setVersion(message.getVersion());
        orderRepository.save(order);
    }
}

4. 消息属性设置

4.1 生产者设置消息ID

@Component
public class MessageProducer {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void sendOrderMessage(Order order) {
        String messageId = UUID.randomUUID().toString();
        
        MessageProperties properties = new MessageProperties();
        properties.setMessageId(messageId);
        properties.setTimestamp(new Date());
        
        Message message = new Message(
            objectMapper.writeValueAsBytes(order),
            properties
        );
        
        rabbitTemplate.send("order.exchange", "order.key", message);
    }
}

4.2 配置消费者确认模式

# application.yml
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认
        prefetch: 1               # 每次只获取一条消息
        retry:
          enabled: true
          max-attempts: 3
          initial-interval: 1000

5. 完整解决方案示例

@Component
@Slf4j
public class ReliableMessageConsumer {
    
    @Autowired
    private RedisTemplate<String, String> redisTemplate;
    
    @Autowired
    private OrderService orderService;
    
    /**
     * 可靠的消息消费处理器
     */
    @RabbitListener(queues = "reliable.queue")
    public void handleMessage(Message message, Channel channel,
                             @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
        
        String messageId = message.getMessageProperties().getMessageId();
        String body = new String(message.getBody());
        
        // 1. 消息去重检查
        if (isMessageProcessed(messageId)) {
            log.info("消息已处理,直接确认: {}", messageId);
            ackMessage(channel, deliveryTag);
            return;
        }
        
        try {
            // 2. 业务处理
            processBusiness(messageId, body);
            
            // 3. 记录处理成功
            markMessageAsProcessed(messageId);
            
            // 4. 确认消息
            ackMessage(channel, deliveryTag);
            
        } catch (BusinessException e) {
            // 业务异常,记录日志,确认消息
            log.error("业务处理失败,消息丢弃: {}", messageId, e);
            ackMessage(channel, deliveryTag);
        } catch (Exception e) {
            // 系统异常,重新入队
            log.error("系统异常,消息重试: {}", messageId, e);
            nackMessage(channel, deliveryTag);
        }
    }
    
    private boolean isMessageProcessed(String messageId) {
        String key = "msg:idempotent:" + messageId;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }
    
    private void markMessageAsProcessed(String messageId) {
        String key = "msg:idempotent:" + messageId;
        redisTemplate.opsForValue().set(key, "1", 2, TimeUnit.HOURS);
    }
    
    private void ackMessage(Channel channel, long deliveryTag) {
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            log.error("消息确认失败", e);
        }
    }
    
    private void nackMessage(Channel channel, long deliveryTag) {
        try {
            channel.basicNack(deliveryTag, false, true);
        } catch (IOException e) {
            log.error("消息拒绝失败", e);
        }
    }
}

6. 使用 Spring Cloud Stream

@Configuration
public class RabbitMQConfig {
    
    @Bean
    public Consumer<Message<OrderMessage>> orderConsumer() {
        return message -> {
            // 获取消息头中的ID
            String messageId = (String) message.getHeaders()
                .get(AmqpHeaders.MESSAGE_ID);
            
            // 幂等性检查
            if (idempotentChecker.isDuplicate(messageId)) {
                return;
            }
            
            // 处理业务
            orderService.process(message.getPayload());
            
            // 记录处理
            idempotentChecker.markAsProcessed(messageId);
        };
    }
}

7. 最佳实践建议

  1. 优先使用幂等性设计 :这是最根本的解决方案
  2. 消息设计 :
  1. 每个消息携带唯一ID
  2. 包含业务标识(如订单号)
  3. 添加时间戳和版本号
  1. 消费端配置 :
@Configuration
public class ConsumerConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        
        SimpleRabbitListenerContainerFactory factory = 
            new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        
        // 手动确认
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        
        // 并发消费者数量
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        
        // 预取数量
        factory.setPrefetchCount(1);
        
        // 重试策略
        factory.setAdviceChain(
            RetryInterceptorBuilder.stateless()
                .maxAttempts(3)
                .backOffOptions(1000, 2.0, 10000)
                .build()
        );
        
        return factory;
    }
}
  1. 监控和告警 :
  1. 监控重复消息比例
  2. 设置消息积压告警
  3. 记录消息处理日志

选择哪种方案取决于你的业务场景:

  • 对数据一致性要求高:使用数据库唯一约束
  • 高性能场景:使用 Redis
  • 简单业务:使用版本号控制
  • 金融级业务:结合多种方案

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐