前言: 在使用Rabbitmq 过程中,每次配置参数都需要进行搜索和回忆,本文对rabbitmq 中常用的配置成参数进行列举并解释;

这里先粘下比较常用的参数及其简单注注释,更为详细的注释可以在文章中后面的部分进行解读:

############# 基础配置
# mq 服务器的地址
spring.rabbitmq.host=localhost
# mq 服务器的端口
spring.rabbitmq.port=5672
# mq 服务器的连接使用的用户名
spring.rabbitmq.username=admin
# mq 服务器的连接使用的密码
spring.rabbitmq.password=rabbitmq
# mq 服务器的连接使用的虚拟机
spring.rabbitmq.virtual-host=my_vhost


############# 连接和管道配置
# spring.rabbitmq.cache.connection.mode 为connection 生效 ,connection 连接池的大小
#spring.rabbitmq.cache.connection.size=3
# 与broker 连接的 模式 channel 或者 connection
spring.rabbitmq.cache.connection.mode=channel
# 与broker 连接的默认时间,默认为 60 秒,超时会会中断并抛出异常
spring.rabbitmq.connection-timeout=1000
# 每个连接中可以建立的channel 数量
spring.rabbitmq.cache.channel.size=50
# 如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel
spring.rabbitmq.cache.channel.checkout-timeout=2000
# 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.requested-heartbeat=60
# 客户端总共可以创建总的channel 数量
spring.rabbitmq.requested-channel-max=1024


########## 生产者配置
spring.rabbitmq.template.exchange=my_exchange
# 启用消息投递结果确认
spring.rabbitmq.publisher-returns=true
# 启用强制消息投递,即生产者发送消息成功或者失败,需要返回确认消息
spring.rabbitmq.template.mandatory=true
# 消息发布者确认模式
spring.rabbitmq.publisher-confirm-type=correlated
# 发送重试是否可用
spring.rabbitmq.template.retry.enabled= true
# 最大重试次数,默认值为 3
spring.rabbitmq.template.retry.max-attempts=3
# 第一次和第二次尝试发布或传递消息之间的间隔,默认值为 1000 毫秒
spring.rabbitmq.template.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.template.retry.multiplier=1 
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.template.retry.max-interval= 1000


########## 消费者配置
# 是否自动启动消息的监听 默认为true
spring.rabbitmq.listener.simple.auto-startup=false
# 消费消息确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 批量预取条数 默认值250
spring.rabbitmq.listener.simple.prefetch=50
# 开启批量消费 需要搭配将SimpleRabbitListenerContainerFactory 
# 对象 batchListener 属性设置为true 使用 否则报错
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消费的条数
spring.rabbitmq.listener.simple.batch-size=20
# 并发消费最小线程数
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费最大线程数
spring.rabbitmq.listener.simple.max-concurrency=1


### 消费失败 重试参数
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重试次数,默认值为 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重试的时间间隔,默认值为 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息监听器是否启用无状态(stateless)重试 默认true
spring.rabbitmq.listener.simple.retry.stateless=false
# 控制当消息消费失败后,RabbitMQ 是否需要将消息重新入队。该参数的默认值为 true,即消息将被重新入队
spring.rabbitmq.listener.simple.default-requeue-rejected=true

参数分类介绍:

1 基础参数:

客户端连接mq 服务端必要的参数配置项:

# mq 服务器的地址
spring.rabbitmq.host=localhost
# mq 服务器的端口
spring.rabbitmq.port=5672
# mq 服务器的连接使用的用户名
spring.rabbitmq.username=admin
# mq 服务器的连接使用的密码
spring.rabbitmq.password=rabbitmq
# mq 服务器的连接使用的虚拟机 
spring.rabbitmq.virtual-host=my_vhost

在引入了Rabbitmq 包后 spirng 在启动时会自动装配然后连接 mq 服务器,组成的连接地址如下:

amqp://user:pass@host:port/vhost

2 客户端与服务端建立连接和管道的参数:

2.1 建立连接参数:

