【SpringBoot 从入门到架构师】第17章:消息队列RabbitMQ整合
·
1. RabbitMQ核心概念、环境搭建
一、RabbitMQ核心概念
1. 基本概念
- RabbitMQ :基于AMQP协议的开源消息代理软件
- AMQP :高级消息队列协议,定义了消息中间件的统一标准
2. 核心组件
生产者 (Producer)
- 发送消息的应用程序
- 连接到RabbitMQ服务器,将消息发布到交换机
消费者 (Consumer)
- 接收消息的应用程序
- 订阅队列并处理消息
交换机 (Exchange)
接收生产者发送的消息,并根据路由规则将消息分发到队列
四种类型:
- Direct Exchange :精确匹配routing key
- Fanout Exchange :广播到所有绑定队列
- Topic Exchange :模式匹配routing key
- Headers Exchange :基于消息头匹配
队列 (Queue)
- 存储消息的缓冲区
- 消费者从队列获取消息
绑定 (Binding)
- 连接交换机和队列的规则
- 包含路由键(routing key)信息
路由键 (Routing Key)
- 生产者发送消息时指定的键
- 交换机根据此键决定消息路由
虚拟主机 (Virtual Host)
- 逻辑隔离,类似命名空间
- 每个vhost有独立的交换机、队列和权限
3. 消息确认机制
- 消费者确认 (ACK) :消费者处理成功后确认
- 生产者确认 :消息成功到达交换机或队列的确认
- 持久化 :消息和队列的持久化存储
二、环境搭建
1. Docker方式(推荐)
# 拉取RabbitMQ镜像(带管理界面)
docker pull rabbitmq:3-management
# 运行容器
docker run -d \
--name rabbitmq \
-p 5672:5672 \ # AMQP协议端口
-p 15672:15672 \ # 管理界面端口
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3-management
2. 本地安装(Ubuntu/Debian)
# 安装Erlang(RabbitMQ依赖)
sudo apt-get update
sudo apt-get install -y erlang
# 安装RabbitMQ
sudo apt-get install -y rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server
# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management
# 创建管理员用户
sudo rabbitmqctl add_user admin admin123
sudo rabbitmqctl set_user_tags admin administrator
sudo rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
3. Windows安装
- 安装Erlang:从官网下载安装包
- 安装RabbitMQ:从官网下载安装包
- 启用管理插件:
rabbitmq-plugins enable rabbitmq_management
- 启动服务:
net start RabbitMQ
4. 基本配置
配置文件位置
- Linux:
/etc/rabbitmq/rabbitmq.conf - Docker: 挂载配置文件到容器
示例配置文件
# 监听端口
listeners.tcp.default = 5672
# 管理界面
management.tcp.port = 15672
# 默认虚拟主机
default_vhost = /
# 默认用户(仅开发环境)
default_user = guest
default_pass = guest
# 持久化设置
queue_master_locator = min-masters
5. 验证安装
访问管理界面:http://localhost:15672
- 用户名:admin
- 密码:admin123
6. 常用管理命令
# 查看状态
sudo rabbitmqctl status
# 查看用户
sudo rabbitmqctl list_users
# 查看队列
sudo rabbitmqctl list_queues
# 查看交换机
sudo rabbitmqctl list_exchanges
# 查看绑定
sudo rabbitmqctl list_bindings
三、生产环境建议
- 集群部署 :至少3个节点保证高可用
- 持久化设置 :消息和队列都启用持久化
- 监控告警 :使用Prometheus + Grafana监控
- 资源限制 :设置内存和磁盘使用限制
- SSL/TLS :启用加密传输
- 权限控制 :严格管理用户权限
这样你就有了一个完整的RabbitMQ环境,可以开始进行消息队列的开发了!
2. SpringBoot整合RabbitMQ、交换机/队列配置
添加依赖
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
基础配置
# application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 生产者确认机制
publisher-confirm-type: correlated
publisher-returns: true
# 消费者配置
listener:
simple:
acknowledge-mode: manual # 手动ACK
prefetch: 10 # 每次处理10条消息
retry:
enabled: true # 开启重试
max-attempts: 3 # 最大重试次数
交换机/队列配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// ==================== 直连交换机配置 ====================
/**
* 直连交换机
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange", true, false);
}
/**
* 直连队列
*/
@Bean
public Queue directQueue() {
return QueueBuilder.durable("direct.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange") // 死信交换机
.withArgument("x-dead-letter-routing-key", "dlx.routing.key")
.withArgument("x-message-ttl", 10000) // TTL 10秒
.build();
}
/**
* 绑定直连队列到交换机
*/
@Bean
public Binding directBinding() {
return BindingBuilder.bind(directQueue())
.to(directExchange())
.with("direct.routing.key");
}
// ==================== 主题交换机配置 ====================
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic.exchange", true, false);
}
@Bean
public Queue topicQueue1() {
return QueueBuilder.durable("topic.queue1").build();
}
@Bean
public Queue topicQueue2() {
return QueueBuilder.durable("topic.queue2").build();
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with("topic.*.key");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2())
.to(topicExchange())
.with("topic.#");
}
// ==================== 扇形交换机配置 ====================
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange", true, false);
}
@Bean
public Queue fanoutQueue1() {
return QueueBuilder.durable("fanout.queue1").build();
}
@Bean
public Queue fanoutQueue2() {
return QueueBuilder.durable("fanout.queue2").build();
}
@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}
// ==================== 死信队列配置 ====================
@Bean
public DirectExchange dlxExchange() {
return new DirectExchange("dlx.exchange", true, false);
}
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable("dlx.queue").build();
}
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue())
.to(dlxExchange())
.with("dlx.routing.key");
}
// ==================== 延迟队列配置 ====================
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable("delayed.queue").build();
}
@Bean
public Binding delayedBinding() {
return BindingBuilder.bind(delayedQueue())
.to(delayedExchange())
.with("delayed.routing.key")
.noargs();
}
}
生产者配置
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 发送消息到直连交换机
*/
public void sendDirectMessage(Object message) {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(
"direct.exchange",
"direct.routing.key",
message,
msg -> {
// 设置消息属性
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
msg.getMessageProperties().setContentType("application/json");
msg.getMessageProperties().setMessageId(UUID.randomUUID().toString());
return msg;
},
correlationData
);
}
/**
* 发送延迟消息
*/
public void sendDelayedMessage(Object message, int delayMillis) {
rabbitTemplate.convertAndSend(
"delayed.exchange",
"delayed.routing.key",
message,
msg -> {
msg.getMessageProperties().setHeader("x-delay", delayMillis);
return msg;
}
);
}
/**
* 发送到主题交换机
*/
public void sendTopicMessage(String routingKey, Object message) {
rabbitTemplate.convertAndSend("topic.exchange", routingKey, message);
}
/**
* 发送到扇形交换机
*/
public void sendFanoutMessage(Object message) {
rabbitTemplate.convertAndSend("fanout.exchange", "", message);
}
}
消费者配置
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class MessageConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 监听直连队列 - 手动ACK
*/
@RabbitListener(queues = "direct.queue")
public void handleDirectMessage(Message message, Channel channel) throws IOException {
try {
String msg = new String(message.getBody());
System.out.println("收到消息: " + msg);
// 业务处理逻辑
processMessage(msg);
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息
channel.basicNack(
message.getMessageProperties().getDeliveryTag(),
false,
true // 重新入队
);
}
}
/**
* 监听主题队列1
*/
@RabbitListener(queues = "topic.queue1")
public void handleTopicMessage1(String message) {
System.out.println("Topic Queue1 收到: " + message);
}
/**
* 监听死信队列
*/
@RabbitListener(queues = "dlx.queue")
public void handleDlxMessage(String message) {
System.out.println("死信队列收到: " + message);
// 处理死信消息
}
/**
* 监听延迟队列
*/
@RabbitListener(queues = "delayed.queue")
public void handleDelayedMessage(String message) {
System.out.println("延迟消息到达: " + message);
}
private void processMessage(String message) {
// 业务处理逻辑
}
}
确认回调配置
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
@Configuration
public class RabbitConfirmConfig {
@Resource
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init() {
// 消息发送到交换机的确认回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息发送到交换机成功: " + correlationData);
} else {
System.out.println("消息发送到交换机失败: " + cause);
// 消息补偿机制
}
}
});
// 消息从交换机路由到队列的失败回调
rabbitTemplate.setReturnsCallback(returned -> {
System.out.println("消息路由到队列失败: " + returned.getMessage());
System.out.println("交换机: " + returned.getExchange());
System.out.println("路由键: " + returned.getRoutingKey());
// 处理路由失败的消息
});
}
}
工具类
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQUtils {
/**
* 构建消息
*/
public static Message buildMessage(Object object, MessageProperties properties) {
MessageConverter converter = new SimpleMessageConverter();
return converter.toMessage(object, properties);
}
/**
* 解析消息
*/
public static Object parseMessage(Message message) {
MessageConverter converter = new SimpleMessageConverter();
return converter.fromMessage(message);
}
}
高级配置
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQAdvancedConfig {
/**
* JSON消息转换器
*/
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 自定义监听容器工厂
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
Jackson2JsonMessageConverter converter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(converter);
factory.setConcurrentConsumers(3); // 最小消费者数量
factory.setMaxConcurrentConsumers(10); // 最大消费者数量
factory.setPrefetchCount(5); // 每次预取消息数量
factory.setDefaultRequeueRejected(false); // 拒绝时不重新入队
return factory;
}
}
测试类
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RabbitMQTest {
@Autowired
private MessageProducer producer;
@Test
void testDirectExchange() {
producer.sendDirectMessage("Hello RabbitMQ!");
}
@Test
void testDelayedMessage() {
producer.sendDelayedMessage("Delayed Message", 5000); // 5秒延迟
}
@Test
void testTopicExchange() {
producer.sendTopicMessage("topic.order.key", "Order Message");
producer.sendTopicMessage("topic.user.key", "User Message");
}
}
配置说明
交换机类型:
- DirectExchange : 直连交换机,完全匹配 routingKey
- TopicExchange : 主题交换机,支持通配符匹配
- FanoutExchange : 扇形交换机,广播到所有绑定的队列
- CustomExchange : 自定义交换机,用于延迟队列
队列参数:
-
durable: 是否持久化 -
exclusive: 是否排他 -
autoDelete: 自动删除 -
x-dead-letter-exchange: 死信交换机 -
x-message-ttl: 消息存活时间 -
x-max-length: 队列最大长度
消息确认机制:
- 自动ACK : 消息消费后自动确认
- 手动ACK : 需要显式调用 basicAck/basicNack
- 发布确认 : 生产者确认消息到达交换机
注意事项:
- 生产环境建议开启消息持久化
- 合理设置消息TTL,避免消息堆积
- 使用手动ACK确保消息可靠消费
- 配置死信队列处理异常消息
- 监控队列长度,防止内存溢出
- 使用JSON序列化提高可读性
这个配置提供了完整的RabbitMQ集成方案,包括多种交换机类型、消息确认机制、延迟队列、死信队列等高级功能。
3. 五种消息模型实战
RabbitMQ 提供了多种消息模型,以下是五种核心模型的 Java 实现示例。
环境准备
Maven 依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
连接工具类
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtil {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String VIRTUAL_HOST = "/";
public static Connection getConnection() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
return factory.newConnection();
}
}
简单队列模型 (Hello World)
最简单的模型,一个生产者对应一个消费者。
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class SimpleProducer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 2. 创建通道
Channel channel = connection.createChannel();
// 3. 声明队列
// 参数:queue - 队列名称
// durable - 是否持久化
// exclusive - 是否独占(只能被一个连接使用)
// autoDelete - 是否自动删除(没有消费者时自动删除)
// arguments - 其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent: '" + message + "'");
// 5. 关闭资源
channel.close();
connection.close();
}
}
消费者
import com.rabbitmq.client.*;
public class SimpleConsumer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 2. 创建通道
Channel channel = connection.createChannel();
// 3. 声明队列(确保队列存在)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. 创建消费者回调
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received: '" + message + "'");
};
// 5. 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
工作队列模型 (Work Queues)
一个生产者,多个消费者,消息被轮询分发。
生产者
public class WorkQueueProducer {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 发送10条消息
for (int i = 1; i <= 10; i++) {
String message = "Task " + i;
// 设置消息持久化
AMQP.BasicProperties props = MessageProperties.PERSISTENT_TEXT_PLAIN;
channel.basicPublish("", QUEUE_NAME, props, message.getBytes());
System.out.println(" [x] Sent: '" + message + "'");
Thread.sleep(500); // 模拟任务产生间隔
}
channel.close();
connection.close();
}
}
消费者1(慢速处理)
public class WorkQueueConsumer1 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 设置预取数量为1,实现公平分发
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Consumer1] Received: '" + message + "'");
try {
// 模拟耗时任务
Thread.sleep(2000);
System.out.println(" [Consumer1] Done: '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 关闭自动确认,改为手动确认
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
System.out.println(" [Consumer1] Waiting for messages...");
}
}
消费者2(快速处理)
public class WorkQueueConsumer2 {
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 设置预取数量为1
channel.basicQos(1);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Consumer2] Received: '" + message + "'");
try {
// 模拟较快的任务处理
Thread.sleep(1000);
System.out.println(" [Consumer2] Done: '" + message + "'");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
System.out.println(" [Consumer2] Waiting for messages...");
}
}
发布/订阅模型 (Publish/Subscribe)
一个生产者发送消息到交换机,交换机将消息广播到所有绑定的队列。
生产者
public class PubSubProducer {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明fanout类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
for (int i = 1; i <= 5; i++) {
String message = "Broadcast Message " + i;
// 发送到交换机,而不是队列
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent to exchange: '" + message + "'");
Thread.sleep(1000);
}
channel.close();
connection.close();
}
}
消费者1
public class PubSubConsumer1 {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 创建临时队列(非持久化,独占,自动删除)
String queueName = channel.queueDeclare().getQueue();
// 将队列绑定到交换机
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [Consumer1] Queue name: " + queueName);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Consumer1] Received: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
System.out.println(" [Consumer1] Waiting for broadcast messages...");
}
}
消费者2
public class PubSubConsumer2 {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [Consumer2] Queue name: " + queueName);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Consumer2] Received: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
System.out.println(" [Consumer2] Waiting for broadcast messages...");
}
}
路由模型 (Routing)
基于路由键进行消息筛选。
生产者
public class RoutingProducer {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明direct类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 定义路由键
String[] routingKeys = {"info", "warning", "error"};
for (String routingKey : routingKeys) {
for (int i = 1; i <= 3; i++) {
String message = routingKey + " message " + i;
// 发送消息时指定路由键
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
Thread.sleep(500);
}
}
channel.close();
connection.close();
}
}
消费者(接收所有级别日志)
public class RoutingConsumerAll {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
// 绑定多个路由键
String[] routingKeys = {"info", "warning", "error"};
for (String routingKey : routingKeys) {
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" +
EXCHANGE_NAME + "' with routing key '" + routingKey + "'");
}
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
消费者(只接收error级别日志)
public class RoutingConsumerError {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue();
// 只绑定error路由键
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" +
EXCHANGE_NAME + "' with routing key 'error'");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [Error Consumer] Received: '" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
System.out.println(" [Error Consumer] Waiting for error messages...");
}
}
主题模型 (Topics)
基于模式匹配的路由,使用通配符。
生产者
public class TopicProducer {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明topic类型的交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 定义路由键和消息
String[][] messages = {
{"order.create", "New order created"},
{"order.update", "Order updated"},
{"order.delete", "Order deleted"},
{"payment.success", "Payment successful"},
{"payment.failed", "Payment failed"},
{"user.login", "User logged in"},
{"user.logout", "User logged out"}
};
for (String[] msg : messages) {
String routingKey = msg[0];
String message = msg[1];
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
Thread.sleep(300);
}
channel.close();
connection.close();
}
}
消费者(接收所有订单相关消息)
public class TopicConsumerOrder {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
// 使用通配符绑定:order.* 匹配所有order开头的路由键
String bindingKey = "order.*";
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" +
EXCHANGE_NAME + "' with binding key '" + bindingKey + "'");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [Order Consumer] Received '" + routingKey + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
System.out.println(" [Order Consumer] Waiting for order messages...");
}
}
消费者(接收所有支付相关消息)
public class TopicConsumerPayment {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
// 使用通配符绑定:payment.# 匹配所有payment开头的路由键
String bindingKey = "payment.#";
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
System.out.println(" [*] Binding queue '" + queueName + "' to exchange '" +
EXCHANGE_NAME + "' with binding key '" + bindingKey + "'");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [Payment Consumer] Received '" + routingKey + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
System.out.println(" [Payment Consumer] Waiting for payment messages...");
}
}
4. 消息确认机制、重试机制、死信队列
消息确认机制
1.1 生产者确认(Publisher Confirm)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConfirmCallback;
public class PublisherConfirmExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 开启发布确认模式
channel.confirmSelect();
// 异步确认监听器
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
System.out.println("消息确认成功,标签:" + deliveryTag);
if (multiple) {
System.out.println("批量确认,到标签:" + deliveryTag);
}
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("消息确认失败,标签:" + deliveryTag);
// 处理失败逻辑
};
channel.addConfirmListener(ackCallback, nackCallback);
// 同步确认(等待确认)
// channel.waitForConfirms();
// 异步确认(推荐)
channel.basicPublish("exchange", "routingKey", null, "消息".getBytes());
// 等待所有消息被确认
channel.waitForConfirmsOrDie(5000);
}
}
}
1.2 消费者确认(Consumer Acknowledgement)
import com.rabbitmq.client.*;
public class ConsumerAckExample {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 关闭自动确认,改为手动确认
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
try {
// 处理消息
System.out.println("收到消息: " + message);
// 处理成功,手动确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// 拒绝消息(单个拒绝,不重新入队)
// channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
// 拒绝消息(批量拒绝)
// channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
} catch (Exception e) {
// 处理失败,重新入队
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
};
channel.basicConsume("queueName", autoAck, deliverCallback, consumerTag -> {});
}
}
重试机制
2.1 Spring Boot 集成重试
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
@Configuration
public class RabbitMQConfig {
// 定义重试队列和死信交换器
@Bean
public Queue retryQueue() {
return QueueBuilder.durable("retry.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange")
.withArgument("x-dead-letter-routing-key", "dlx.routing.key")
.withArgument("x-message-ttl", 10000) // 10秒后进入死信队列
.build();
}
@Bean
public DirectExchange retryExchange() {
return new DirectExchange("retry.exchange");
}
@Bean
public Binding retryBinding() {
return BindingBuilder.bind(retryQueue())
.to(retryExchange())
.with("retry.routing.key");
}
// 配置重试策略
@Bean
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
// 配置重试
factory.setAdviceChain(retryOperationsInterceptor());
return factory;
}
@Bean
public RetryOperationsInterceptor retryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.retryPolicy(retryPolicy())
.backOffPolicy(backOffPolicy())
.recoverer(recoverer())
.build();
}
@Bean
public SimpleRetryPolicy retryPolicy() {
Map<Class<? extends Throwable>, Boolean> retryableExceptions = new HashMap<>();
retryableExceptions.put(Exception.class, true);
return new SimpleRetryPolicy(3, retryableExceptions);
}
@Bean
public ExponentialBackOffPolicy backOffPolicy() {
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setMaxInterval(10000);
return backOffPolicy;
}
@Bean
public MessageRecoverer recoverer() {
return new RepublishMessageRecoverer(rabbitTemplate(), "retry.exchange", "retry.routing.key");
}
}
2.2 手动重试实现
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageConsumer {
@Autowired
private RabbitTemplate rabbitTemplate;
private static final int MAX_RETRY_COUNT = 3;
@RabbitListener(queues = "main.queue")
public void handleMessage(Message message, Channel channel) throws IOException {
try {
// 处理消息
processMessage(message);
// 确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 获取重试次数
Integer retryCount = (Integer) message.getMessageProperties()
.getHeaders()
.getOrDefault("retry-count", 0);
if (retryCount < MAX_RETRY_COUNT) {
// 增加重试次数
message.getMessageProperties().getHeaders().put("retry-count", retryCount + 1);
// 延迟重试
sendToRetryQueue(message, retryCount + 1);
// 确认原消息(已转移到重试队列)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else {
// 超过最大重试次数,发送到死信队列
sendToDeadLetterQueue(message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
}
private void processMessage(Message message) {
// 业务处理逻辑
String content = new String(message.getBody());
System.out.println("处理消息: " + content);
// 模拟处理失败
if (content.contains("error")) {
throw new RuntimeException("处理消息失败");
}
}
private void sendToRetryQueue(Message message, int retryCount) {
// 计算延迟时间(指数退避)
long delay = (long) (Math.pow(2, retryCount) * 1000);
// 设置延迟属性
message.getMessageProperties().setExpiration(String.valueOf(delay));
// 发送到延迟队列
rabbitTemplate.send("retry.exchange", "retry.routing.key", message);
}
private void sendToDeadLetterQueue(Message message) {
rabbitTemplate.send("dlx.exchange", "dlx.routing.key", message);
}
}
死信队列(DLQ)
3.1 死信队列配置
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DeadLetterConfig {
// 主队列(绑定死信交换器)
@Bean
public Queue mainQueue() {
return QueueBuilder.durable("main.queue")
.withArgument("x-dead-letter-exchange", "dlx.exchange") // 死信交换器
.withArgument("x-dead-letter-routing-key", "dlx.queue") // 死信路由键
.withArgument("x-max-length", 1000) // 队列最大长度
.withArgument("x-message-ttl", 60000) // 消息TTL(60秒)
.build();
}
// 死信交换器
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx.exchange");
}
// 死信队列
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("dlx.queue").build();
}
// 死信队列绑定
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dlx.queue");
}
// 主交换器
@Bean
public DirectExchange mainExchange() {
return new DirectExchange("main.exchange");
}
// 主队列绑定
@Bean
public Binding mainBinding() {
return BindingBuilder.bind(mainQueue())
.to(mainExchange())
.with("main.routing.key");
}
}
3.2 死信队列消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DeadLetterConsumer {
@RabbitListener(queues = "dlx.queue")
public void handleDeadLetter(Message message) {
String content = new String(message.getBody());
System.out.println("收到死信消息: " + content);
// 获取死信原因
String reason = message.getMessageProperties().getReceivedReason();
System.out.println("死信原因: " + reason);
// 处理死信(记录日志、人工干预、重试等)
handleDeadLetterMessage(message);
}
private void handleDeadLetterMessage(Message message) {
// 1. 记录到数据库
// 2. 发送告警
// 3. 人工处理
// 4. 尝试重新处理
}
}
完整示例:整合所有机制
4.1 配置类
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableRabbit
public class RabbitMQCompleteConfig {
// ========== 主队列和交换器 ==========
@Bean
public Queue mainQueue() {
return QueueBuilder.durable("order.queue")
.withArgument("x-dead-letter-exchange", "order.dlx.exchange")
.withArgument("x-dead-letter-routing-key", "order.dlx.queue")
.build();
}
@Bean
public DirectExchange mainExchange() {
return new DirectExchange("order.exchange");
}
@Bean
public Binding mainBinding() {
return BindingBuilder.bind(mainQueue())
.to(mainExchange())
.with("order.create");
}
// ========== 重试队列 ==========
@Bean
public Queue retryQueue() {
return QueueBuilder.durable("order.retry.queue")
.withArgument("x-dead-letter-exchange", "order.exchange")
.withArgument("x-dead-letter-routing-key", "order.create")
.withArgument("x-message-ttl", 5000) // 5秒后重试
.build();
}
@Bean
public DirectExchange retryExchange() {
return new DirectExchange("order.retry.exchange");
}
@Bean
public Binding retryBinding() {
return BindingBuilder.bind(retryQueue())
.to(retryExchange())
.with("order.retry");
}
// ========== 死信队列 ==========
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable("order.dlx.queue").build();
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("order.dlx.exchange");
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("order.dlx.queue");
}
// ========== 消息转换器 ==========
@Bean
public Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(messageConverter());
template.setMandatory(true);
// 确认回调
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息发送成功: " + correlationData);
} else {
System.out.println("消息发送失败: " + cause);
}
});
// 返回回调
template.setReturnsCallback(returned -> {
System.out.println("消息路由失败: " + returned.getMessage());
});
return template;
}
}
4.2 生产者
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrder(Order order) {
CorrelationData correlationData = new CorrelationData(order.getOrderId());
rabbitTemplate.convertAndSend(
"order.exchange",
"order.create",
order,
message -> {
// 设置消息属性
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setPriority(5);
return message;
},
correlationData
);
}
public void sendToRetry(Order order, int retryCount) {
// 设置重试次数
rabbitTemplate.convertAndSend(
"order.retry.exchange",
"order.retry",
order,
message -> {
message.getMessageProperties().setHeader("retry-count", retryCount);
return message;
}
);
}
}
4.3 消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class OrderConsumer {
private static final int MAX_RETRY_COUNT = 3;
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = "order.queue")
public void processOrder(Order order, Message message) {
try {
System.out.println("处理订单: " + order.getOrderId());
// 业务处理逻辑
boolean success = processBusiness(order);
if (!success) {
throw new RuntimeException("处理订单失败");
}
} catch (Exception e) {
handleFailure(order, message, e);
}
}
private boolean processBusiness(Order order) {
// 模拟业务处理
return !order.getOrderId().contains("FAIL");
}
private void handleFailure(Order order, Message message, Exception e) {
Integer retryCount = (Integer) message.getMessageProperties()
.getHeaders()
.getOrDefault("retry-count", 0);
if (retryCount < MAX_RETRY_COUNT) {
System.out.println("第" + (retryCount + 1) + "次重试,订单: " + order.getOrderId());
// 发送到重试队列
rabbitTemplate.convertAndSend(
"order.retry.exchange",
"order.retry",
order,
msg -> {
msg.getMessageProperties().setHeader("retry-count", retryCount + 1);
return msg;
}
);
} else {
System.out.println("达到最大重试次数,发送到死信队列,订单: " + order.getOrderId());
// 发送到死信队列
rabbitTemplate.convertAndSend(
"order.dlx.exchange",
"order.dlx.queue",
order
);
}
}
@RabbitListener(queues = "order.dlx.queue")
public void handleDeadLetter(Order order) {
System.out.println("处理死信订单: " + order.getOrderId());
// 记录日志、告警、人工处理等
}
}
5. 消息丢失、消息重复消费解决方案
1. 幂等性设计(最常用)
1.1 数据库唯一约束
@Component
public class MessageConsumer {
@Autowired
private MessageIdRepository messageIdRepository;
@RabbitListener(queues = "order.queue")
public void processOrder(OrderMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
// 1. 检查消息是否已处理
if (messageIdRepository.existsById(message.getMessageId())) {
// 已处理,直接确认
channel.basicAck(deliveryTag, false);
return;
}
try {
// 2. 业务处理
orderService.createOrder(message);
// 3. 记录已处理的消息ID
messageIdRepository.save(new ProcessedMessage(
message.getMessageId(),
LocalDateTime.now()
));
// 4. 手动确认
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 处理失败,拒绝消息
channel.basicNack(deliveryTag, false, true);
}
}
}
1.2 Redis 实现幂等性
@Component
public class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
private static final String MESSAGE_PREFIX = "msg:processed:";
private static final long EXPIRE_TIME = 24 * 60 * 60; // 24小时
@RabbitListener(queues = "payment.queue")
public void processPayment(PaymentMessage message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
String messageId = message.getMessageId();
String redisKey = MESSAGE_PREFIX + messageId;
// 使用 SETNX 命令保证原子性
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(redisKey, "1", EXPIRE_TIME, TimeUnit.SECONDS);
if (Boolean.FALSE.equals(success)) {
// 消息已处理
channel.basicAck(deliveryTag, false);
return;
}
try {
// 业务处理
paymentService.processPayment(message);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 删除Redis记录,允许重试
redisTemplate.delete(redisKey);
channel.basicNack(deliveryTag, false, true);
}
}
}
2. 消息去重表设计
@Entity
@Table(name = "processed_messages")
public class ProcessedMessage {
@Id
private String messageId;
@Column(nullable = false)
private String businessType; // 业务类型,如: ORDER_CREATE
@Column(nullable = false)
private LocalDateTime processedTime;
@Column(nullable = false)
private String status; // PROCESSED, FAILED
}
3. 基于版本号的乐观锁
@Service
public class OrderService {
@Transactional
public void createOrderWithVersion(OrderMessage message) {
// 先查询订单是否存在
Order order = orderRepository.findByOrderNo(message.getOrderNo());
if (order != null) {
// 使用版本号判断是否已处理
if (order.getVersion() >= message.getVersion()) {
return; // 已处理更新的版本
}
}
// 创建或更新订单
order = createOrUpdateOrder(message);
// 设置版本号
order.setVersion(message.getVersion());
orderRepository.save(order);
}
}
4. 消息属性设置
4.1 生产者设置消息ID
@Component
public class MessageProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendOrderMessage(Order order) {
String messageId = UUID.randomUUID().toString();
MessageProperties properties = new MessageProperties();
properties.setMessageId(messageId);
properties.setTimestamp(new Date());
Message message = new Message(
objectMapper.writeValueAsBytes(order),
properties
);
rabbitTemplate.send("order.exchange", "order.key", message);
}
}
4.2 配置消费者确认模式
# application.yml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次只获取一条消息
retry:
enabled: true
max-attempts: 3
initial-interval: 1000
5. 完整解决方案示例
@Component
@Slf4j
public class ReliableMessageConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private OrderService orderService;
/**
* 可靠的消息消费处理器
*/
@RabbitListener(queues = "reliable.queue")
public void handleMessage(Message message, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
String messageId = message.getMessageProperties().getMessageId();
String body = new String(message.getBody());
// 1. 消息去重检查
if (isMessageProcessed(messageId)) {
log.info("消息已处理,直接确认: {}", messageId);
ackMessage(channel, deliveryTag);
return;
}
try {
// 2. 业务处理
processBusiness(messageId, body);
// 3. 记录处理成功
markMessageAsProcessed(messageId);
// 4. 确认消息
ackMessage(channel, deliveryTag);
} catch (BusinessException e) {
// 业务异常,记录日志,确认消息
log.error("业务处理失败,消息丢弃: {}", messageId, e);
ackMessage(channel, deliveryTag);
} catch (Exception e) {
// 系统异常,重新入队
log.error("系统异常,消息重试: {}", messageId, e);
nackMessage(channel, deliveryTag);
}
}
private boolean isMessageProcessed(String messageId) {
String key = "msg:idempotent:" + messageId;
return Boolean.TRUE.equals(redisTemplate.hasKey(key));
}
private void markMessageAsProcessed(String messageId) {
String key = "msg:idempotent:" + messageId;
redisTemplate.opsForValue().set(key, "1", 2, TimeUnit.HOURS);
}
private void ackMessage(Channel channel, long deliveryTag) {
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
log.error("消息确认失败", e);
}
}
private void nackMessage(Channel channel, long deliveryTag) {
try {
channel.basicNack(deliveryTag, false, true);
} catch (IOException e) {
log.error("消息拒绝失败", e);
}
}
}
6. 使用 Spring Cloud Stream
@Configuration
public class RabbitMQConfig {
@Bean
public Consumer<Message<OrderMessage>> orderConsumer() {
return message -> {
// 获取消息头中的ID
String messageId = (String) message.getHeaders()
.get(AmqpHeaders.MESSAGE_ID);
// 幂等性检查
if (idempotentChecker.isDuplicate(messageId)) {
return;
}
// 处理业务
orderService.process(message.getPayload());
// 记录处理
idempotentChecker.markAsProcessed(messageId);
};
}
}
7. 最佳实践建议
- 优先使用幂等性设计 :这是最根本的解决方案
- 消息设计 :
- 每个消息携带唯一ID
- 包含业务标识(如订单号)
- 添加时间戳和版本号
- 消费端配置 :
@Configuration
public class ConsumerConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory =
new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
// 手动确认
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 并发消费者数量
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
// 预取数量
factory.setPrefetchCount(1);
// 重试策略
factory.setAdviceChain(
RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(1000, 2.0, 10000)
.build()
);
return factory;
}
}
- 监控和告警 :
- 监控重复消息比例
- 设置消息积压告警
- 记录消息处理日志
选择哪种方案取决于你的业务场景:
- 对数据一致性要求高:使用数据库唯一约束
- 高性能场景:使用 Redis
- 简单业务:使用版本号控制
- 金融级业务:结合多种方案
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)