SpringBoot整合实现RabbitMQ

本文大纲
一.RabbitMQ介绍

二.RabbitMQ的工作原理
2.1 RabbitMQ的基本结构
2.2 组成部分说明
2.3 生产者发送消息流程
2.4 消费者接收消息流程

三.SpringBoot 整合实现RabbitMQ
3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
3.1.2 配置application.yml
3.1.3 配置RabbitMQ常量类
3.1.4 创建RabbitMQConfig配置类
3.1.5 创建生产者用于发送消息
3.1.6 创建一个类,用于模拟测试
3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
3.2.2 配置application.yml
3.2.3 配置RabbitMQ常量类
3.2.4 创建RabbitMQConfig配置类
3.2.5 创建消费者消息监听
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息

一.RabbitMQ介绍

RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。支持Windows、Linux/Unix、MAC OS X操作系统和包括JAVA在内的多种编程语言。

二.RabbitMQ的工作原理

2.1 RabbitMQ的基本结构:

在这里插入图片描述

2.2 组成部分说明:
Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。
2.3 生产者发送消息流程:
1.生产者和Broker建立TCP连接。
2.生产者和Broker建立通道。
3.生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4.Exchange将消息转发到指定的Queue(队列)
2.4 消费者接收消息流程:
1.消费者和Broker建立TCP连接 
2.消费者和Broker建立通道
3.消费者监听指定的Queue(队列) 
4.当有消息到达Queue时Broker默认将消息推送给消费者。

三.SpringBoot 整合实现RabbitMQ

创建2个springboot项目,一个 mq-rabbitmq-producer(生产者),一个mq-rabbitmq-consumer(消费者)。

3.1创建mq-rabbitmq-producer(生产者)发送消息
3.1.1pom.xml中添加相关的依赖
<!--添加AMQP的启动器-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--web-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
3.1.2 配置application.yml
server:
  port: 8080
spring:
  application:
    name: mq-rabbitmq-producer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    #注意:guest用户只能链接本地服务器 比如localhost  不可以连接远程服务器
    username: guest
    password: guest
    #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
    virtual-host: /
    #支持发布返回
    publisher-returns: true
    listener:
      # Routing 路由模型(交换机类型:direct)
      direct:
        #消息确认:手动签收
        acknowledge-mode: manual
        #当前监听容器数
        concurrency: 1
        #最大数
        max-concurrency: 10
        #是否支持重试
        retry:
          enabled: true
          #重试次数5,超过5次抛出异常
          max-attempts: 5
          #重试间隔 3s
          max-interval: 3000

采用 Routing 路由模型(交换机类型:direct)方式,实现RabbitMQ消息队列。

3.1.3 配置RabbitMQ常量类

配置直连交换机名称、消息队列名称、routingkey

package com.example.mqrabbitmqproducer.util.rabbitmq;

/**
 * RabbitMQ RoutingKey 常量工具类
 * @author qzz
 */
public class RabbitMQConstantUtil {

    /**
     * 交换机名称
     */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /**
     * 取消订单 队列名称 routingkey
     */
    public static final String CANCEL_ORDER = "cancel-order";

    /**
     * 自动确认订单 队列名称\routingkey
     */
    public static final String CONFIRM_ORDER = "confirm-order";


}

注意:这里把消息队列名称和routingkey设置为同名。

3.1.4 创建RabbitMQConfig配置类

rabbitmq配置类:配置Exchange、Queue、以及绑定交换机

package com.example.mqrabbitmqproducer.util.rabbitmq.config;

import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
         /**
         * 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式,
         * 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类
         */
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启手动ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public AmqpTemplate amqpTemplate(){
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        /**
         * ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
         * config : 需要开启rabbitmq发送失败回退 
         * yml配置publisher-returns: true 
         * 或rabbitTemplate.setMandatory(true);设置为true
         */
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
            byte[] message = returnedMessage.getMessage().getBody();
            Integer replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();

            log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    new String(message),messageId,replyCode,replyText,exchange,routingKey);

        });
        return rabbitTemplate;
    }

    /**
     * 声明直连交换机  支持持久化
     * @return
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange(){
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * 取消订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true);
    }

    /**
     * 把取消订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * 自动确认订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true);
    }

    /**
     * 把自动确认订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}
3.1.5 创建生产者用于发送消息
package com.example.mqrabbitmqproducer.util.rabbitmq;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;

/**
 * Routing 路由模型(交换机类型:direct)
 * 消息生成者
 * @author qzz
 */
