消息监听器容器配置

配置 SimpleMessageListenerContainer (SMLC) 和 DirectMessageListenerContainer (DMLC) 时,在事务和服务质量方面有很多选项,其中一些选项会相互影响。适用于 SMLC、DMLC 或 StreamListenerContainer (StLC)(参见 使用 RabbitMQ 流插件)的属性在相应的列中用复选标记表示。参见 选择容器,了解有关如何确定哪个容器适合您的应用程序的信息。

下表显示了容器属性名称及其在使用命名空间配置 <rabbit:listener-container/> 时的等效属性名称(括号内)。该元素的 type 属性可以是 simple(默认)或 direct,分别指定 SMLCDMLC。某些属性没有通过命名空间公开。这些属性在属性中用 N/A 表示。

表 1. 消息监听器容器的配置选项
属性(属性) 描述 SMLC DMLC StLC

ackTimeout
(N/A)

当设置 messagesPerAck 时,此超时时间用作发送确认的替代方法。当收到新消息时,会将未确认消息的数量与 messagesPerAck 进行比较,并将自上次确认以来的时间与该值进行比较。如果任一条件为 true,则确认该消息。当没有新消息到达并且存在未确认的消息时,此超时时间是近似的,因为该条件仅在每次 monitorInterval 时检查。另请参见此表中的 messagesPerAckmonitorInterval

tickmark

acknowledgeMode
(确认)

  • NONE:不发送确认(与 channelTransacted=true 不兼容)。RabbitMQ 将其称为“自动确认”,因为代理假设所有消息都在没有消费者任何操作的情况下被确认。

  • MANUAL:监听器必须通过调用 Channel.basicAck() 来确认所有消息。

  • AUTO:容器会自动确认消息,除非 MessageListener 抛出异常。请注意,acknowledgeModechannelTransacted 相辅相成,如果通道是事务性的,则代理除了确认之外还需要提交通知。这是默认模式。另请参见 batchSize

tickmark
tickmark

adviceChain
(建议链)

应用于监听器执行的 AOP 建议数组。这可用于应用额外的横切关注点,例如在代理死亡时自动重试。请注意,在 AMQP 错误后进行简单的重新连接由 CachingConnectionFactory 处理,只要代理仍然存活。

tickmark
tickmark

afterReceivePostProcessors
(N/A)

在调用监听器之前调用的 MessagePostProcessor 实例数组。后处理器可以实现 PriorityOrderedOrdered。该数组按排序进行排序,未排序的成员最后调用。如果后处理器返回 null,则丢弃该消息(并在适当的情况下确认)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(N/A)

设置为 true 以在配置事务管理器时始终在回滚时重新排队消息。

tickmark
tickmark

autoDeclare
(自动声明)

当设置为 true(默认值)时,容器使用 RabbitAdmin 重新声明所有 AMQP 对象(队列、交换机、绑定),如果它检测到其至少一个队列在启动期间丢失,可能是因为它是一个 auto-delete 或已过期的队列,但如果队列由于任何原因丢失,则重新声明将继续进行。要禁用此行为,请将此属性设置为 false。请注意,如果所有队列都丢失,则容器无法启动。

在 1.6 版本之前,如果上下文中存在多个管理员,容器会随机选择一个。如果不存在管理员,它会在内部创建一个。这两种情况下都可能导致意外结果。从 1.6 版本开始,为了使 autoDeclare 工作,上下文中必须存在且仅存在一个 RabbitAdmin,或者在容器上使用 rabbitAdmin 属性配置对特定实例的引用。
tickmark
tickmark

autoStartup
(自动启动)

指示容器在 ApplicationContext 启动时启动的标志(作为 SmartLifecycle 回调的一部分,这些回调在所有 bean 初始化后发生)。默认值为 true,但如果您的代理可能在启动时不可用,您可以将其设置为 false,并在您知道代理已准备就绪时手动调用 start()

tickmark
tickmark
tickmark

batchSize
(事务大小) (批次大小)

