消息监听容器配置

配置 `SimpleMessageListenerContainer` (SMLC) 和 `DirectMessageListenerContainer` (DMLC) 以及与事务和服务质量相关的选项有很多,其中一些选项会相互影响。适用于 SMLC、DMLC 或 `StreamListenerContainer` (StLC)(参见 使用 RabbitMQ Stream 插件)的属性在相应的列中用复选标记表示。有关帮助您确定哪个容器适合您的应用程序的信息,请参见 选择容器

下表显示了容器属性名称及其在使用命名空间配置 `<rabbit:listener-container/>` 时的等效属性名称(括号中)。该元素上的 `type` 属性可以是 `simple`(默认)或 `direct`,分别指定 `SMLC` 或 `DMLC`。某些属性未通过命名空间公开。这些由 `N/A` 表示。

表 1. 消息监听容器的配置选项
属性 (属性名) 描述 SMLC DMLC StLC

ackTimeout
(N/A)

当设置 `messagesPerAck` 时,此超时用作发送确认的替代方法。当收到新消息时,未确认消息的数量与 `messagesPerAck` 进行比较,自上次确认以来的时间与该值进行比较。如果任一条件为 `true`,则确认消息。当没有新的消息到达并且有未确认的消息时,此超时是近似的,因为条件仅在每个 `monitorInterval` 检查一次。另请参见本表中的 `messagesPerAck` 和 `monitorInterval`。

tickmark

acknowledgeMode
(acknowledge)

  • NONE:不发送确认(与 `channelTransacted=true` 不兼容)。RabbitMQ 将此称为“autoack”,因为代理假设所有消息都在没有消费者任何操作的情况下被确认。

  • MANUAL:监听器必须通过调用 `Channel.basicAck()` 来确认所有消息。

  • AUTO:容器自动确认消息,除非 `MessageListener` 抛出异常。请注意,`acknowledgeMode` 与 `channelTransacted` 相互补充——如果通道已进行事务处理,则代理需要提交通知以及确认。这是默认模式。另请参见 `batchSize`。

tickmark
tickmark

adviceChain
(advice-chain)

应用于监听器执行的 AOP Advice 数组。这可以用于应用额外的横切关注点,例如在代理死亡时自动重试。请注意,只要代理仍然处于活动状态,`CachingConnectionFactory` 就会处理 AMQP 错误后的简单重新连接。

tickmark
tickmark

afterReceivePostProcessors
(N/A)

在调用监听器之前调用的 `MessagePostProcessor` 实例数组。后处理器可以实现 `PriorityOrdered` 或 `Ordered`。该数组按顺序排序,无序成员最后调用。如果后处理器返回 `null`,则丢弃消息(如果适用,则确认)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(N/A)

设置为 `true` 以在配置事务管理器时始终在回滚时重新排队消息。

tickmark
tickmark

autoDeclare
(auto-declare)

设置为 `true`(默认)时,如果容器在启动时检测到至少一个队列丢失(可能是因为它是 `auto-delete` 或已过期队列),则容器使用 `RabbitAdmin` 重新声明所有 AMQP 对象(队列、交换机、绑定),但无论出于何种原因队列丢失,重新声明都会继续进行。要禁用此行为,请将此属性设置为 `false`。请注意,如果所有队列都丢失,则容器将无法启动。

在 1.6 版之前,如果上下文中有多个管理员,则容器会随机选择一个。如果没有管理员,它会在内部创建一个。在这两种情况下,都可能导致意外结果。从 1.6 版开始,为了使 `autoDeclare` 工作,上下文中必须只有一个 `RabbitAdmin`,或者必须使用 `rabbitAdmin` 属性在容器上配置对特定实例的引用。
tickmark
tickmark

autoStartup
(auto-startup)

此标志指示容器应在ApplicationContext启动时启动(作为SmartLifecycle回调的一部分,该回调在所有bean初始化之后发生)。默认为true,但如果您的代理可能在启动时不可用,您可以将其设置为false,并在知道代理已准备好后稍后手动调用start()

