RabbitMQ

课程内容

  • 认识RabbitMQ
  • 安装RabbitMQ
  • SpringBoot使用RabbitMQ
  • 其他特性

一.RabbitMQ入门

1.RabbitMQ认识

1.1.RabbitMQ是什么

MQ全称为Message Queue,即消息队列. 它也是一个队列,遵循FIFO原则 。RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue Protocol高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开 发中应用非常广泛。官方地址:http://www.rabbitmq.com/

1.2.RabbitMQ的使用场景

开发中消息队列通常有如下应用场景: 消峰,解耦,提速,大数据处理

  • 消除峰值:用于高并发场景消除峰值,让并发请求在mq中进行排队

  • 大数据处理:由于数据量太大,程序一时处理不过来,可以通过把数据放入MQ,多开几个消费者去处理消息,比如:日志收集等

  • 服务异步/解耦 :服务之间通过RPC进行通信的方式是同步方式,服务消费方需要等到服务提供方相应结果后才可以继续执行,使用MQ之后的服务通信是异步的,服务之间没有直接的调用关系,而是通过队列进行服务通信, 应用程序解耦合 MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。

  • 排序保证 FIFO :遵循队列先进先出的特点,可以保证数据按顺序消费

除此之外使用MQ还可以达到:提高系统响应速度,提高系统稳定性的目的。 将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。 提高了应用程序的响应时间。另外如果系统挂了也没关系,数据放到消息队列.后续可以继续消费

但是需要注意的是:对数据的一致性要求较高的业务场景不适合使用MQ,因为MQ具有一定的数据延迟

1.3.AMQP协议

AMQP是一套公开的消息队列协议,最早在2003年被提出,它旨在从协议层定义消息通信数据的标准格式, 为的就是解决MQ市场上协议不统一的问题,RabbitMQ就是遵循AMQP标准协议开发的MQ服务。 官方:http://www.amqp.org

1.4.JMS是什么 ?

JMS是Java消息服务,是java提供的一套消息服务API标准,其目的是为所有的java应用程序提供统一的消息通信的标准,类似java的 jdbc,只要遵循jms标准的应用程序之间都可以进行消息通信。它和AMQP有什么 不同,jms是java语言专属的消 息服务标准,它是在api层定义标准,并且只能用于java应用;而AMQP是在协议层定义的标准,是跨语言的 。

2.RabbitMQ的工作流程

2.1.RabbitMQ中核心概念
  • Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。

  • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。exchange有下面四种(先了解:fanout,direct,topics,header)

  • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。

  • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。

  • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

2.2.RabbitMQ的工作流程

在这里插入图片描述

消息发布接收流程:

1.发送消息

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消息接收消息

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

3.RabbitMQ安装

3.1.Docker安装RabbitMQ
1.下载docker镜像
docker pull rabbitmq:3-management
2.启动容器

需要开放端口:5672是程序连接的端口,15672是可视化界面接口

docker run -id --name=rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
3.访问管理界面

安装好之后,访问:15672界面如下,账号和密码都是 guest

在这里插入图片描述

二.SpringBoot中使用RabbitMQ

1.SpringBoot整合RabbitMQ

1.1.导入依赖

第一步需要导入mq的基础依赖,SpringBoot使用的是2.6.13

	<!--SpringBoot依赖-->
    <parent>
        <groupId> org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.13</version>
    </parent>

	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
2.2.配置RabbitMQ

第二步:配置mq,主要是配置连接信息

server:
  port: 10200
spring:
  application:
    name: rabbitmq‐application
  rabbitmq:
    host: 60.204.187.34
    port: 5672
    username: guest
    password: guest
    virtualHost: /

编写启动类,省略…

2.HelloWorld(直连模型)

2.1.模型认识

rabbitMQ提供了7种消息模型。https://www.rabbitmq.com/tutorials

在这里插入图片描述

我们使用Hello World 案例来入门,这种模式比较简单,只需要一个生产者,一个队列,一个消费者即可

在这里插入图片描述

  • P:生产者,也就是要发送消息的程序
  • C:消费者:消息的接受者,会一直等待消息到来。
  • queue:消息队列,用来存储消息的,生产者把消息发送到队列中,消费者从队列中消费消息。类似一个邮箱,可以缓存消息;

我们需要做如下事情

  1. 创建一个队列
  2. 编写消费者,监听该队列
  3. 编写生产者发送消息,指定该队列名
2.2.配置队列

