监听器并发

SimpleMessageListenerContainer

默认情况下,监听器容器启动一个单一的消费者,从队列接收消息。

在查看上一节中的表格时,您可以看到许多控制并发性的属性和特性。最简单的是 concurrentConsumers,它创建指定数量的消费者,这些消费者并发地处理消息。

在 1.3.0 版本之前,这是唯一可用的设置,并且容器必须停止并重新启动才能更改设置。

从 1.3.0 版本开始,您可以动态调整 `concurrentConsumers` 属性。如果在容器运行时更改此属性,将根据需要添加或删除消费者以适应新的设置。

此外,添加了一个名为 `maxConcurrentConsumers` 的新属性,容器会根据工作负载动态调整并发性。这与四个附加属性协同工作:`consecutiveActiveTrigger`、`startConsumerMinInterval`、`consecutiveIdleTrigger` 和 `stopConsumerMinInterval`。使用默认设置,增加消费者的算法如下:

如果尚未达到 `maxConcurrentConsumers`,并且现有消费者在连续十个周期内处于活动状态,并且自上次启动消费者以来已过去至少 10 秒,则会启动一个新的消费者。如果消费者在 `batchSize` * `receiveTimeout` 毫秒内至少收到一条消息,则该消费者被视为处于活动状态。

使用默认设置,减少消费者的算法如下:

如果运行的消费者数量超过 `concurrentConsumers`,并且一个消费者检测到十次连续超时(空闲),并且自上次停止消费者以来已过去至少 60 秒,则会停止一个消费者。超时时间取决于 `receiveTimeout` 和 `batchSize` 属性。如果消费者在 `batchSize` * `receiveTimeout` 毫秒内未收到任何消息,则该消费者被视为处于空闲状态。因此,使用默认超时时间(一秒)和 `batchSize` 为四,在 40 秒的空闲时间后(四次超时对应一次空闲检测)会停止消费者。

实际上,只有在整个容器空闲一段时间后才能停止消费者。这是因为代理在所有活动消费者之间共享其工作。

每个消费者使用一个通道,无论配置了多少个队列。

从 2.0 版本开始,可以使用 `concurrency` 属性设置 `concurrentConsumers` 和 `maxConcurrentConsumers` 属性,例如 `2-4`。

使用 `DirectMessageListenerContainer`

使用此容器,并发性基于配置的队列和 `consumersPerQueue`。每个队列的每个消费者使用一个单独的通道,并发性由 RabbitMQ 客户端库控制。默认情况下,在撰写本文时,它使用一个包含 `DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2` 个线程的线程池。

您可以配置一个 `taskExecutor` 来提供所需的最高并发性。