监听器并发性

SimpleMessageListenerContainer

默认情况下,监听器容器启动单个消费者,该消费者从队列接收消息。

在查看上一节中的表格时,您可以看到许多控制并发的属性和特性。最简单的是 `concurrentConsumers`,它创建该(固定)数量的并发处理消息的消费者。

在 1.3.0 版本之前,这是唯一可用的设置,并且必须停止并重新启动容器才能更改该设置。

从 1.3.0 版本开始,您现在可以动态调整 `concurrentConsumers` 属性。如果在容器运行时更改它,则会根据需要添加或删除消费者以适应新的设置。

此外,添加了一个名为 `maxConcurrentConsumers` 的新属性,并且容器会根据工作负载动态调整并发性。这与四个附加属性一起使用:`consecutiveActiveTrigger`、`startConsumerMinInterval`、`consecutiveIdleTrigger` 和 `stopConsumerMinInterval`。使用默认设置,增加消费者的算法如下:

如果尚未达到 `maxConcurrentConsumers`,并且现有消费者连续十个周期处于活动状态,并且自上次启动消费者以来至少经过 10 秒,则会启动一个新消费者。如果消费者在 `batchSize` * `receiveTimeout` 毫秒内至少收到一条消息,则该消费者被认为是活动的。

使用默认设置,减少消费者的算法如下:

如果有超过 `concurrentConsumers` 个消费者正在运行,并且某个消费者检测到十个连续超时(空闲),并且上次停止消费者至少 60 秒前,则会停止一个消费者。超时取决于 `receiveTimeout` 和 `batchSize` 属性。如果消费者在 `batchSize` * `receiveTimeout` 毫秒内未收到任何消息,则该消费者被认为是空闲的。因此,使用默认超时(一秒)和 `batchSize` 为四,在 40 秒空闲时间后(四个超时对应一个空闲检测)将停止消费者。

实际上,只有在整个容器空闲一段时间后才能停止消费者。这是因为代理在所有活动消费者之间共享其工作。

每个消费者使用单个通道,无论配置了多少个队列。

从 2.0 版本开始,可以使用 `concurrency` 属性设置 `concurrentConsumers` 和 `maxConcurrentConsumers` 属性,例如 `2-4`。

使用 `DirectMessageListenerContainer`

对于此容器,并发性基于配置的队列和 `consumersPerQueue`。每个队列的每个消费者都使用一个单独的通道,并发性由 rabbit 客户端库控制。默认情况下,在撰写本文时,它使用 `DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2` 线程的池。

您可以配置一个 `taskExecutor` 来提供所需的最大并发性。