在SpringBoot中交换机和队列的创建都通过Bean的方式来进行,下面定义了一个队列,名字为hello:

@Configuration
public class RabbitmqConfig {
    //定义消息队列的名字
    public static final String NAME_HELLO = "queue_hello";

    @Bean
    public Queue queue() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(NAME_HELLO,true);
    }
}
2.3.编写消费者

rabbitmq通过@RabbitListener(queues = {队列名}) 来监听队列,从而消费消息

@Component
public class ReceiveHandler {

    //监听NAME_HELLO队列
    @RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
    public void receiveHelloQueueMessage(String msg, Message message, Channel channel) {
        System.out.println("消费者收到消息:"+msg);
    }
}
2.4.编写MQ生产者

通过注入:RabbitTemplate发送消息,以及消息内容。

@RestController
public class SenderController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/sender/hello/{message}")
    public String senderHello(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,默认交换机指定为“”即可
         * routingKey :发送消息的路由键,该模式下使用队列名即可
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend("", RabbitmqConfig.NAME_HELLO,message);
        return "success";
    }
}

注意:这个的交换机使用的是默认的交换机"" ,路由键直接指定为队列的名字。其实在MQ中是提供了几个默认的交换机,当我们把交换机指定为 “” , 就会使用默认的交换机来转发消息,而我们创建的队列会和默认的交换机进行绑定,如下:

在这里插入图片描述

下面是绑定关系图

在这里插入图片描述

2.5.测试

启动程序访问controller进行测试,控制台可以看到消费者打印的日志,打开MQ的可视化界面可以看到创建的队列,之所以里面没有消息是因为消息被消费了。

在这里插入图片描述

3.WorkQueue(工作队列)

3.1.WorkQueue模型认识

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的,即:同一个消息只会被一个消费者消费

在这里插入图片描述

3.2.代码演示

WorkQueue和HelloWorld本身无区别,只是在HelloWorld的基础上多增加消费者而已,如下:

@Component
public class ReceiveHandler {

    //监听NAME_HELLO队列
    @RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
    public void receive1(String msg, Message message, Channel channel) {
        System.out.println("消费者1收到消息:"+msg);
    }

    //监听NAME_HELLO队列
    @RabbitListener(queues = {RabbitmqConfig.NAME_HELLO})
    public void receive2(String msg, Message message, Channel channel) {
        System.out.println("消费者2收到消息:"+msg);
    }
}

连续多次发送消息MQ会使用轮询方式把消息评价分配给多个消费者

在这里插入图片描述

3.3.指定拉取数量

这种消费模式有一个问题,当某个消费者消费能力偏弱会导致后续的消息阻塞,我们可以通过 prefetch 来指定消费者每次只能拉取一个消息,这样的话当某个消费者正在忙碌,那么MQ会把消息推送给别的消费者,防止消息在某个消费者身上发生阻塞。

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1

4.fanout-广播模型

4.1.模型认识

在上面的案例中,我们采用一个队列来发送消息,及时同一个队列监听了多个消费者,同一个消息也只会给到其中一个消费者,而发布订阅模型允许一个消息向多个消费者投递。而对于:fanout , direct , topics都属于发布订阅模型。

在这里插入图片描述

RabbitMQ的exchnage正好有4中类型,就对应了上述的几种订阅模型,源码如下:

public abstract class ExchangeTypes {
    public static final String DIRECT = "direct";
    public static final String TOPIC = "topic";
    public static final String FANOUT = "fanout";
    public static final String HEADERS = "headers";
    public static final String SYSTEM = "system";
}

Fanout被叫做广播模型,它的特点是当生产者把消息投递给交换机,交换机会把消息投递给和它绑定的所有队列,而相应的所有的消费者都能收到消息,如上图。要实现Fanout模型我们要做如下几个事情

  1. 定义自己的fanout类型的交换机
  2. 定义多个队列
  3. 把队列和交换机进行绑定
  4. 消费者监听不同队列
4.2.配置交换机和队列

下面配置了一个fanout类型的交换机和2个队列,并把队列绑定到了交换机

@Configuration
public class RabbitmqConfigFanout {
    //定义消息队列的名字
    public static final String QUEUE_1 = "queue1";
    public static final String QUEUE_2 = "queue2";
    public static final String EXCHANGE_FANOUT = "exchnage-fanout";


    @Bean
    public Exchange exchange(){
        //定义一个fanout类型的交换机,并指定持久化
        return ExchangeBuilder.fanoutExchange(EXCHANGE_FANOUT).durable(true).build();
    }

