带批量处理的 @RabbitListener
当接收 一批 消息时,通常由容器执行分批处理,监听器一次接收一条消息。从版本 2.2 开始,您可以配置监听器容器工厂和监听器以在一个调用中接收整个批次,只需设置工厂的 batchListener 属性,并将方法有效负载参数设置为 List 或 Collection。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
将 batchListener 属性设置为 true 会自动关闭工厂创建的容器中的 deBatchingEnabled 容器属性(除非 consumerBatchEnabled 为 true - 参见下文)。实际上,分批处理从容器转移到监听器适配器,适配器创建传递给监听器的列表。
批处理启用的工厂不能与 多方法监听器 一起使用。
同样从版本 2.2 开始,当一次接收一批消息时,最后一条消息包含一个布尔头,设置为 true。可以通过将 @Header(AmqpHeaders.LAST_IN_BATCH) 布尔值 last 参数添加到您的监听器方法中来获取此头。该头从 MessageProperties.isLastInBatch() 映射。此外,AmqpHeaders.BATCH_SIZE 在每个消息片段中都填充了批次的大小。
此外,SimpleMessageListenerContainer 添加了一个新属性 consumerBatchEnabled。当此属性为 true 时,容器将创建一批消息,最多达到 batchSize;如果在没有新消息到达的情况下 receiveTimeout 超时,则会交付部分批次。如果收到生产者创建的批次,则会将其分批并添加到消费者端批次中;因此,交付的实际消息数量可能超过 batchSize,batchSize 表示从代理接收到的消息数量。当 consumerBatchEnabled 为 true 时,deBatchingEnabled 必须为 true;容器工厂将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
当 consumerBatchEnabled 与 @RabbitListener 一起使用时
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个方法使用接收到的原始、未转换的
org.springframework.amqp.core.Message调用。 -
第二个方法使用带有转换后的有效负载和映射的头/属性的
org.springframework.messaging.Message<?>调用。 -
第三个方法使用转换后的有效负载调用,无法访问头/属性。
您还可以添加一个 Channel 参数,这在使用 MANUAL 确认模式时经常使用。这在第三个示例中不太有用,因为您无法访问 delivery_tag 属性。
Spring Boot 为 consumerBatchEnabled 和 batchSize 提供了配置属性,但没有为 batchListener 提供。从版本 3.0 开始,在容器工厂上将 consumerBatchEnabled 设置为 true 也会将 batchListener 设置为 true。当 consumerBatchEnabled 为 true 时,监听器必须是批处理监听器。
从版本 3.0 开始,监听器方法可以消费 Collection<?> 或 List<?>。
| 批量模式下的监听器不支持回复,因为批次中的消息与产生的单个回复之间可能没有关联。异步返回类型 仍然受批处理监听器支持。 |