RabbitMQ 是基于 AMQP 协议 的开源消息中间件,用于实现系统间的异步解耦、流量削峰、消息分发,是微服务架构中最常用的消息队列之一。它作为后端开发常用的消息中间件,如何与实际项目整合并良好的运用是一个重要的关注点,下面来介绍一下它在SpringBoot项目中的嵌入。

一、RabbitMQ是什么?

        RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol )基础上实现的可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发、可扩展。同时它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ的整体模型架构图:

RabbitMQ的核心概念:

  • 生产者(Producer):发送消息的程序
  • 消费者(Consumer):接收并处理消息的程序
  • 队列(Queue):存储消息的容器,消息最终存在队列中
  • 交换机(Exchange):消息路由中转站,决定消息发送到哪个队列
  • 路由键(RoutingKey):生产者发送消息时指定,交换机根据它匹配队列
  • 绑定(Binding):交换机和队列的关联关系

二、前置环境配置

1.maven 依赖引入

<!-- RabbitMQ Java 客户端 -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.18.0</version>
</dependency>

2.连接工具类

所有模式都使用这个工具类创建连接和通道,通道(Channel)是 RabbitMQ 通信的最小单元

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtil {
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";
    private static ConnectionFactory factory;

    // 静态代码块初始化连接工厂
    static {
        factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
    }

    // 获取连接
    public static Connection getConnection() throws Exception {
        return factory.newConnection();
    }
}

三、RabbitMQ在项目中的工作模式

1.简单的通信模式

生产者(消息发送方):

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class SimpleProducer {
    // 队列名称
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        // 1. 获取连接
        Connection connection = RabbitMQUtil.getConnection();
        // 2. 创建通道
        Channel channel = connection.createChannel();

        // 3. 声明队列:队列名、持久化、排他、自动删除、参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 4. 发送消息
        String message = "Hello RabbitMQ 简单模式!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("消息发送成功:" + message);

        // 5. 关闭资源
        channel.close();
        connection.close();
    }
}

消费者(消息接收方):

import com.rabbitmq.client.*;

public class SimpleConsumer {
    private static final String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 监听消息并消费
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body);
                System.out.println("接收到消息:" + message);
            }
        };

        // 监听队列:队列名、自动确认、消费者
        channel.basicConsume(QUEUE_NAME, true, consumer);
        System.out.println("消费者已启动,等待消息...");
    }
}

2.工作队列模式

一个生产者,多个消费者,消息平均分配(轮询分发),用于任务异步处理。消息会被轮询分配给不同消费者,避免单节点压力过大。

3.发布/订阅模式

一个生产者,多个消费者,一条消息广播给所有消费者(类似群发)。使用 fanout 交换机,不处理路由键,绑定的队列都能收到消息。

生产者:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class PubSubProducer {
    private static final String EXCHANGE_NAME = "fanout_exchange";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明fanout类型交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String message = "广播消息:所有消费者都能收到!";
        // 发送到交换机,路由键为空
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println("广播消息发送成功");

        channel.close();
        connection.close();
    }
}

消费者:

import com.rabbitmq.client.*;

public class PubSubConsumer1 {
    private static final String EXCHANGE_NAME = "fanout_exchange";
    private static final String QUEUE_NAME = "fanout_queue1";

    public static void main(String[] args) throws Exception {
        Connection connection = RabbitMQUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 队列绑定交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 消费逻辑
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1收到:" + new String(body));
            }
        };
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

4.路由模式

生产者指定路由键,交换机精准匹配路由键,只有匹配的队列能收到消息。使用 direct 交换机。

示例:错误日志分级推送(error/info/warn)

// 生产者发送error级别消息
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes());

// 消费者只绑定error路由键
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

5.主题模式

路由键支持通配符匹配,最灵活的模式,企业开发最常用。

  • #:匹配 0 个或多个单词
  • *:匹配一个单词
示例:
  • 路由键:user.log.error
  • 匹配规则:user.##.erroruser.*.*

6.RPC模式

客户端发送消息,服务端处理后返回结果,实现远程调用。(实际开发中常用 OpenFeign,RPC 模式了解即可)

四、RabbitMQ消息持久化

防止 RabbitMQ 重启后消息丢失,三步实现持久化

  • 队列持久化:queueDeclare(QUEUE_NAME, true, ...)
  • 消息持久化:MessageProperties.PERSISTENT_TEXT_PLAIN
  • 交换机持久化:exchangeDeclare(EXCHANGE_NAME, "fanout", true)

持久化生产者代码:

// 队列持久化
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 消息持久化
channel.basicPublish("", QUEUE_NAME,
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes());

五、消息确认机制

1. 生产者确认(Confirm)

确保消息成功发送到 RabbitMQ 服务器。

2. 消费者手动确认(Ack)

确保消息被正确消费,避免消费失败丢失消息。

消费者手动确认代码:
// 关闭自动确认:autoAck = false
channel.basicConsume(QUEUE_NAME, false, consumer);

@Override
public void handleDelivery(...) {
    try {
        // 业务处理
        // 手动确认消息:消息标签、批量确认
        channel.basicAck(envelope.getDeliveryTag(), false);
    } catch (Exception e) {
        // 拒绝消息,重新入队
        channel.basicNack(envelope.getDeliveryTag(), false, true);
    }
}

以上便是RabbitMQ在Java项目中的具体应用和一些核心特性。

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