    @Bean
    public Queue queue1() {
        //创建一个队列队列,并指定队列的名字和持久化
        return new Queue(QUEUE_1,true);
    }

    @Bean
    public Queue queue2() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_2,true);
    }

    @Bean
    public Binding bindingQueue1() {
        //fanout模式不指定routingkey
        return BindingBuilder
                .bind(queue1()).to(exchange()).with("").noargs();
    }
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder
                .bind(queue2()).to(exchange()).with("").noargs();
    }
}
4.3.编写消费者

消费者只需要监听不同的队列即可

    @RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_1})
    public void receiveFanout1(String msg, Message message, Channel channel) {
        System.out.println("fanout消费者1收到消息:"+msg);
    }

    @RabbitListener(queues = {RabbitmqConfigFanout.QUEUE_2})
    public void receiveFanout2(String msg, Message message, Channel channel) {
        System.out.println("fanout消费者2收到消息:"+msg);
    }

4.4.编写生产者

生产者发送消息的时候需要指定exchange的名字,注意:routingkey不需要指定

    @PostMapping("/sender/fanout/{message}")
    public String senderFanout(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,使用自定义的交换机
         * routingKey :发送消息的路由键,fanout模式指定为“”
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend(RabbitmqConfigFanout.EXCHANGE_FANOUT, "",message);
        return "success";
    }
4.5.测试

启动测试,发送一个消息2个消费者都能收到

5.Routing(路由模型)

5.1.模型认识

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费,我们就要用的routing路由模式,这种模式是通过一个routingkey来收发消息。交换机的类型使用direct

在这里插入图片描述

如上图:不同的队列在绑定到交换机时指定的routingkey是不一样的,这样一来我们发送消息的时候,就可以通过不同的routingkey来把消息发送到不同的队列中,从而使不同的消费者去消费,该模型我们需要做如下几个步骤

  1. 创建direct类型的交换机
  2. 创建多个队列
  3. 把队列绑定到交换机,但是绑定时需要指定不同的routingkey
  4. 消费者消费不同队列的消息
5.2.配置交换机和队列

这里定义了一个direct类型的交换机,以及2个队列,队列在绑定到交换机时采用了不同的routingkey.

@Configuration
public class RabbitmqConfigDirect {
    //定义消息队列的名字
    public static final String QUEUE_DIRECT_1 = "direct_queue1";
    public static final String QUEUE_DIRECT_2 = "direct_queue2";
    public static final String EXCHANGE_DIRECT = "exchnage-direct";


    @Bean
    public Exchange exchange(){
        //定义一个direct类型的交换机,并指定持久化
        return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
    }

    @Bean
    public Queue queue1() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_DIRECT_1,true);
    }

    @Bean
    public Queue queue2() {
        //创建一个队列队列,并指定队列的名字
        return new Queue(QUEUE_DIRECT_2,true);
    }

    @Bean
    public Binding bindingQueue1() {
        return BindingBuilder
                .bind(queue1()).to(exchange()).with("pay").noargs();
    }
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder
                .bind(queue2()).to(exchange()).with("order").noargs();
    }
}
5.3.配置消费者

消费者消费不同队列中的消息即可

    @RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_1})
    public void receiveDirect1(String msg, Message message, Channel channel) {
        System.out.println("receiveDirect1消费者1收到消息:"+msg);
    }

    @RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
    public void receiveDirect2(String msg, Message message, Channel channel) {
        System.out.println("receiveDirect2消费者2收到消息:"+msg);
    }

5.4.定义生产者

生产者需要指定自己的交换机,以及routingkey,指定不同的routingkey决定了消息会发送到不同的队列中

    @PostMapping("/sender/direct/{message}")
    public String senderDirect(@PathVariable String message) {
        /**
         * 参数说明
         * exchnage: 交换机,使用自定义的交换机
         * routingKey :发送消息的路由键,fanout模式指定为“”
         * message:消息的内容
         */
        rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "pay",message);
        rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);
        return "success";
    }

6.Topics(通配符)

6.1.模型认识

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

在这里插入图片描述

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: goods.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

audit.#:能够匹配audit.irs.corporate 或者 audit.irs

audit.*:只能匹配audit.irs

topic 和 direct 没有本质的区别,只是在绑定队列时可以使用通配符

6.2.定义交换机和队列

需要定义topics类型的交换机和2个队列,绑定队列的时候指定routingkey,可以使用通配符来指定

