@RabbitListener 与批处理
当接收一批消息时,解批处理通常由容器执行,并且监听器每次只调用一条消息。从 2.2 版开始,您可以配置监听器容器工厂和监听器以一次调用接收整个批次,只需设置工厂的batchListener
属性,并将方法有效负载参数设为List
或Collection
。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
将batchListener
属性设置为 true 会自动关闭工厂创建的容器中的deBatchingEnabled
容器属性(除非consumerBatchEnabled
为true
- 参见下文)。实际上,解批处理从容器移到了监听器适配器,并且适配器创建传递给监听器的列表。
启用批次的工厂不能与多方法监听器一起使用。
同样从 2.2 版开始,当一次接收一批消息时,最后一条消息包含一个设置为true
的布尔头。可以通过将@Header(AmqpHeaders.LAST_IN_BATCH)
布尔 last`参数添加到您的监听器方法来获取此头。该头从MessageProperties.isLastInBatch()
映射而来。此外,AmqpHeaders.BATCH_SIZE
在每个消息片段中都填充了批次的尺寸。
此外,SimpleMessageListenerContainer
中添加了一个新属性consumerBatchEnabled
。当此属性为 true 时,容器将创建最多batchSize
条消息的批次;如果receiveTimeout
到期且没有新消息到达,则会传递部分批次。如果收到生产者创建的批次,则会将其解批处理并添加到消费者端批次;因此,传递的实际消息数量可能超过batchSize
,batchSize
代表从代理接收的消息数量。当consumerBatchEnabled
为 true 时,deBatchingEnabled
必须为 true;容器工厂将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
当使用consumerBatchEnabled
与@RabbitListener
时
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个调用接收到的原始未转换的
org.springframework.amqp.core.Message
。 -
第二个调用带有转换后的有效负载和映射的头/属性的
org.springframework.messaging.Message<?>
。 -
第三个调用带有转换后的有效负载,但无法访问头/属性。
您还可以添加一个Channel
参数,通常在使用MANUAL
确认模式时使用。对于第三个示例,这不太有用,因为您无法访问delivery_tag
属性。
Spring Boot 为consumerBatchEnabled
和batchSize
提供了一个配置属性,但没有为batchListener
提供。从 3.0 版开始,在容器工厂上将consumerBatchEnabled
设置为true
也会将batchListener
设置为true
。当consumerBatchEnabled
为true
时,监听器必须是批次监听器。
从 3.0 版开始,监听器方法可以消费Collection<?>
或List<?>
。