线程和异步消费者

异步消费者涉及多个线程。

RabbitMQ Client传递新消息时,SimpleMessageListenerContainer中配置的TaskExecutor中的线程用于调用MessageListener。 如果未配置,则使用SimpleAsyncTaskExecutor。 如果使用的是池化执行器,则需要确保池大小足以处理配置的并发性。 使用DirectMessageListenerContainer时,MessageListener直接在RabbitMQ Client线程上调用。 在这种情况下,taskExecutor用于监视消费者的任务。

当使用默认的 SimpleAsyncTaskExecutor 时,监听器调用的线程的 threadNamePrefix 将使用监听器容器的 beanName。这对于日志分析很有用。我们通常建议始终在日志追加器配置中包含线程名称。当通过容器上的 taskExecutor 属性专门提供 TaskExecutor 时,它将按原样使用,不会修改。建议您使用类似的技术来命名自定义 TaskExecutor bean 定义创建的线程,以帮助在日志消息中识别线程。

CachingConnectionFactory 中配置的 Executor 在创建连接时传递给 RabbitMQ 客户端,其线程用于将新消息传递给监听器容器。如果未配置,客户端将使用内部线程池执行器,该执行器(在撰写本文时)为每个连接使用 Runtime.getRuntime().availableProcessors() * 2 的池大小。

如果您有大量工厂或使用 CacheMode.CONNECTION,您可能希望考虑使用共享的 ThreadPoolTaskExecutor,该执行器具有足够的线程来满足您的工作负载。

使用 DirectMessageListenerContainer 时,您需要确保连接工厂配置了具有足够线程的任务执行器,以支持所有使用该工厂的监听器容器的所需并发性。默认池大小(在撰写本文时)为 Runtime.getRuntime().availableProcessors() * 2

RabbitMQ 客户端 使用 ThreadFactory 为低级 I/O(套接字)操作创建线程。要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory,如 配置底层客户端连接工厂 中所述。