# 与broker 连接的 模式 channel 或者 connection
spring.rabbitmq.cache.connection.mode=channel
  • 默认使用channel 模式建立连接;
  • connection 模式会创建多个连接对象,channel 只会创建一个connection 对象;
# spring.rabbitmq.cache.connection.mode 为connection 生效 ,connection 连接池的大小
spring.rabbitmq.cache.connection.size=25
  • 当使用spring.rabbitmq.cache.connection.mode 使用 connection 模式时,用于控制其connection 连接池中connection 连接对象的数量;

# 与broker 连接的默认时间,默认为 60 秒,超时会会中断并抛出异常
spring.rabbitmq.connection-timeout=1000

  • spring.rabbitmq.connection-timeout 是 Spring Boot 集成 RabbitMQ 客户端连接工厂(ConnectionFactory)的一个属性,用于设置连接(Connection)超时时间。这个属性影响了应用程序与 RabbitMQ Broker 建立连接的时间。

  • 当应用程序启动时,它必须连接到 RabbitMQ Broker 以进行通信。连接建立的时间包括多个步骤,例如 DNS 解析、TCP 握手和客户端 / 服务端协议交换等。在这些步骤中,可能会因网络问题、防火墙或主机配置等原因导致连接建立失败,因此需要一定的超时机制来避免连接超时等问题。

  • spring.rabbitmq.connection-timeout 属性定义了在连接到 RabbitMQ Broker 时的超时时间。当连接建立需要的时间超过此超时时间时,连接尝试将会中断并抛出异常。这可以确保应用程序连接到 Broker 的过程不会无限地等待,并能够在失败的情况下尽快进行故障排除和处理。该属性应设置为一个正整数,表示等待连接的时间(以毫秒为单位)。

  • 尽管连接超时默认为 60 秒,但可能需要根据具体情况进行微调。长的超时时间会导致应用程序在连接的过程中浪费大量时间,而过短的超时时间则可能会让连接建立失败,从而导致应用程序中断或出现其他问题。在设置 spring.rabbitmq.connection-timeout 属性时,需要确保其值足够长,以允许连接建立的所有步骤正常完成,并确保应用程序不会因连接问题而出现长时间阻塞。

  • 总之,spring.rabbitmq.connection-timeout 属性定义了应用程序连接到 RabbitMQ Broker 的超时时间。通过设置适当的超时时间,可以确保应用程序能够在合理的时间内连接到 Broker,并正常处理事件。

# 指定心跳超时,单位秒,0为不指定;默认60s
spring.rabbitmq.requested-heartbeat: 60
  • sring.rabbitmq.requested-heartbeat 是 Spring Boot 集成 RabbitMQ 客户端连接工厂(ConnectionFactory)的一个属性,用于设置客户端和 RabbitMQ Broker 之间的心跳检测时间(即连接保持活跃的时间)。

  • 在 RabbitMQ 中,心跳检测是一种用于确保客户端与 Broker保持活跃连接的机制。心跳检测的基本原理是向对方发送心跳帧(心跳包)并等待回应帧,如果一定时间内未能收到回应帧,则会认为连接已经停止工作,从而中断连接。如果没有心跳机制,通道的终端可能早就离线了,但是由于没有显式的断开信号,连接仍然保持开启状态,这将浪费网络和服务器资源。

  • spring.rabbitmq.requested-heartbeat 属性用于指示 RabbitMQ Broker 每隔多长时间应该向客户端发送心跳帧。它接受一个以秒为单位的整数值,表示心跳帧的时间间隔。如果客户端在两次心跳包之间没有收到消息,它将发送一个心跳帧以确认连接仍然有效。如果 Broker 在一定时间内未能收到来自客户端的心跳包,则可能会认为连接已经失效,从而关闭连接。

  • 通过设置适当的心跳时间间隔可以帮助提高连接的稳定性。如果心跳时间间隔设置得太短,则可能引入不必要的网络流量,并对 RabbitMQ 和客户端的性能产生负面影响。如果心跳时间间隔太长,则可能会导致连接时间过长,并且无法及时检测到连接故障。

  • 总之,sring.rabbitmq.requested-heartbeat 属性可用于指示客户端和 RabbitMQ Broker 之间的心跳检测时间。设置合适的心跳时间可以确保连接保持活跃,并在连接异常情况下及时断开连接,提高消息传输的稳定性和可靠性。

