@RabbitListener 与批处理

当接收一批消息时,解批处理通常由容器执行,并且监听器每次只调用一条消息。从 2.2 版开始,您可以配置监听器容器工厂和监听器以一次调用接收整个批次,只需设置工厂的batchListener属性,并将方法有效负载参数设为ListCollection

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

batchListener属性设置为 true 会自动关闭工厂创建的容器中的deBatchingEnabled容器属性(除非consumerBatchEnabledtrue - 参见下文)。实际上,解批处理从容器移到了监听器适配器,并且适配器创建传递给监听器的列表。

启用批次的工厂不能与多方法监听器一起使用。

同样从 2.2 版开始,当一次接收一批消息时,最后一条消息包含一个设置为true的布尔头。可以通过将@Header(AmqpHeaders.LAST_IN_BATCH)布尔 last`参数添加到您的监听器方法来获取此头。该头从MessageProperties.isLastInBatch()映射而来。此外,AmqpHeaders.BATCH_SIZE在每个消息片段中都填充了批次的尺寸。

此外,SimpleMessageListenerContainer中添加了一个新属性consumerBatchEnabled。当此属性为 true 时,容器将创建最多batchSize条消息的批次;如果receiveTimeout到期且没有新消息到达,则会传递部分批次。如果收到生产者创建的批次,则会将其解批处理并添加到消费者端批次;因此,传递的实际消息数量可能超过batchSizebatchSize代表从代理接收的消息数量。当consumerBatchEnabled为 true 时,deBatchingEnabled必须为 true;容器工厂将强制执行此要求。

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

当使用consumerBatchEnabled@RabbitListener

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个调用接收到的原始未转换的org.springframework.amqp.core.Message

  • 第二个调用带有转换后的有效负载和映射的头/属性的org.springframework.messaging.Message<?>

  • 第三个调用带有转换后的有效负载,但无法访问头/属性。

您还可以添加一个Channel参数,通常在使用MANUAL确认模式时使用。对于第三个示例,这不太有用,因为您无法访问delivery_tag属性。

Spring Boot 为consumerBatchEnabledbatchSize提供了一个配置属性,但没有为batchListener提供。从 3.0 版开始,在容器工厂上将consumerBatchEnabled设置为true也会将batchListener设置为true。当consumerBatchEnabledtrue时,监听器必须是批次监听器。

从 3.0 版开始,监听器方法可以消费Collection<?>List<?>