@Bean
public Binding bindingQueue1() {
    return BindingBuilder
        .bind(queue1()).to(exchange()).with("#.pay").noargs();
}
@Bean
public Binding bindingQueue2() {
    return BindingBuilder
        .bind(queue2()).to(exchange()).with("#.order").noargs();
}
6.3.编写生产者

发送消息的时候指定的routingkey如果能命中绑定时的routingkey消息就可以发送到相应的队列中,比如:

rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "account.pay",message);
rabbitTemplate.convertAndSend(RabbitmqConfigTopics.EXCHANGE_TOPIC, "good.order",message);

前2个消息的routingkey可以命中 #.pay, 第3条消息可以命中 #.order.

三.RabbitMQ其他特性

1.签收机制

1.1.自动签收的问题

在RabbitMQ中包含手动签收和自动签收2钟模式,上述案例都采用的是自动签收,也就是当MQ吧消息投递给消费者后,消息默认被签收,MQ就会直接把消息删除掉。这种模式可能会导致消息丢失分享:比如消费者拿到消息并未成功消费,但是MQ已经把消息删除,从而造成了消息的丢失,所以在司机开发中尽量使用手动签收

1.2.手动牵手

手动签收模式意味着MQ不会自动签收消息,而是把消息推送给消费者后,等到消费者自己去签收消息后,再删除队列中的消息,这种模式可以防止消息丢失。我们可以通过下面配置来指定签收模式

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #默认是 auto 自动签署

然后在消费者成功消费完消息后,触发手动签收,代码如下

@RabbitListener(queues = {RabbitmqConfigDirect.QUEUE_DIRECT_2})
public void receiveDirect2(String msg, Message message, Channel channel) throws IOException {
    System.out.println("receiveDirect2消费者2收到消息:"+msg);
    //拿到消息的tag
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    //签收消息:指定消息的tags ,以及不做批量签收
    channel.basicAck(deliveryTag,false);
}

channel.basicAck : 签收消息

  • deliveryTag :消息的标签,代表了一条消息

除此之外我们还可以不签收消息,或者拒绝消息.不签收的消息会一直重复消费,而被拒绝的消息会丢弃掉

//不签收
channel.basicNack(deliveryTag,false,false);
//拒绝消息
channel.basicReject(deliveryTag,false);

2.持久化

mq消息在内存中进行读写,如果MQ宕机那么消息有丢失的风险,我们需要通过持久化来防止消息丢失

2.1.交换机持久化

创建交换机的时候,指定durable属性为true

@Bean
public Exchange exchange(){
    //定义一个direct类型的交换机,并指定持久化
    return ExchangeBuilder.directExchange(EXCHANGE_DIRECT).durable(true).build();
}
2.2.队列持久化

创建队列时指定durable属性为true

@Bean
public Queue queue1() {
    //创建一个队列队列,并指定队列的名字
    return new Queue(QUEUE_DIRECT_1,true);
}
2.3.消息持久化

当我们发送一个消息内容的时候,SpringBoot会自动帮我们持久化

rabbitTemplate.convertAndSend(RabbitmqConfigDirect.EXCHANGE_DIRECT, "order",message);

底层会自动构建Message对象,Messge对象中有一个MessageProperties属性,它包含了MessageDeliveryMode.PERSISTENT持久化和NON_PERSISTENT不持久化2中方式。

3.可靠消息投递

3.1.回调接口介绍

在RabbitTemplate中提供了2个接口

  • ConfirmCallback : 消息投递到Brocker后触发回调,可以用来检测消息是否到达RabbitMQ
  • ReturnsCallback : 消息发送失败回调,比如队列路由失败

要开启上面2中回调需要在yaml中做如下配置

spring:
  rabbitmq:
    publisher-returns: true #开启returnCallback回调
    template:
      mandatory: true #消息会返回给发送者的回调,而不是丢弃
    publisher-confirm-type: correlated #开启ConfirmCallback 回调

然后需要编写回调接口,通过实现 RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback

@Component
@Slf4j
public class RabbitMQCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        //ReturnedMessage 消息对象中包括:交换机,路由key,消息内容等
        log.info(returnedMessage.getExchange()
                +","+returnedMessage.getRoutingKey()
                +","+new String(returnedMessage.getMessage().getBody()));
        //把失败的消息再次发送
        rabbitTemplate.convertAndSend(returnedMessage.getExchange(),
                                      returnedMessage.getRoutingKey(),returnedMessage.getMessage());
    }

    /**
     * @param correlationData :消息的唯一标识
     * @param ack :消息确认结果
     * @param cause :错误原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(correlationData.getId() +","+ack +","+cause);
    }
}
  • ConfirmCallback : 消息投递到Brocker的exchange后触发回调,可以用来检测消息是否到达RabbitMQ,通过confirm来把投递结果返回,通过ack我们可以判断消息是否投递到MQ中。
  • ReturnsCallback : 消息发送失败回调,比如队列路由失败。通过 returnedMessage 来把失败的消息返回,我们可以通过该方法对失败的消息进行重试发送

在这里插入图片描述

然后自定义template,把2个回调设置给template


//以下配置RabbitMQ消息服务
@Autowired
public ConnectionFactory connectionFactory;

@Autowired
private RabbitMQCallback rabbitMQCallback;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    // 这里的转换器设置实现了发送消息时自动序列化消息对象为message body
    template.setMandatory(true);
    template.setReturnsCallback(rabbitMQCallback);
    template.setConfirmCallback(rabbitMQCallback);

    return template;
}
3.2.可靠消息投递方案

根据上面2个回调接口的特性,我们可以做一个可靠消息投递方案,方案如下:

  1. 设计一个消息日志表,可以基于数据库,也可以基于Redis,字段有:交换机,路由key, 消息内容,发送状态(发送中,发送成功,发送失败),重试次数等。
  2. 在使用rabbitTemplate发布消息之前,把消息的内容持久化到 :消息日志表中,状态为:发送中
  3. 通过回调来监听消息发送结果,如果成功,把消息日志状态修改为:成功,如果发送失败,把消息日志修改为:失败
  4. 额外创建定时任务,定时读取失败的日志进行重试发送,并增加重试次数,直到发送成功,如果发送到一定次数依然不成功就不再重试,而是通知管理员抢修。

在这里插入图片描述

4.延迟队列

4.1.为什么要用

在开发项目的时候我们通常会遇到这么一个问题,比如商城项目有一下单逻辑,下单成功数据保存在数据库中,下单成功后需要用户进行支付,如果在30分钟内支付失败,需要修改订单的支付状态为“支付超时”并关闭订单以及回退库存操作,那如何在下单30后准时检查支付结果处理订单状态呢?

你可能想到了一个最简单的方法,就是使用定时任务扫描订单表,判断时间是否支付超时,这样的方式无疑是一种很消耗性能的做法,你试想一下,定时扫描一张数据量很大的表去判断时间和状态,而且99%的扫描都是无效的操作。

那么该如何优雅的解决上述问题呢?我们可以采用延迟队列来实现,Redis和MQ都可以做到,本文章采用RabbitMQ的延迟队列来实现。

4.2.延迟队列实现原理

说到延迟队列就要说一说消息的过期时间(存活时间)TTL,RabbitMQ可以给队列设置过期时间,也可以单独给每个消息设置过期时间,如果到了过期时间消息没被消费该消息就会标记为死信消息。

除此之外还有那些消息会成为死信消息?

  • 一是设置了TTL的消息到了TTL过期时间还没被消费,会成为死信
  • 二是消息被消费者拒收,并且reject方法的参数里requeue是false,意味这这个消息不会重回队列,该消息会成为死信,
  • 三是由于队列大小限制,新的消息进来队列可能满了,MQ会淘汰掉最老的消息,这些消息可能会成为死信消息

成为死信的消息会进入一个死信交换机(Dead Letter Exchange)中,死信交换机也是一个普通的交换机而已,根据这一特点,我们可以准备一个队列来接收死信交换机中的死信消息,然后准备一个消费者来消费该队列中的消息,这样一来我们的延迟队列就有思路了,还是按照订单为例流程如下:
在这里插入图片描述

  1. 下单成功(生产者),加入下单消息到队列(order.message)
  2. 队列设置TTL过期时间(10000毫秒),同时指定了死信交换机“delay-exchange”和死信交换机转发消息的队列“delay-message”
  3. 消息进入队列,等待一段时间,如果TTL时间到,订单消息会被MQ扔给死信交换机,死信交换机会把消息扔给指定的死信队列delay-message
  4. 消费者正好监听了死信队列delay-message,就可以获取到消息进行消费,比如检查该消息对应的订单是否支付,做出退库存处理等。

整体效果就是,消息进入order.message队列 延迟 10秒后就 会进入delay-message队列然后被消费者消费处理,这就是一个延迟队列的效果。

注意,这里的delay-exchange死信交换机其实就是一个普通的交换机而已,所以我们可以把上面的两个交换机合并成一个,如下:
在这里插入图片描述

4.3.延迟队列实战

第一步,定义交换机和队列

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

//rabbitMQ的配置
@Configuration
public class MQConfig {
    //交换机
    public static final String EXCHNAGE_DELAY = "EXCHNAGE_DELAY";
    //订单队列,该队列中的消息设置过期时间
    public static final String QUEUE_ORDER = "QUEUE_ORDER";
    //该队列用来接收死信交换机转发过来的消息
    public static final String QUEUE_DELAY = "QUEUE_DELAY";
    //队列的路由键,该路由键用来接收订单消息传出到订单队列
    public static final String ROUTINGKEY_QUEUE_ORDER = "ROUTINGKEY_QUEUE_ORDER";
    //该路由键用来接收死信交换机转发过来的消息
    public static final String ROUTINGKEY_QUEUE_DELAY = "ROUTINGKEY_QUEUE_DELAY";

    //定义交换机
    @Bean
    public Exchange exchangeDelay(){
        return ExchangeBuilder.topicExchange(EXCHNAGE_DELAY).durable(true).build();
    }
    //该队列中的消息需要设置ttl
    @Bean
    public Queue queueOrder(){
        Map<String,Object> map = new HashMap<>();
        map.put("x-dead-letter-exchange", EXCHNAGE_DELAY);    //过期的消息给哪个交换机的名字
        map.put("x-dead-letter-routing-key", ROUTINGKEY_QUEUE_DELAY);   //死信交换机把消息个哪个个routingkey
        map.put("x-message-ttl", 10000);    //队列过期时间10s
        return new Queue(QUEUE_ORDER,true,false,false,map);
    }
    //该队列接收死信交换机转发过来的消息
    @Bean
    public Queue queueDelay(){
        return new Queue(QUEUE_DELAY,true);
    }
    @Bean
    public Binding queueOrderBinding(){
        return BindingBuilder.bind(queueOrder()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_ORDER).noargs();
    }
    @Bean
    public Binding queueDelayBinding(){
        return BindingBuilder.bind(queueDelay()).to(exchangeDelay()).with(ROUTINGKEY_QUEUE_DELAY).noargs();
    }
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}

第二步,写一个消息发送者

System.out.println("发送消息:我是一个延迟消息,开始时间:"+System.currentTimeMillis());
rabbitTemplate.convertAndSend(
    MQConfig.EXCHNAGE_DELAY,
    MQConfig.ROUTINGKEY_QUEUE_ORDER,
    "我是一个延迟消息"
);

第三步,写一个消费者

@Component
public class Consumer {

    @RabbitListener(queues = MQConfig.QUEUE_DELAY)
    public void handler(String message){
        System.out.println("收到消息:"+message+",结束时间:"+System.currentTimeMillis());
    }
}

第六步,测试效果

  • 生产者执行后,观察MQ,QUEUE_ORDER中有消息

在这里插入图片描述

  • 等待10s之后,消息进入QUEUE_DELAY队列

在这里插入图片描述

  • 控制台打印效果
Producer:   发送消息:我是一个延迟消息,开始时间:1606295976347
Consumer: 收到消息:我是一个延迟消息,结束时间:1606295986418

发送消息到收到消息的时间差为 10071 , 忽略网络开销,延迟时间差不多就是我们设置的TTL时间

5.消息重复消费

因为消息本身是有重试机制或者我们为了保证消息一定能投递成功可能会导致消息多次投递,那么对于消费者而言消息的重复消费处理就变得非常重要。通常我们可以使用消息的唯一标识来避免重复消费,大概思路如下

  1. 找到消息本省的唯一标识,或者在数据中设置一个唯一标识,比如:订单就可以把订单号作为唯一标识
  2. 消费者每次做了消息消费后,会把这个唯一标识记录下来,比如记录到数据库,或者Redis都可以。
  3. 消费者每次消费前都拿到消息的这个唯一标识去判断一下消息是否被消费,如果已经被消费国了就不要再消费了

四.面试必备

  1. MQ的使用场景
  2. RabbitMQ的工作流程
  3. RabbitMQ如何防止消息丢失
  4. RabbitMQ的消息模型有哪几种(交换机有哪几种)
  5. 如何处理消息重复消费
  6. 如何实现延迟队列(如果通过MQ实现订单超时取消)
  7. 什么情况下消息会变成死信消息
  8. 消息的签收模式有哪几种,有什么区别
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