2.2 连接通道参数:

# 每个连接中可以建立的channel 数量
spring.rabbitmq.cache.channel.size=50
  • spring.rabbitmq.cache.channel.size 是 Spring Boot 集成 RabbitMQ 客户端连接工厂(ConnectionFactory)的一个属性,用于设置缓存的通道(channel)数量。它可控制应用程序与 RabbitMQ Broker 之间的通道(channel)复。

  • 在 RabbitMQ 中,设备连接(Connection)是相对昂贵的,因此创建一个新的设备连接需要消耗较高的资源。但是,创建通道(Channel)的消耗非常低。因此,建议不要创建太多的设备连接,相反,可以使用一些可重用的通道来处理大量的I/O操作。

  • 缓存通道是一项优化技术,它可以在相同的设备连接上重新使用通道,从而减少初始化通道的开销,提高生产力。

  • spring.rabbitmq.cache.channel.size 定义了每个连接缓存的通道的数量,因此可以控制缓存的通道的数量。当您的应用程序需要许多通道时,缓存通道会提高性能,而不需要每次发送消息时重新初始化新通道。在这种情况下,较高的通道缓存大小可能会提高吞吐量,因为它可以处理更多的消息并可重用通道,而不是创建新的通道。

  • 但是,连接池和通道缓存的大小应该在业务性能和可用资源之间进行权衡。如果缓存的通道数量过高,将会占用过多的资源,并可能对应用程序的性能产生负面影响。在另一方面,如果设置的通道缓存过于小,则可能会导致频繁地创建和删除通道,从而增加了通道的开销。

  • 总之,spring.rabbitmq.cache.channel.size 是应用程序的可配置属性,可用于优化连接池和通道复用。通过设置正确的值,可以提高应用程序的性能并最大程度地减少应用程序的资源占用。

# 如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel
spring.rabbitmq.cache.channel.checkout-timeout=2000
  • sring.rabbitmq.cache.channel.checkout-timeout 是 Spring Boot 集成 RabbitMQ 客户端连接工厂(ConnectionFactory)的一个属性,用于设置缓存的通道(channel)请求超时时间。它控制当应用程序请求通道但没有可用通道时的行为。

  • 在 RabbitMQ 中,通道(channel)是一种表示到 RabbitMQ Broker 的实际 TCP 底层连接的抽象。创建和释放通道都需要一定的开销,因此无论是消费者还是生产者,最好事先声明和缓存一些多路复用的通道。缓存通道的优点是可以重用通道,避免了频繁地创建和释放通道。

  • sring.rabbitmq.cache.channel.checkout-timeout 属性用于控制通道请求的超时时间。超时时间是指尝试从连接工厂请求通道时等待的时间。当应用程序请求一个通道,但所有的缓存通道都已被占用时,应用程序将等待可用的通道,直到超时时间结束。如果超时时间到期仍然无法获得可用通道,则可能会抛出 AmqpTimeoutException 异常。

  • 之所以设置通道请求超时时间是为了防止因线程阻塞而导致的应用程序停滞或抖动。如果执行线程无限期地等待可用通道,则可能会影响应用程序的整体性能。因此,通过设置适当的超时时间,可以确保应用程序不会无限期地阻塞在通道请求阶段。

  • 总之,sring.rabbitmq.cache.channel.checkout-timeout 属性可用于定义缓存通道请求的超时时间。通过设置适当的超时时间,可以确保应用程序在不影响系统性能的情况下,以可控和高可靠性的方式请求和使用通道。