当与 acknowledgeMode 设置为 AUTO 一起使用时,容器尝试在发送确认之前处理最多此数量的消息(等待每个消息直到接收超时设置)。这也是事务通道提交时。如果 prefetchCount 小于 batchSize,则会增加以匹配 batchSize

tickmark

batchingStrategy
(N/A)

对消息进行反批处理时使用的策略。默认 SimpleDebatchingStrategy。请参阅 批处理@RabbitListener with Batching

tickmark
tickmark

channelTransacted
(通道事务)

布尔标志,用于指示所有消息应在事务中确认(手动或自动)。

tickmark
tickmark

concurrency
(N/A)

m-n 每个侦听器的并发消费者范围(最小值,最大值)。如果只提供 n,则 n 是固定的消费者数量。请参阅 侦听器并发

tickmark

concurrentConsumers
(并发)

每个侦听器最初启动的并发消费者数量。请参阅 侦听器并发。对于 StLC,并发通过重载的 superStream 方法控制;请参阅 使用单个活动消费者消费超级流

tickmark
tickmark

connectionFactory
(connection-factory)

ConnectionFactory 的引用。使用 XML 命名空间配置时,默认引用的 bean 名称是 rabbitConnectionFactory

tickmark
tickmark

consecutiveActiveTrigger
(min-consecutive-active)

消费者连续接收消息的最小数量,在考虑启动新消费者时,不会发生接收超时。还会受到“batchSize”的影响。请参阅 侦听器并发。默认值:10。

tickmark

consecutiveIdleTrigger
(min-consecutive-idle)

消费者必须经历的最小接收超时次数,在考虑停止消费者之前。还会受到“batchSize”的影响。请参阅 侦听器并发。默认值:10。

tickmark

consumerBatchEnabled
(batch-enabled)

如果 MessageListener 支持,将此设置为 true 可启用离散消息的批处理,最多 batchSize;如果在 receiveTimeout 内没有新的消息到达或收集批处理消息的时间超过 batchReceiveTimeout,则会传递部分批处理。当此值为 false 时,仅支持由生产者创建的批处理;请参阅 批处理

tickmark

consumerCustomizer
(N/A)

用于修改容器创建的流消费者的 ConsumerCustomizer bean。

tickmark

consumerStartTimeout
(N/A)

等待消费者线程启动的时间(以毫秒为单位)。如果此时间过去,则会写入错误日志。这可能发生的一个示例是,如果配置的 taskExecutor 没有足够的线程来支持容器 concurrentConsumers

参见 线程和异步消费者。默认值:60000(一分钟)。

tickmark

consumerTagStrategy
(consumer-tag-strategy)

设置 ConsumerTagStrategy 的实现,使每个消费者都能创建(唯一)的标签。

tickmark
tickmark

consumersPerQueue
(consumers-per-queue)

为每个配置的队列创建的消费者数量。参见 监听器并发

tickmark

consumeDelay
(N/A)

当使用 RabbitMQ 分片插件concurrentConsumers > 1 时,存在一个竞争条件,可能会阻止消费者在分片之间均匀分布。使用此属性在消费者启动之间添加一个小的延迟,以避免这种竞争条件。您应该尝试不同的值,以确定适合您的环境的延迟。

tickmark
tickmark

debatchingEnabled
(N/A)

如果为 true,监听器容器将对批处理消息进行解批处理,并使用批处理中的每条消息调用监听器。从版本 2.2.7 开始,如果监听器是 BatchMessageListenerChannelAwareBatchMessageListener,则 生产者创建的批处理 将被解批处理为 List<Message>。否则,批处理中的消息将逐条呈现。默认值为 true。参见 批处理@RabbitListener 与批处理

tickmark
tickmark

declarationRetries
(declaration-retries)

被动队列声明失败时的重试次数。被动队列声明发生在消费者启动时,或者当从多个队列消费时,在初始化期间并非所有队列都可用。如果在重试次数用尽后,无法被动声明任何配置的队列(无论出于何种原因),容器的行为将由前面描述的 'missingQueuesFatal` 属性控制。默认值:三次重试(总共四次尝试)。

tickmark

defaultRequeueRejected
(requeue-rejected)