tickmark
tickmark
tickmark

batchSize
(事务大小)(批量大小)

当与设置为AUTOacknowledgeMode一起使用时,容器尝试处理多达此数量的消息,然后再发送ack(等待每个消息直到达到接收超时设置)。这也是事务通道提交的时间。如果prefetchCount小于batchSize,则将其增加以匹配batchSize

tickmark

batchingStrategy
(N/A)

拆分消息时使用的策略。默认值为SimpleDebatchingStrategy。参见批量处理@RabbitListener与批量处理

tickmark
tickmark

channelTransacted
(通道事务)

布尔标志,指示所有消息都应在一个事务中确认(手动或自动)。

tickmark
tickmark

concurrency
(N/A)

m-n 每个侦听器的并发使用者范围(最小值,最大值)。如果只提供n,则n是固定的使用者数量。参见侦听器并发

tickmark

concurrentConsumers
(并发)

为每个侦听器最初启动的并发使用者的数量。参见侦听器并发。对于StLC,并发通过重载的superStream方法控制;参见使用单个活动使用者使用超级流

tickmark
tickmark

connectionFactory
(连接工厂)

ConnectionFactory的引用。当使用XML命名空间配置时,默认引用的bean名称为rabbitConnectionFactory

tickmark
tickmark

consecutiveActiveTrigger
(最小连续活动)

消费者连续接收的消息的最小数量,在没有发生接收超时的情况下,才会考虑启动新的消费者。还会受到“batchSize”的影响。参见侦听器并发。默认值:10。

tickmark

consecutiveIdleTrigger
(最小连续空闲)

消费者必须经历的最小接收超时次数,然后才会考虑停止消费者。还会受到“batchSize”的影响。参见侦听器并发。默认值:10。

tickmark

consumerBatchEnabled
(批量启用)

如果MessageListener支持,则将其设置为true可以启用最多batchSize个离散消息的批量处理;如果在receiveTimeout内没有新的消息到达或收集批量消息的时间超过batchReceiveTimeout,则将交付部分批次。当此值为false时,批量处理仅支持由生产者创建的批次;参见批量处理

tickmark

consumerCustomizer
(N/A)

用于修改容器创建的流消费者的ConsumerCustomizer bean。

tickmark

consumerStartTimeout
(N/A)

等待消费者线程启动的时间(毫秒)。如果此时间已过,则会写入错误日志。这可能发生的一个例子是,如果配置的taskExecutor没有足够的线程来支持容器concurrentConsumers

参见线程和异步消费者。默认值:60000(一分钟)。

tickmark

consumerTagStrategy
(消费者标签策略)

设置ConsumerTagStrategy的实现,从而能够为每个消费者创建(唯一)的标签。

tickmark
tickmark

consumersPerQueue
(每个队列的消费者)

为每个配置的队列创建的消费者的数量。参见侦听器并发

tickmark

consumeDelay
(N/A)

当使用RabbitMQ分片插件concurrentConsumers > 1时,存在一个竞争条件,可能会阻止消费者在分片之间均匀分布。使用此属性在消费者启动之间添加少量延迟,以避免此竞争条件。您应该尝试不同的值以确定适合您环境的延迟。

tickmark
tickmark

debatchingEnabled
(N/A)

如果为true,则侦听器容器将拆分批量消息,并使用批次中的每条消息调用侦听器。从2.2.7版本开始,如果侦听器是BatchMessageListenerChannelAwareBatchMessageListener,则生产者创建的批次将作为List<Message>进行拆分。否则,批次中的消息将逐一呈现。默认为true。参见批量处理@RabbitListener与批量处理

tickmark
tickmark

declarationRetries
(声明重试)

被动队列声明失败时的重试尝试次数。被动队列声明发生在消费者启动时,或者当从多个队列消费时,在初始化期间并非所有队列都可用时。如果在重试耗尽后,没有一个配置的队列可以被动声明(任何原因),则容器的行为由前面描述的“missingQueuesFatal”属性控制。默认值:三次重试(总共四次尝试)。