# 客户端总共可以创建总的channel 数量
spring.rabbitmq.requested-channel-max=1024

  • spring.rabbitmq.requested-channel-max 是 RabbitMQ 客户端 的一个属性,用于设置客户端连接通道(channel)的最大数量。一个 RabbitMQ 连接最多可以创建多少个 channel,取决于 RabbitMQ Server 的配置和客户端的运行环境。在一些场景下,由于客户端连接过多或者 Server 的无限制支持,将导致资源浪费或者系统负载过高。

  • spring.rabbitmq.requested-channel-max 可以控制客户端连接通道的数量,对 RabbitMQ Server 进行限制。通过限制连接通道数量,可以避免因过多 channel 导致的系统性能下降或者资源浪费,保证 RabbitMQ Server 的可用性以及客户端连接的稳定性。

  • 该属性有以下几个取值:

  • 0:表示不限制客户端或者 Server 端的 channel 数量;

  • 1:表示每个客户端连接最多只能创建一个 channel,可以通过设置该属性为 1,防止客户端创建过多的 channel 浪费资源;

  • 大于 1 的整数:表示每个客户端连接最多可以创建的 channel 数量,该值在一些高并发场景中可以根据需要设置合适的数值,以做到资源最优化利用;

  • 需要注意的是,该属性仅影响客户端连接通道的数量,不会影响 RabbitMQ Server 的 channel 数量上限。如果需要更改 Server 端 channel 数量上限,需要修改相应的 Server 配置;

注意:spring.rabbitmq.requested-channel-max 和spring.rabbitmq.cache.channel.size 的区别:
spring.rabbitmq.requested-channel-maxspring.rabbitmq.cache.channel.size 是 RabbitMQ 客户端 的两个属性,它们控制的是 RabbitMQ 的连接通道(channel)的数量,但二者存在较大的区别。

  • spring.rabbitmq.requested-channel-max 主要是控制客户端连接的总的 channel 数量,即使用一个 RabbitMQ 连接可以创建的最大 channel 数量。当设置为 0 时,表示不限制客户端连接的 channel 数量,RabbitMQ Server 可以创建任意多的 channel。当设置为 1 时,表示每个客户端连接只能创建一个 channel。而当设置为大于 1 的整数时,表示每个客户端连接最多可以同时使用的 channel 数量。

  • spring.rabbitmq.cache.channel.size 控制使用一个 RabbitMQ 连接创建的 channel 数量,即 RabbitMQ 连接池中缓存连接时每个连接中的 channel 的数量,缓存的目的是减少连接和销毁的开销。当设置为 0 时,表示不缓存任何 channel,每次使用 channel 前都需要创建一个新的 channel 对象。当设置为大于 0 的整数时,表示缓存的每个连接创建的 channel 对象的最大数量。

  • 综上所述,spring.rabbitmq.requested-channel-max 控制的是连接整个 RabbitMQ 集群某一个节点时,一个连接最多能有的 channel 数量;而 spring.rabbitmq.cache.channel.size 控制的则是一个连接缓存 channel 的数量,也就是一个物理连接内可以缓存 channel 的数量。

2.3 SSL 连接参数:

spring.rabbitmq.ssl.enabled: 是否支持ssl
spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1

3 客户端 生产者发送消息参数:

# 生产者 发送的消息需要使用哪个 交换机
spring.rabbitmq.template.exchange=my_exchange
# 启用强制消息投递,即生产者发送消息成功或者失败,需要返回确认消息
spring.rabbitmq.template.mandatory=true
  • spring.rabbitmq.template.mandatory 用于控制是否启用 RabbitMQ 强制性(mandatory)投递机制,该机制可以确保生产者能够收到发送失败的消息,进行自定义处理;
  • 当消息无法路由到目标 Queue 时,RabbitMQ 会将该消息返回给生产者,交由生产者自己处理。如果该属性值为 true,则需要使用 setReturnCallback() 方法注册一个回调方法,以便处理返回的消息;
  • 如果该属性值为 false,则无论消息是否发送成功都不会有返回;
# 启用消息投递结果确认
spring.rabbitmq.publisher-returns=true
  • spring.rabbitmq.publisher-returns 属性表示是否启用 RabbitMQ 消息发送确认机制;
  • 消息投递成功或者失败,可以使用setConfirmCallback 方法注册一个回调方法,以便处理返回的消息;