确定由于监听器抛出异常而被拒绝的消息是否应该重新排队。默认值:true

tickmark
tickmark

errorHandler
(error-handler)

ErrorHandler 策略的引用,用于处理在执行 MessageListener 期间可能发生的任何未捕获异常。默认值:ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(exclusive)

确定此容器中的单个消费者是否对队列具有排他访问权。当此值为 true 时,容器的并发性必须为 1。如果另一个消费者具有排他访问权,容器将尝试根据 recovery-intervalrecovery-back-off 恢复消费者。当使用命名空间时,此属性将与队列名称一起出现在 <rabbit:listener/> 元素上。默认值:false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(N/A)

当独占消费者无法访问队列时使用的异常记录器。默认情况下,这将在 WARN 级别记录。

tickmark
tickmark

failedDeclarationRetryInterval
(failed-declaration -retry-interval)

被动队列声明重试尝试之间的间隔。被动队列声明发生在消费者启动时,或者当从多个队列消费时,在初始化期间并非所有队列都可用。默认值:5000(五秒)。

tickmark
tickmark

forceCloseChannel
(N/A)

如果消费者在 shutdownTimeout 内没有响应关闭,如果为 true,则通道将关闭,导致任何未确认的消息重新排队。从 2.0 开始默认为 true。您可以将其设置为 false 以恢复到以前的行为。

tickmark
tickmark

forceStop
(N/A)

设置为 true 以在处理完当前记录后停止(当容器停止时);导致所有预取的消息重新排队。默认情况下,容器将在停止之前取消消费者并处理所有预取的消息。从版本 2.4.14、3.0.6 开始,默认为 false

tickmark
tickmark

globalQos
(global-qos)

当为 true 时,prefetchCount 将全局应用于通道,而不是应用于通道上的每个消费者。有关更多信息,请参见 basicQos.global

tickmark
tickmark

(group)

这仅在使用命名空间时可用。指定后,类型为 Collection<MessageListenerContainer> 的 bean 将使用此名称注册,并且每个 <listener/> 元素的容器将添加到集合中。例如,这允许通过遍历集合来启动和停止容器组。如果多个 <listener-container/> 元素具有相同的组值,则集合中的容器将形成所有指定容器的聚合。

tickmark
tickmark

idleEventInterval
(idle-event-interval)

请参见 检测空闲异步消费者

tickmark
tickmark

javaLangErrorHandler
(N/A)

当容器线程捕获到 Error 时,会调用 AbstractMessageListenerContainer.JavaLangErrorHandler 实现。默认实现调用 System.exit(99);要恢复到之前的行为(不执行任何操作),请添加一个无操作处理程序。

tickmark
tickmark

maxConcurrentConsumers
(最大并发)

根据需要启动的最大并发消费者数量。必须大于或等于 'concurrentConsumers'。参见 监听器并发

tickmark

messagesPerAck
(N/A)

两次确认之间接收的消息数量。使用此选项可以减少发送到代理的确认次数(以增加消息重新传递的可能性为代价)。通常,您应该仅在高流量监听器容器上设置此属性。如果设置了此属性并且消息被拒绝(抛出异常),则挂起的确认将被确认,并且失败的消息将被拒绝。不允许与事务通道一起使用。如果 prefetchCount 小于 messagesPerAck,则会增加以匹配 messagesPerAck。默认值:每条消息确认一次。另请参见此表中的 ackTimeout

tickmark

mismatchedQueuesFatal
(队列不匹配致命)

当容器启动时,如果此属性为 true(默认值:false),则容器会检查上下文中声明的所有队列是否与代理上已存在的队列兼容。如果存在不匹配的属性(例如 auto-delete)或参数(例如 x-message-ttl),则容器(和应用程序上下文)将无法启动,并抛出致命异常。

如果在恢复期间检测到问题(例如,在连接丢失后),则容器将停止。

应用程序上下文中必须有一个 RabbitAdmin(或通过使用 rabbitAdmin 属性在容器上专门配置的一个)。否则,此属性必须为 false

