线程和异步消费者

异步消费者涉及许多不同的线程。

RabbitMQ 客户端传递新消息时,SimpleMessageListenerContainer中配置的TaskExecutor的线程用于调用MessageListener。如果未配置,则使用SimpleAsyncTaskExecutor。如果您使用的是池化执行器,则需要确保池大小足以处理配置的并发性。使用DirectMessageListenerContainer时,MessageListener会直接在RabbitMQ 客户端线程上调用。在这种情况下,taskExecutor用于监视消费者的任务。

当使用默认的SimpleAsyncTaskExecutor时,对于监听器调用的线程,监听器容器的beanName将用于threadNamePrefix。这对于日志分析很有用。我们通常建议始终在日志附加程序配置中包含线程名称。当通过容器上的taskExecutor属性专门提供TaskExecutor时,它将按原样使用,无需修改。建议您使用类似的技术来命名自定义TaskExecutor bean 定义创建的线程,以帮助在日志消息中识别线程。

在创建连接时,CachingConnectionFactory中配置的Executor将传递到RabbitMQ 客户端,并且其线程用于将新消息传递到监听器容器。如果未配置,则客户端将使用内部线程池执行器,每个连接的池大小(在撰写本文时)为Runtime.getRuntime().availableProcessors() * 2

如果您有大量工厂或使用CacheMode.CONNECTION,则可能需要考虑使用具有足够线程的共享ThreadPoolTaskExecutor来满足您的工作负载。

使用DirectMessageListenerContainer时,您需要确保连接工厂配置了具有足够线程的任务执行器,以支持使用该工厂的所有监听器容器所需的并发性。默认池大小(在撰写本文时)为Runtime.getRuntime().availableProcessors() * 2

RabbitMQ 客户端使用ThreadFactory为低级 I/O(套接字)操作创建线程。要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory,如配置底层客户端连接工厂中所述。