# 消息发布者确认模式
spring.rabbitmq.publisher-confirm-type=correlated

确认模式有3中可以选择:

public static enum ConfirmType {
     SIMPLE,
     CORRELATED,
     NONE;

     private ConfirmType() {
     }
 }
  • none :表示不启用消息发布者确认模式,即不需要确认消息是否成功发送到 RabbitMQ 服务器;
  • CORRELATED 模式是指当消息成功发送到 RabbitMQ 服务器时,消息发布者会收到一个带有唯一标识符的确认信号。这个唯一标识符通常是消息的序列号或者其他唯一标识符。通过这个唯一标识符,消息发布者可以知道哪条消息被确认了。因此,该模式适用于需要知道每条消息的确认情况的场景;
  • SIMPLE 模式是指当消息成功发送到 RabbitMQ 服务器时,消息发布者会收到一个确认信号。但是,由于没有唯一标识符,因此无法知道确认信号对应的是哪一条消息。因此,该模式适用于不需要知道每条消息的确认情况的场景;

注意:
spring.rabbitmq.template.mandatory 主要关注交换机无法通过routkey 投递消息到队列中,更侧重于消息路由问题;spring.rabbitmq.publisher-returns 主要关注消息确切的投递问题。

下面是一个生产者投递消息的ack 确认demo:

/**
     * 通过交换机,路由key 发送消息
     *
     * @param exchangeName
     * @param routKey
     * @param message
     */
    public void sendMessage(String exchangeName, String routKey, Object message) {
        // 设置消息的唯一标识符
        long deliveryTag = System.currentTimeMillis();
        rabbitTemplate.convertAndSend(exchangeName, routKey, message, messagePostProcessor -> {
        	// 设置消息标识后续,回调时对应correlationData.getId()
            messagePostProcessor.getMessageProperties().setDeliveryTag(deliveryTag);
            // 设置消息没有被路由回调 对应oneMessage.getMessageProperties().getMessageId();
            messagePostProcessor.getMessageProperties().setMessageId(String.valueOf(deliveryTag));
            return messagePostProcessor;
        }, new CorrelationData(String.valueOf(deliveryTag)));
       
        // 设置 ConfirmCallback 回调函数 确认消息是否成功发送到 Exchang
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                if (null == correlationData) {
                    // 延迟消息 correlationData 为null
                    return;
                }
                log.debug("Message sent successfully:{} ", correlationData.getId());

            } else {
                if (null == correlationData && null == cause) {
                    // 延迟消息 correlationData 为null
                    return;
                }
                log.error("Message sent failed: {}", correlationData.getId() + ", cause: " + cause);
            }
        });
        // ReturnCallback  处理的是未路由的消息返回的情况
        rabbitTemplate.setReturnCallback((oneMessage, replyCode, replyText, exchange, routingKey) -> {
            // 判断是否是延迟消息
            if (routingKey.indexOf("delay") != -1) {
                // 是一个延迟消息,忽略这个错误提示
                return;
            }
            log.debug("Message returned: {}", new String(oneMessage.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
        });
    }

4 客户端消费消息:

4.1 消费消息确认机制:

# 消费消息确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual

有3中模式可以选择:

public enum AcknowledgeMode {
    NONE,
    MANUAL,
    AUTO;

    private AcknowledgeMode() {
    }

    public boolean isTransactionAllowed() {
        return this == AUTO || this == MANUAL;
    }

    public boolean isAutoAck() {
        return this == NONE;
    }

    public boolean isManual() {
        return this == MANUAL;
    }
}

  • none 表示消费者不会发送任何确认消息给 RabbitMQ,即消息会被消费者收到并立即从队列中删除,而不管消费者是否成功处理该消息。因此,将 acknowledge-mode 设置为 none 可能会导致消息丢失和重复消费的问题;
  • AUTO: 自动确认消息。当消费者成功处理完消息时,会自动发送 ack 消息,表示消息已经被成功消费;当消费者处理消息时发生异常时,会自动发送 nack 消息,表示消息消费失败,需要重新进入队列进行重;
  • MANUAL: 手动确认消息。消息处理函数必须调用 Channel#basicAck(long deliveryTag, boolean multiple) 方法来确认消息,deliveryTag 表示该消息的标识符,multiple 表示是否批量确认。如果该方法没有得到调用,则消息会被重新加入队列,等待消费者重新消费;

注意:
(1)在 AUTO 模式下,消费者无需手动发送 ack 或 nack 消息,系统会自动处理。但是需要注意的是,在 AUTO 模式下,如果消费者处理消息时发生异常,系统会自动发送 nack 消息,将消息重新放回队列进行重试。如果消费者一直处理失败,系统会不断地将消息重新放回队列进行重试,直到消息过期或者达到最大重试次数才会被丢弃。因此,在使用 AUTO 模式时,需要确保消费者能够正确地处理消息,避免消息重复消费或者丢失的问题。
(2)channel.basicAck 是 RabbitMQ Java 客户端库中的一个方法,用于向 RabbitMQ 服务器发送确认消息,表示消息已经被成功消费。该方法的参数含义如下:

  • deliveryTag :表示消息的唯一标识符,是一个非负整数。每个消息的 deliveryTag 是唯一的,用于标识 RabbitMQ 服务器上的消息。
  • multiple :表示是否批量确认消息。当该参数设置为 true 时,表示确认所有 deliveryTag 小于等于当前 deliveryTag 的所有消息;当该参数设置为 false 时,表示只确认当前 deliveryTag 对应的消息。

需要注意的是, channel.basicAck 方法只能在消费者接收到消息后调用,用于告知 RabbitMQ 服务器消息已经被成功消费,否则消息会一直保留在队列中,直到消费者发送确认消息为止。在使用该方法时,需要确保消费者已经成功处理了消息,避免消息重复消费或者丢失的问题;

# 是否自动启动消息的监听,默认为true
spring.rabbitmq.listener.simple.auto-startup=false

Spring Boot 中的 RabbitMQ 自动配置提供了一个名为 SimpleRabbitListenerContainerFactory 的 Bean,用于创建 org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer 对象,该对象用于监听消息并处理消息。auto-startup 参数的作用是表示容器是否在 context 装载时自动启动,在配置false 后可以通过下面代码手动开启:

@Slf4j
@Component
public class BatchConfig {
    @Value("${env:prod}")
    private String env;
    @Autowired
    SimpleRabbitListenerContainerFactory containerFactory;

    @PostConstruct
    public void simpleListenerBatchInit() {
        log.info("设置批量-----");
        containerFactory.setBatchListener(true);
        if ("prod".equals(env)){
            // 依照不同的环境进行开启
            containerFactory.setAutoStartup(true);
        }

    }

}

4.2 消费消息失败重试机制:

### 消费失败 重试参数
# 开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
# 表示最大重试次数,默认值为 3
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 表示第一次重试的时间间隔,默认值为 1000 毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=1000
#表示时间间隔的倍数系数,默认值为 1 当进行第 n 次重试时,
# 会将时间间隔设置为  initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加
spring.rabbitmq.listener.simple.retry.multiplier=1
# 表示时间间隔的最大值,默认值为 10000 毫秒
spring.rabbitmq.listener.simple.retry.max-interval=1000
# 消息监听器是否启用无状态(stateless)重试 默认true
spring.rabbitmq.listener.simple.retry.stateless=false

spring.rabbitmq.listener.simple.retry.enabled 是 Spring Boot RabbitMQ Starter 中的一个配置参数,用于设置是否启用消息消费者重试功能。该参数的默认值为 false ,表示不启用消息消费者重试功能。如果需要启用消息消费者重试功能,可以将该参数设置为 true 。 当启用消息消费者重试功能时,如果消息消费失败,会根据以下配置参数进行重试:

  • spring.rabbitmq.listener.simple.retry.max-attempts :表示最大重试次数,默认值为 3。当消息消费失败时,会根据该参数指定的次数进行重试,如果仍然失败,则会将消息发送到死信队列。

  • spring.rabbitmq.listener.simple.retry.initial-interval :表示第一次重试的时间间隔,默认值为 1000 毫秒。当消息消费失败后,会等待该时间间隔后进行第一次重试。

  • spring.rabbitmq.listener.simple.retry.multiplier :表示时间间隔的倍数系数,默认值为 1。当进行第 n 次重试时,会将时间间隔设置为 initial-interval * multiplier^(n-1) ,用于控制重试时间间隔逐渐增加。

  • spring.rabbitmq.listener.simple.retry.max-interval :表示时间间隔的最大值,默认值为 10000 毫秒。当计算得到的时间间隔大于该值时,会将时间间隔设置为该值。

  • 启用消息消费者重试功能可以提高消息消费的可靠性,确保消息能够被成功消费。在消息消费失败时,重试功能可以自动进行重试,避免消息被丢失或者无法处理的问题。需要注意的是,重试功能会增加系统的开销,因为需要等待一定的时间间隔后进行重试。如果系统的性能要求比较高,可以考虑关闭重试功能,或者调整重试参数的值。

spring.rabbitmq.listener.simple.retry.stateless 是 RabbitMQ 客户端的一个属性,用于设置 Spring AMQP 的简单消息监听器(SimpleMessageListenerContainer)是否启用无状态(stateless)重试。

  • 如果该属性设置为 true,则监听器会在出现异常时直接将消息返回给 RabbitMQ Server,这时 Server 会认为消息处理失败,将消息重新加入队列中,等待重新消费。这种情况下的重试是无状态的,因为监听器在处理消息时不会记住处理状态,也就是说任何时候都会对消息进行处理。

  • 如果该属性设置为 false,则监听器会在出现异常时先将消息进行缓存,再进行消息处理。如果消息处理成功,则将消息从缓存中移除;如果消息处理失败,则触发重试机制。这种情况下的重试是有状态的,因为监听器在处理消息时会记录处理状态,监控和管理者可以根据状态来分析并管理消息。

  • 需要注意的是,启用无状态重试(即将 spring.rabbitmq.listener.simple.retry.stateless 设置为 true)可以提高消息处理的吞吐量,但也可能会导致消费者重复消费同一条消息的情况。因此,在使用无状态重试时,需要确保消息处理不会产生副作用,并且需要配置 RabbitMQ 重试策略以确保消息不会因无限重试而导致消费者过度负担。

4.3 批量消费参数配置:

# 并发消费最小线程数
spring.rabbitmq.listener.simple.concurrency=1
# 并发消费最大线程数
spring.rabbitmq.listener.simple.max-concurrency=1

注意:
上面两个参数并不是真正意义上的批量消费,它们只是增加了消费的线程数量;

# 批量预取条数 默认值250
spring.rabbitmq.listener.simple.prefetch=50
# 开启批量消费
spring.rabbitmq.listener.simple.consumer-batch-enabled=true
# 批量消费的条数
spring.rabbitmq.listener.simple.batch-size=20

prefetch 参数:

  • spring.rabbitmq.listener.simple.prefetch 是 RabbitMQ 客户端 的一个属性,用于设置 Spring AMQP 的简单消息监听器(SimpleMessageListenerContainer)的预取数量。

  • 预取(prefetch)是指消费者从 RabbitMQ 队列中拉取一定数量的消息并进行缓存,以便在消息处理期间减少消费者与队列之间的数据交互。spring.rabbitmq.listener.simple.prefetch 属性定义了消费者从 RabbitMQ 中预取并缓存的消息数量。例如,如果将属性设置为 10,则消费者可以从队列中取出并缓存 10 条消息,然后依次处理这些消息。

  • 预取数量可以通过该属性进行配置,预取的目的是通过减少从 RabbitMQ 内部传输和处理消息的次数来提高消息消费的效率和性能。如果预取过小,则消费者可能会因等待消息而浪费时间;如果预取过大,则可能会导致消费者占用过多内存,并且 RabbitMQ 服务器的吞吐量也会受到限制。

  • 需要注意的是,当多个消费者共享同一个队列时,较大的预取数量对服务器的压力也会更大,因为 RabbitMQ 需要维护所有预取消息的缓存并等待所有消费者都完成处理后再发送新的预取请求。因此,在配置预取数量时,需要权衡预取数量和节点的负载,以确保消息的高效处理和 RabbitMQ 整体性能的稳定性。

consumer-batch-enabled :开启批量消费;
batch-size 参数

  • spring.rabbitmq.listener.simple.batch-size 是 RabbitMQ 客户端的一个属性,用于设置 Spring AMQP 的简单消息监听器(SimpleMessageListenerContainer)的批量消费大小。

  • 批量消费(batching)是指一次消费多个消息而不是一次消费一个消息的消费方式。spring.rabbitmq.listener.simple.batch-size 属性定义了监听器从 RabbitMQ 中批量接收和处理的消息数量。例如,如果将该属性设置为 10,则监听器可以一次性接收和处理 10 条消息。

  • 批量消费可以通过该属性进行配置,批量消费的目的是通过同时处理多个消息来提高消息消费的效率和性能。如果批量大小过小,则消费者可能会因等待消息而浪费时间;如果批量大小过大,则可能会导致消费者占用过多内存,并且 RabbitMQ 服务器的吞吐量也会受到限制。因此,在配置批量大小时,需要权衡批量大小和节点的负载,以确保消息的高效处理和 RabbitMQ 整体性能的稳定性。

  • 需要注意的是,Spring 的 SimpleMessageListenerContainer 只支持向单个队列消费,也就是说无法将单个批次中的消息来自多个队列。如果需要在多个队列之间进行批量消费,可以创建多个 SimpleMessageListenerContainer 实例并分别绑定到不同的队列上,或者使用 Spring AMQP 中的 DirectMessageListener 或AbstractMessageListenerContainer 实现。

注意:

(1) springboot 中使用 @RabbitListener 注解进行队列消息的消费,需要将SimpleRabbitListenerContainerFactory 的bean 开启批量消费,否则在consumer-batch-enabled 设置为true 时 启动会报错:

@Slf4j
@Component
public class BatchConfig {
    @Autowired
    SimpleRabbitListenerContainerFactory containerFactory;
     @PostConstruct
    public void simpleListenerBatchInit() {
        log.info("设置批量-----");
        // 监听器开启批量消费
        containerFactory.setBatchListener(true);
    }
}

(2) spring.rabbitmq.listener.simple.prefetch 设置为大于等于 spring.rabbitmq.listener.simple.batch-size ;因为当 batch-size 的数量大于 prefetch 数量时,会导致某些消息被重复消费:

  • 消费者消费的消息是从 spring.rabbitmq.listener.simple.prefetch 预取出的消息缓存中获取的。消费者在获取消息时,首先从 RabbitMQ 队列中预取指定数量的消息并缓存到本地,然后在本地缓存队列中处理这些消息,直到处理完毕或缓存队列为空;
  • 当消费者的预取数小于批量消费数时,消费者会一次性从 RabbitMQ 服务器预取指定数量(batch-size - prefetch)的消息,但是只有在消费者确认之后,RabbitMQ 才会将新的消息发送给消费者。因此,当消费者处理速度较慢时,已经预取的消息可能已经被消费者处理了,但是由于消费者还没有确认,RabbitMQ 服务器会认为这些消息还没有被消费,因此会重新发送给消费者,导致消息重复消费的问题;
  • 为了避免这个问题,可以将 spring.rabbitmq.listener.simple.batch-size 设置为小于等于 spring.rabbitmq.listener.simple.prefetch ,或者将 spring.rabbitmq.listener.simple.prefetch 设置为大于等于 spring.rabbitmq.listener.simple.batch-size 。这样可以确保消费者在处理完批量消费数的消息之后,再去预取新的消息,避免消息重复消费的问题;

5 参考:

5.1 RabbitMQ——SpringBoot配置选项;
5.2 【rabbit MQ】Spring Boot + RabbitMQ 配置参数解释;

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