如果代理在初始启动期间不可用,则容器将启动,并在建立连接时检查条件。
检查针对上下文中的所有队列进行,而不仅仅是特定监听器配置为使用的队列。如果您希望将检查限制为容器使用的那些队列,则应该为容器配置一个单独的 RabbitAdmin,并使用 rabbitAdmin 属性提供对它的引用。有关更多信息,请参见 条件声明
在启动标记为@Lazy的 bean 中的@RabbitListener的容器时,会禁用不匹配队列参数检测。这是为了避免潜在的死锁,该死锁可能会延迟此类容器的启动时间长达 60 秒。使用延迟监听器 bean 的应用程序应在获取延迟 bean 的引用之前检查队列参数。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

当设置为true(默认值)时,如果代理上没有配置的队列可用,则被视为致命错误。这会导致应用程序上下文在启动期间无法初始化。此外,当容器运行时删除队列时,默认情况下,消费者会尝试三次连接到队列(以五秒钟的间隔),如果这些尝试失败,则停止容器。

这在以前的版本中不可配置。

当设置为false时,在进行三次重试后,容器会进入恢复模式,就像其他问题一样,例如代理关闭。容器会根据recoveryInterval属性尝试恢复。在每次恢复尝试期间,每个消费者都会再次尝试四次以五秒钟的间隔被动声明队列。此过程会无限期地继续。

您还可以使用属性 bean 为所有容器全局设置属性,如下所示

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

此全局属性不应用于任何具有显式missingQueuesFatal属性设置的容器。

默认重试属性(以五秒钟的间隔进行三次重试)可以通过设置以下属性来覆盖。

在启动标记为@Lazy的 bean 中的@RabbitListener的容器时,会禁用不匹配队列参数检测。这是为了避免潜在的死锁,该死锁可能会延迟此类容器的启动时间长达 60 秒。使用延迟监听器 bean 的应用程序应在获取延迟 bean 的引用之前检查队列。
tickmark
tickmark

monitorInterval
(monitor-interval)

使用 DMLC,会安排一项任务以该间隔运行,以监控消费者的状态并恢复任何失败的消费者。

tickmark

noLocal
(N/A)

设置为true以禁用服务器向消费者发送在同一通道连接上发布的消息。

tickmark
tickmark

phase
(phase)

autoStartuptrue 时,此容器应启动和停止的生命周期阶段。值越低,此容器启动越早,停止越晚。默认值为 Integer.MAX_VALUE,表示容器尽可能晚启动,尽可能早停止。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

当设置为 true(SMLC 的默认值)时,如果在连接期间抛出 PossibleAuthenticationFailureException,则认为它很严重。这会导致应用程序上下文在启动期间无法初始化(如果容器配置为自动启动)。

版本 2.0 起。

DirectMessageListenerContainer

当设置为 false(默认值)时,每个消费者将尝试根据 monitorInterval 重新连接。

SimpleMessageListenerContainer

当设置为 false 时,在进行 3 次重试后,容器将进入恢复模式,就像其他问题一样,例如代理器关闭。容器将尝试根据 recoveryInterval 属性恢复。在每次恢复尝试期间,每个消费者将再次尝试 4 次启动。此过程将无限期地继续。

您还可以使用属性 bean 为所有容器全局设置属性,如下所示

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

此全局属性不会应用于任何具有显式 missingQueuesFatal 属性设置的容器。

默认重试属性(3 次重试,间隔 5 秒)可以使用此属性后的属性覆盖。

tickmark
tickmark

prefetchCount
(prefetch)

每个消费者可以未确认的消息数量。此值越高,消息传递速度越快,但非顺序处理的风险也越高。如果 acknowledgeModeNONE,则忽略此值。如果需要,它会增加以匹配 batchSizemessagePerAck。自 2.0 起默认为 250。您可以将其设置为 1 以恢复到之前的行为。

在某些情况下,预取值应该很低,例如,对于大型消息,尤其是如果处理速度很慢(消息可能会在客户端进程中累积到大量内存),并且如果需要严格的消息排序(在这种情况下,预取值应该设置为 1)。此外,对于低容量消息传递和多个消费者(包括单个侦听器容器实例内的并发),您可能希望减少预取以获得更均匀的消息分布到消费者。