tickmark

defaultRequeueRejected
(重新入队被拒绝的消息)

确定由于侦听器引发异常而被拒绝的消息是否应重新入队。默认值:true

tickmark
tickmark

errorHandler
(错误处理程序)

用于处理在MessageListener执行期间可能发生的任何未捕获异常的ErrorHandler策略的引用。默认值:ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(独占)

确定此容器中的单个使用者是否对队列具有独占访问权限。当此值为true时,容器的并发性必须为1。如果另一个使用者具有独占访问权限,则容器将根据recovery-intervalrecovery-back-off尝试恢复使用者。使用命名空间时,此属性与队列名称一起出现在<rabbit:listener/>元素上。默认值:false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(N/A)

独占使用者无法访问队列时使用的异常记录器。默认情况下,这将在WARN级别记录。

tickmark
tickmark

failedDeclarationRetryInterval
(失败的声明重试间隔)

被动队列声明重试尝试之间的间隔。被动队列声明发生在消费者启动时,或者当从多个队列消费时,在初始化期间并非所有队列都可用时。默认值:5000(五秒)。

tickmark
tickmark

forceCloseChannel
(N/A)

如果消费者在shutdownTimeout内没有响应关闭,如果此值为true,则通道将关闭,导致任何未确认的消息重新入队。从2.0版开始默认为true。您可以将其设置为false以恢复到之前的行为。

tickmark
tickmark

forceStop
(N/A)

设置为true可在处理当前记录后停止(当容器停止时);导致所有预取的消息重新入队。默认情况下,容器将在停止之前取消使用者并处理所有预取的消息。从2.4.14版、3.0.6版开始,默认为false

tickmark
tickmark

globalQos
(全局QoS)

如果为true,则prefetchCount将全局应用于通道,而不是应用于通道上的每个使用者。有关更多信息,请参见basicQos.global

tickmark
tickmark

(组)

仅当使用命名空间时才可用。指定后,类型为Collection<MessageListenerContainer>的bean将使用此名称注册,并且每个<listener/>元素的容器都将添加到集合中。例如,这允许通过迭代集合来启动和停止容器组。如果多个<listener-container/>元素具有相同的组值,则集合中的容器构成所有如此指定的容器的聚合。

tickmark
tickmark

idleEventInterval
(空闲事件间隔)

参见检测空闲异步消费者

tickmark
tickmark

javaLangErrorHandler
(N/A)

当容器线程捕获Error时调用的AbstractMessageListenerContainer.JavaLangErrorHandler实现。默认实现调用System.exit(99);要恢复到之前的行为(不执行任何操作),请添加一个无操作处理程序。

tickmark
tickmark

maxConcurrentConsumers
(最大并发)

根据需要启动的并发消费者的最大数量。必须大于或等于“concurrentConsumers”。参见侦听器并发

tickmark

messagesPerAck
(N/A)

在ack之间接收的消息数。使用此方法可以减少发送到代理的ack数量(代价是增加重新交付消息的可能性)。通常,您应该仅在高容量侦听器容器上设置此属性。如果设置了此属性并且消息被拒绝(引发异常),则将确认挂起的ack,并将失败的消息拒绝。不允许使用事务通道。如果prefetchCount小于messagesPerAck,则将其增加以匹配messagesPerAck。默认值:每条消息都确认。另请参见此表中的ackTimeout

tickmark

mismatchedQueuesFatal
(不匹配队列致命)

当容器启动时,如果此属性为true(默认值:false),则容器将检查上下文中声明的所有队列是否与代理上已存在的队列兼容。如果存在不匹配的属性(例如auto-delete)或参数(例如x-message-ttl),则容器(和应用程序上下文)将因致命异常而无法启动。

如果在恢复期间检测到问题(例如,连接丢失后),则容器将停止。

应用程序上下文中必须只有一个RabbitAdmin(或一个在容器上使用rabbitAdmin属性专门配置的RabbitAdmin)。否则,此属性必须为false

