批量处理的 @RabbitListener

当接收一批消息时,拆批通常由容器执行,并且监听器每次只用一条消息被调用。从 2.2 版本开始,您可以配置监听器容器工厂和监听器以一次接收整个批次,只需设置工厂的 batchListener 属性,并将方法的有效负载参数设置为 ListCollection

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

batchListener 属性设置为 true 会自动关闭工厂创建的容器中的 deBatchingEnabled 容器属性(除非 consumerBatchEnabledtrue - 请参见下文)。实际上,拆批从容器移动到监听器适配器,并且适配器创建传递给监听器的列表。

启用批处理的工厂不能与多方法监听器一起使用。

同样从 2.2 版本开始,当一次接收一条批量消息时,最后一条消息包含一个设置为 true 的布尔标头。可以通过向监听器方法添加 @Header(AmqpHeaders.LAST_IN_BATCH) 布尔型 last 参数来获取此标头。标头从 MessageProperties.isLastInBatch() 映射而来。此外,AmqpHeaders.BATCH_SIZE 在每个消息片段中都填充了批次的的大小。

此外,SimpleMessageListenerContainer 中添加了一个新属性 consumerBatchEnabled。当此属性为 true 时,容器将创建最多 batchSize 条消息的批次;如果在没有收到新消息的情况下 receiveTimeout 到期,则会传递部分批次。如果接收到生产者创建的批次,则会将其拆批并添加到消费者端的批次中;因此,传递的消息的实际数量可能会超过 batchSizebatchSize 表示从代理接收到的消息数量。当 consumerBatchEnabled 为 true 时,deBatchingEnabled 必须为 true;容器工厂将强制执行此要求。

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

当将 consumerBatchEnabled@RabbitListener 一起使用时

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个方法会使用接收到的原始、未转换的 org.springframework.amqp.core.Message 调用。

  • 第二个方法会使用带有转换后的有效负载和映射的标头/属性的 org.springframework.messaging.Message<?> 调用。

  • 第三个方法会使用转换后的有效负载调用,并且无法访问标头/属性。

您还可以添加一个 Channel 参数,通常在使用 MANUAL 确认模式时使用。对于第三个示例,这不太有用,因为您无法访问 delivery_tag 属性。

Spring Boot 为 consumerBatchEnabledbatchSize 提供了配置属性,但没有为 batchListener 提供配置属性。从 3.0 版本开始,在容器工厂上将 consumerBatchEnabled 设置为 true 也会将 batchListener 设置为 true。当 consumerBatchEnabledtrue 时,监听器**必须**是批处理监听器。

从 3.0 版本开始,监听器方法可以消费 Collection<?>List<?>