另请参阅 globalQos

tickmark
tickmark

rabbitAdmin
(admin)

当监听器容器监听至少一个自动删除队列,并在启动时发现队列丢失时,容器将使用 RabbitAdmin 来声明队列以及任何相关的绑定和交换机。如果这些元素被配置为使用条件声明(参见 条件声明),则容器必须使用配置用于声明这些元素的管理员。在此处指定该管理员。仅在使用具有条件声明的自动删除队列时才需要。如果您不希望在容器启动之前声明自动删除队列,请将管理员的 auto-startup 设置为 false。默认情况下,使用声明所有非条件元素的 RabbitAdmin

tickmark
tickmark

receiveTimeout
(receive-timeout)

每个消息的最大等待时间。如果 acknowledgeMode=NONE,则此设置几乎没有效果——容器会循环并请求另一条消息。对于具有 batchSize > 1 的事务性 Channel,它具有最大的影响,因为它会导致已消费的消息在超时到期之前不被确认。当 consumerBatchEnabled 为 true 时,如果在批次完成之前发生此超时,则将交付部分批次。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

收集批次消息的超时时间(毫秒)。它限制了等待填充 batchSize 的时间。当 batchSize > 1 且收集批次消息的时间大于 batchReceiveTime 时,将交付批次。默认值为 0(无超时)。

tickmark

recoveryBackOff
(recovery-back-off)

指定在尝试启动消费者(如果由于非致命原因而无法启动)之间的间隔的 BackOff。默认值为 FixedBackOff,每五秒无限次重试。与 recoveryInterval 互斥。

tickmark
tickmark

recoveryInterval
(recovery-interval)

确定在尝试启动消费者(如果由于非致命原因而无法启动)之间的间隔时间(毫秒)。默认值:5000。与 recoveryBackOff 互斥。

tickmark
tickmark

retryDeclarationInterval
(missing-queue- retry-interval)

如果在消费者初始化期间配置的队列中只有一部分可用,则消费者将从这些队列开始消费。消费者尝试使用此间隔被动地声明丢失的队列。当此间隔过去时,将再次使用 'declarationRetries' 和 'failedDeclarationRetryInterval'。如果仍然有丢失的队列,则消费者将再次等待此间隔,然后再尝试。此过程将无限期地持续下去,直到所有队列都可用为止。默认值:60000(一分钟)。

tickmark

shutdownTimeout
(N/A)

当容器关闭(例如,如果其包含的 ApplicationContext 关闭)时,它将等待正在处理的消息完成,最长等待时间为该限制。默认值为五秒。

tickmark
tickmark

startConsumerMinInterval
(最小启动间隔)

每个新消费者按需启动之前必须经过的时间(毫秒)。参见 监听器并发。默认值:10000(10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (N/A)

使用有状态重试建议时,如果收到缺少 messageId 属性的消息,默认情况下,该消息对于消费者来说是致命的(消费者将停止)。将此设置为 false 以丢弃(或路由到死信队列)此类消息。

tickmark
tickmark

stopConsumerMinInterval
(最小停止间隔)

在检测到空闲消费者后,从最后一个消费者停止到消费者停止之前必须经过的时间(毫秒)。参见 监听器并发。默认值:60000(一分钟)。

tickmark

streamConverter
(N/A)

一个 StreamMessageConverter,用于将本机 Stream 消息转换为 Spring AMQP 消息。

tickmark

taskExecutor
(任务执行器)

对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)的引用,用于执行监听器调用器。默认情况下,使用内部管理的线程的 SimpleAsyncTaskExecutor

tickmark
tickmark

taskScheduler
(任务调度器)

使用 DMLC 时,用于在“monitorInterval”处运行监控任务的调度器。

tickmark

transactionManager
(事务管理器)

监听器操作的外部事务管理器。也与 channelTransacted 相辅相成——如果 Channel 是事务性的,则其事务与外部事务同步。

tickmark
tickmark