@Component
public class DirectSender {

    private static final Logger log = LoggerFactory.getLogger(DirectSender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     * @param routingKey
     * @param msg
     */
    public void send (String routingKey,String msg){
        Message message = MessageBuilder.withBody(msg.getBytes())
                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                .setContentEncoding("utf-8")
                .setMessageId(UUID.randomUUID()+"").build();

        log.info("【发送者】消息内容【{}】 交换机【{}】 路由【{}】 消息ID【{}】",msg,RabbitMQConstantUtil.DIRECT_EXCHANGE
                ,routingKey,message.getMessageProperties().getMessageId());
        rabbitTemplate.convertAndSend(RabbitMQConstantUtil.DIRECT_EXCHANGE,routingKey,message);
    }
}
3.1.6 创建一个类,用于模拟测试
package com.example.mqrabbitmqproducer.controller;

import com.alibaba.fastjson.JSONObject;
import com.example.mqrabbitmqproducer.util.rabbitmq.DirectSender;
import com.example.mqrabbitmqproducer.util.rabbitmq.RabbitMQConstantUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * 模拟测试消息发送
 * @author qzz
 */
@RestController
@RequestMapping("/order")
public class TestRabbitMQSendMsg {

    /**
     * rabbitMQ消息发送
     */
    @Autowired
    private DirectSender directSender;

    /**
     * 测试取消订单,发送消息
     */
    @GetMapping("/cancel")
    public void cancel(){

        //取消订单逻辑省略

        //取消订单,发送消息
        Map<String, Object> map = new HashMap<>();
        map.put("order_number","4364756867987989");
        map.put("product_id","1");
        directSender.send(RabbitMQConstantUtil.CANCEL_ORDER, JSONObject.toJSONString(map));
    }

    /**
     * 测试自动确认订单,发送消息
     */
    @GetMapping("/confirm")
    public void confirm(){

        //自动确认订单,发送消息
        String order_number="4364756867987989";
        directSender.send(RabbitMQConstantUtil.CONFIRM_ORDER, order_number);
    }
}

启动项目,进行测试:
(1)在postman中输入 http://localhost:8080/order/cancel,进行测试:

(2)在postman中输入 http://localhost:8080/order/confirm,进行测试:

3.2创建mq-rabbitmq-consumer(消费者)消费消息
3.2.1pom.xml中添加相关的依赖
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <!--添加AMQP的启动器-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
3.2.2 配置application.yml
server:
  port: 8083
spring:
  application:
    name: mq-rabbitmq-consumer
  #rabbitmq配置
  rabbitmq:
    host: localhost
    port: 5672
    #注意:guest用户只能链接本地服务器 比如localhost  不可以连接远程服务器
    username: guest
    password: guest
    #虚拟主机 一台机器可能有很多虚拟主机 这里选择默认配置 / 即可
    virtual-host: /
3.2.3 配置RabbitMQ常量类

配置直连交换机名称、消息队列名称、routingkey

package com.example.mqrabbitmqconsumer.util.rabbitmq;

/**
 * RabbitMQ RoutingKey 常量工具类
 * @author qzz
 */
public class RabbitMQConstantUtil {

    /**
     * 交换机名称
     */
    public static final String DIRECT_EXCHANGE = "directExchange";

    /**
     * 取消订单 队列名称 \routingkey
     */
    public static final String CANCEL_ORDER = "cancel-order";

    /**
     * 自动确认订单 队列名称\routingkey
     */
    public static final String CONFIRM_ORDER = "confirm-order";


}

注意:这里把消息队列名称和routingkey设置为同名。

3.2.4 创建RabbitMQConfig配置类

rabbitmq配置类:配置Exchange、Queue、以及绑定交换机

package com.example.mqrabbitmqconsumer.util.rabbitmq.config;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq配置类:配置Exchange、Queue、以及绑定交换机
 * @author qzz
 */
@Configuration
@EnableRabbit
public class RabbitMQConfig {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
       //SimpleRabbitListenerContainerFactory发现消息中有content_type有text就会默认将其转换成string类型的
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        /**
         * 比较常用的 Converter 就是 Jackson2JsonMessageConverter,在发送消息时,它会先将自定义的消息类序列化成json格式,
         * 再转成byte构造 Message,在接收消息时,会将接收到的 Message 再反序列化成自定义的类
         */
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        //开启手动ACK
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }

    @Bean
    public AmqpTemplate amqpTemplate(){
        rabbitTemplate.setEncoding("UTF-8");
        rabbitTemplate.setMandatory(true);
        /**
         * ReturnsCallback消息没有正确到达队列时触发回调,如果正确到达队列不执行
         * config : 需要开启rabbitmq发送失败回退     
         * yml配置publisher-returns: true    
         * 或rabbitTemplate.setMandatory(true);设置为true
         */
        rabbitTemplate.setReturnsCallback(returnedMessage -> {
            String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
            byte[] message = returnedMessage.getMessage().getBody();
            Integer replyCode = returnedMessage.getReplyCode();
            String replyText = returnedMessage.getReplyText();
            String exchange = returnedMessage.getExchange();
            String routingKey = returnedMessage.getRoutingKey();

            log.info("消息:{} 发送失败,消息ID:{} 应答码:{} 原因:{} 交换机:{} 路由键:{}",
                    new String(message),messageId,replyCode,replyText,exchange,routingKey);

        });
        return rabbitTemplate;
    }

    /**
     * 声明直连交换机  支持持久化
     * @return
     */
    @Bean(RabbitMQConstantUtil.DIRECT_EXCHANGE)
    public Exchange directExchange(){
        return ExchangeBuilder.directExchange(RabbitMQConstantUtil.DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * 取消订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CANCEL_ORDER)
    public Queue cancelOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CANCEL_ORDER,true,false,true);
    }

    /**
     * 把取消订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding cancelOrderBinding(@Qualifier(RabbitMQConstantUtil.CANCEL_ORDER) Queue queue,
                                      @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CANCEL_ORDER).noargs();
    }

    /**
     * 自动确认订单 消息队列
     * @return
     */
    @Bean(RabbitMQConstantUtil.CONFIRM_ORDER)
    public Queue confirmOrderQueue(){
        return new Queue(RabbitMQConstantUtil.CONFIRM_ORDER,true,false,true);
    }

    /**
     * 把自动确认订单消息队列绑定到交换机上
     * @param queue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding confirmOrderBinding(@Qualifier(RabbitMQConstantUtil.CONFIRM_ORDER) Queue queue,
                                       @Qualifier(RabbitMQConstantUtil.DIRECT_EXCHANGE) Exchange directExchange){
        //RoutingKey :RabbitMQConstantUtil.CANCEL_ORDER,这里设置与消息队列 同名
        return BindingBuilder.bind(queue).to(directExchange).with(RabbitMQConstantUtil.CONFIRM_ORDER).noargs();
    }
}
3.2.5 创建消费者消息监听

(1)监听取消订单

package com.example.mqrabbitmqconsumer.listener;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 监听取消订单
 * @author qzz
 */
@Component
public class RabbitMQCancelOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQCancelOrderListener.class);

    /**
     * 接受消息
     * @param channel
     * @param message
     * @throws Exception
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitMQConstantUtil.CANCEL_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        //body 即消息体
        String msg = new String(message.getBody());
        String messageId = message.getMessageProperties().getMessageId();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);

        try{
            //如果有业务逻辑,则在这里编写


            //告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("消息处理出现异常:{}",e.getMessage());
            //告诉消息服务器 消息处理异常,消息需要重新再次发送!
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
        }
    }
}

(2)监听自动确认订单

package com.example.mqrabbitmqconsumer.listener;

import com.example.mqrabbitmqconsumer.util.rabbitmq.RabbitMQConstantUtil;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


/**
 * 监听自动确认订单
 * @author qzz
 */
@Component
public class RabbitMQConfirmOrderListener {

    private static final Logger log = LoggerFactory.getLogger(RabbitMQConfirmOrderListener.class);

    /**
     * 接受消息
     * @param channel
     * @param message
     * @throws Exception
     */
    @RabbitHandler
    @RabbitListener(queues = RabbitMQConstantUtil.CONFIRM_ORDER)
    public void receiverMsg(Channel channel, Message message) throws Exception {
        //body 即消息体
        String msg = new String(message.getBody());
        String messageId = message.getMessageProperties().getMessageId();
        log.info("【消费者】 消息内容:【{}】。messageId 【{}】",msg, messageId);

        try{
            //如果有业务逻辑,则在这里编写


            //告诉服务器收到这条消息 无需再发了 否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }catch (Exception e){
            log.error("消息处理出现异常:{}",e.getMessage());
            //告诉消息服务器 消息处理异常,消息需要重新再次发送!
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
        }
    }
}
3.2.6 启动项目,监听器监听到生产者发送的消息,自动消费消息
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