如果在初始启动期间代理不可用,则容器将启动,并在建立连接时检查条件。
检查针对上下文中的所有队列进行,而不仅仅是特定侦听器配置为使用的队列。如果您希望将检查限制为容器使用的那些队列,则应为容器配置单独的RabbitAdmin,并使用rabbitAdmin属性提供对它的引用。有关更多信息,请参见条件声明
在启动标记为@Lazy 的bean 中的@RabbitListener 容器时,会禁用不匹配队列参数检测。这是为了避免潜在的死锁,该死锁最多可能延迟此类容器的启动 60 秒。使用延迟侦听器 bean 的应用程序应在获取对延迟 bean 的引用之前检查队列参数。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

设置为true(默认值)时,如果代理上没有任何已配置的队列可用,则认为这是致命的。这会导致应用程序上下文在启动期间初始化失败。此外,当容器运行时队列被删除时,默认情况下,使用者会尝试三次连接到队列(间隔五秒),如果这些尝试失败,则停止容器。

这在以前的版本中不可配置。

设置为false时,在进行三次重试后,容器将进入恢复模式,就像其他问题(例如代理关闭)一样。容器将根据recoveryInterval属性尝试恢复。在每次恢复尝试期间,每个使用者都会再次尝试四次以五秒的间隔被动声明队列。此过程将无限期地继续。

您还可以使用属性 bean 为所有容器全局设置属性,如下所示

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

此全局属性不应用于任何已设置显式missingQueuesFatal属性的容器。

可以通过设置以下属性来覆盖默认重试属性(三次重试,每次间隔五秒)。

在启动标记为@Lazy 的bean 中的@RabbitListener 容器时,会禁用缺失队列检测。这是为了避免潜在的死锁,该死锁最多可能延迟此类容器的启动 60 秒。使用延迟侦听器 bean 的应用程序应在获取对延迟 bean 的引用之前检查队列。
tickmark
tickmark

monitorInterval
(monitor-interval)

使用 DMLC 时,会安排一项任务以该间隔运行,以监控使用者的状态并恢复任何已失败的使用者。

tickmark

noLocal
(N/A)

设置为true可禁用服务器向使用者传递在同一通道连接上发布的消息。

tickmark
tickmark

phase
(phase)

autoStartuptrue时,此容器应在其内启动和停止的生命周期阶段。值越低,此容器启动越早,停止越晚。默认值为Integer.MAX_VALUE,这意味着容器尽可能晚启动,尽可能早停止。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

设置为true(对于 SMLC 为默认值)时,如果连接期间抛出PossibleAuthenticationFailureException,则认为这是致命的。这会导致应用程序上下文在启动期间初始化失败(如果容器配置了自动启动)。

2.0 版起。

DirectMessageListenerContainer

设置为false(默认值)时,每个使用者将根据monitorInterval尝试重新连接。

SimpleMessageListenerContainer

设置为false时,在进行 3 次重试后,容器将进入恢复模式,就像其他问题(例如代理关闭)一样。容器将根据recoveryInterval属性尝试恢复。在每次恢复尝试期间,每个使用者将再次尝试启动 4 次。此过程将无限期地继续。

您还可以使用属性 bean 为所有容器全局设置属性,如下所示

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

此全局属性不应用于任何已设置显式missingQueuesFatal属性的容器。

可以使用此属性后的属性覆盖默认重试属性(3 次重试,每次间隔 5 秒)。

tickmark
tickmark

prefetchCount
(prefetch)

每个使用者可以未确认的消息数量。此值越高,消息传递速度越快,但非顺序处理的风险也越高。如果acknowledgeModeNONE,则忽略此值。如有必要,会增加此值以匹配batchSizemessagePerAck。自 2.0 版以来默认为 250。您可以将其设置为 1 以恢复到之前的行为。

