暂停和恢复监听器容器

版本 2.1.3 向监听器容器添加了 pause()resume() 方法。以前,您可以在 ConsumerAwareMessageListener 中暂停使用者,并通过侦听 ListenerContainerIdleEvent 来恢复它,该事件提供对 Consumer 对象的访问。虽然您可以使用事件监听器在空闲容器中暂停使用者,但在某些情况下,这不是线程安全的,因为无法保证事件监听器是在使用者线程上调用的。为了安全地暂停和恢复使用者,您应该使用监听器容器上的 pauseresume 方法。pause() 在下一次 poll() 之前生效;resume() 在当前 poll() 返回后生效。当容器暂停时,它继续 poll() 使用者,避免在使用组管理时重新平衡,但它不检索任何记录。有关更多信息,请参阅 Kafka 文档。

从版本 2.1.5 开始,您可以调用 isPauseRequested() 来查看是否已调用 pause()。但是,使用者可能尚未真正暂停。isConsumerPaused() 如果所有 Consumer 实例实际上都已暂停,则返回 true。

此外(同样从 2.1.5 开始),ConsumerPausedEventConsumerResumedEvent 实例会发布,其中容器作为 source 属性,涉及到的 TopicPartition 实例作为 partitions 属性。

从版本 2.9 开始,一个新的容器属性 pauseImmediate,当设置为 true 时,会导致暂停在当前记录处理完成后生效。默认情况下,暂停在处理完上一次轮询的所有记录后生效。请参阅 pauseImmediate

以下简单的 Spring Boot 应用程序通过使用容器注册表获取对 @KafkaListener 方法的容器的引用,并暂停或恢复其使用者以及接收相应的事件来演示

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下清单显示了前面示例的结果

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2