RabbitMQ 核心知识点与 Java 实战笔记
RabbitMQ 是基于 AMQP 协议 的开源消息中间件,用于实现系统间的异步解耦、流量削峰、消息分发,是微服务架构中最常用的消息队列之一。它作为后端开发常用的消息中间件,如何与实际项目整合并良好的运用是一个重要的关注点,下面来介绍一下它在SpringBoot项目中的嵌入。
一、RabbitMQ是什么?
RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发、可扩展。同时它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ的整体模型架构图:

RabbitMQ的核心概念:
- 生产者(Producer):发送消息的程序
- 消费者(Consumer):接收并处理消息的程序
- 队列(Queue):存储消息的容器,消息最终存在队列中
- 交换机(Exchange):消息路由中转站,决定消息发送到哪个队列
- 路由键(RoutingKey):生产者发送消息时指定,交换机根据它匹配队列
- 绑定(Binding):交换机和队列的关联关系
二、前置环境配置
1.maven 依赖引入
<!-- RabbitMQ Java 客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>
2.连接工具类
所有模式都使用这个工具类创建连接和通道,通道(Channel)是 RabbitMQ 通信的最小单元。
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMQUtil {
private static final String HOST = "localhost";
private static final int PORT = 5672;
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static ConnectionFactory factory;
// 静态代码块初始化连接工厂
static {
factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
}
// 获取连接
public static Connection getConnection() throws Exception {
return factory.newConnection();
}
}
三、RabbitMQ在项目中的工作模式
1.简单的通信模式
生产者(消息发送方):
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class SimpleProducer {
// 队列名称
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
// 1. 获取连接
Connection connection = RabbitMQUtil.getConnection();
// 2. 创建通道
Channel channel = connection.createChannel();
// 3. 声明队列:队列名、持久化、排他、自动删除、参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4. 发送消息
String message = "Hello RabbitMQ 简单模式!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("消息发送成功:" + message);
// 5. 关闭资源
channel.close();
connection.close();
}
}
消费者(消息接收方):
import com.rabbitmq.client.*;
public class SimpleConsumer {
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 监听消息并消费
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body);
System.out.println("接收到消息:" + message);
}
};
// 监听队列:队列名、自动确认、消费者
channel.basicConsume(QUEUE_NAME, true, consumer);
System.out.println("消费者已启动,等待消息...");
}
}
2.工作队列模式
一个生产者,多个消费者,消息平均分配(轮询分发),用于任务异步处理。消息会被轮询分配给不同消费者,避免单节点压力过大。
3.发布/订阅模式
一个生产者,多个消费者,一条消息广播给所有消费者(类似群发)。使用 fanout 交换机,不处理路由键,绑定的队列都能收到消息。
生产者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class PubSubProducer {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
// 声明fanout类型交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "广播消息:所有消费者都能收到!";
// 发送到交换机,路由键为空
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("广播消息发送成功");
channel.close();
connection.close();
}
}
消费者:
import com.rabbitmq.client.*;
public class PubSubConsumer1 {
private static final String EXCHANGE_NAME = "fanout_exchange";
private static final String QUEUE_NAME = "fanout_queue1";
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 队列绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 消费逻辑
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
System.out.println("消费者1收到:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
4.路由模式
生产者指定路由键,交换机精准匹配路由键,只有匹配的队列能收到消息。使用 direct 交换机。
示例:错误日志分级推送(error/info/warn)
// 生产者发送error级别消息
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());
// 消费者只绑定error路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
5.主题模式
路由键支持通配符匹配,最灵活的模式,企业开发最常用。
#:匹配 0 个或多个单词*:匹配一个单词
示例:
- 路由键:
user.log.error - 匹配规则:
user.#、#.error、user.*.*
6.RPC模式
客户端发送消息,服务端处理后返回结果,实现远程调用。(实际开发中常用 OpenFeign,RPC 模式了解即可)
四、RabbitMQ消息持久化
防止 RabbitMQ 重启后消息丢失,三步实现持久化:
- 队列持久化:
queueDeclare(QUEUE_NAME, true, ...) - 消息持久化:
MessageProperties.PERSISTENT_TEXT_PLAIN - 交换机持久化:
exchangeDeclare(EXCHANGE_NAME, "fanout", true)
持久化生产者代码:
// 队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消息持久化
channel.basicPublish("", QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
五、消息确认机制
1. 生产者确认(Confirm)
确保消息成功发送到 RabbitMQ 服务器。
2. 消费者手动确认(Ack)
确保消息被正确消费,避免消费失败丢失消息。
消费者手动确认代码:
// 关闭自动确认:autoAck = false
channel.basicConsume(QUEUE_NAME, false, consumer);
@Override
public void handleDelivery(...) {
try {
// 业务处理
// 手动确认消息:消息标签、批量确认
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
}
}
以上便是RabbitMQ在Java项目中的具体应用和一些核心特性。
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐


所有评论(0)