在某些情况下,prefetch 值应该较低——例如,对于大型消息,尤其是在处理速度较慢的情况下(消息可能会在客户端进程中累积大量内存),以及如果需要严格的消息排序(在这种情况下,prefetch 值应设置为 1)。此外,对于低容量消息传递和多个使用者(包括单个侦听器容器实例内的并发性),您可能希望减少预取以获得更均匀的消息分布。

另请参阅globalQos

tickmark
tickmark

rabbitAdmin
(admin)

当侦听器容器侦听至少一个自动删除队列并在启动期间发现它丢失时,容器使用RabbitAdmin声明队列以及任何相关的绑定和交换机。如果此类元素配置为使用条件声明(请参阅条件声明),则容器必须使用已配置的管理员来声明这些元素。在此处指定该管理员。仅当使用具有条件声明的自动删除队列时才需要它。如果您不希望在容器启动之前声明自动删除队列,请在管理员上将auto-startup设置为false。默认为声明所有非条件元素的RabbitAdmin

tickmark
tickmark

receiveTimeout
(receive-timeout)

等待每条消息的最长时间。如果acknowledgeMode=NONE,则此值几乎没有作用——容器会循环并请求另一条消息。对于具有batchSize > 1的事务性Channel,它具有最大的影响,因为它可能导致已使用但尚未确认的消息直到超时到期才被确认。当consumerBatchEnabled为 true 时,如果在批处理完成之前发生此超时,则将传递部分批处理。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

收集批处理消息的超时时间(毫秒)。它限制了等待填充 batchSize 的时间。当batchSize > 1并且收集批处理消息的时间大于batchReceiveTime时,将传递批处理。默认为 0(无超时)。

tickmark

recoveryBackOff
(recovery-back-off)

指定如果使用者因非致命原因无法启动而尝试启动使用者的间隔的BackOff。默认为FixedBackOff,每五秒钟无限次重试。与recoveryInterval互斥。

tickmark
tickmark

recoveryInterval
(recovery-interval)

确定如果使用者因非致命原因无法启动而尝试启动使用者的毫秒数间隔。默认值:5000。与recoveryBackOff互斥。

tickmark
tickmark

retryDeclarationInterval
(missing-queue- retry-interval)

如果在使用者初始化期间只有一部分配置的队列可用,则使用者将开始使用这些队列进行使用。使用者将尝试使用此间隔被动声明缺失的队列。当此间隔过去后,将再次使用“declarationRetries”和“failedDeclarationRetryInterval”。如果仍然有缺失的队列,使用者将再次等待此间隔,然后再尝试。此过程将无限期地继续,直到所有队列都可用。默认值:60000(一分钟)。

tickmark

shutdownTimeout
(N/A)

当容器关闭时(例如,如果其封闭的ApplicationContext关闭),它将等待处理中的消息,直到达到此限制。默认为五秒。

tickmark
tickmark

startConsumerMinInterval
(min-start-interval)

在按需启动每个新的使用者之前必须经过的毫秒数。请参阅侦听器并发性。默认值:10000(10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (N/A)

使用有状态重试建议时,如果收到缺少messageId属性的消息,则默认情况下,它会被视为对使用者而言是致命的(它将停止)。将其设置为false可丢弃(或路由到死信队列)此类消息。

tickmark
tickmark

stopConsumerMinInterval
(min-stop-interval)

检测到空闲使用者后,自上次使用者停止以来必须经过的毫秒数,才能停止使用者。请参阅侦听器并发性。默认值:60000(一分钟)。

tickmark

streamConverter
(N/A)

一个StreamMessageConverter,用于将本机 Stream 消息转换为 Spring AMQP 消息。

tickmark

taskExecutor
(task-executor)

对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)的引用,用于执行侦听器调用者。默认为SimpleAsyncTaskExecutor,使用内部管理的线程。

tickmark
tickmark

taskScheduler
(task-scheduler)

使用 DMLC 时,用于按“monitorInterval”运行监控任务的调度程序。

tickmark

transactionManager
(transaction-manager)

侦听器操作的外部事务管理器。也与channelTransacted互补——如果Channel是事务性的,则其事务将与外部事务同步。

tickmark
tickmark