暂停和恢复监听器容器
版本 2.1.3 为监听器容器添加了 pause()
和 resume()
方法。以前,您可以在 ConsumerAwareMessageListener
中暂停消费者,并通过监听 ListenerContainerIdleEvent
来恢复它,该事件提供对 Consumer
对象的访问。虽然您可以在空闲容器中使用事件监听器来暂停消费者,但在某些情况下,这并不线程安全,因为无法保证事件监听器是在消费者线程上调用的。为了安全地暂停和恢复消费者,您应该使用监听器容器上的 pause
和 resume
方法。pause()
在下一次 poll()
之前生效;resume()
在当前 poll()
返回后生效。当容器被暂停时,它会继续 poll()
消费者,如果使用组管理,则避免重新平衡,但它不会检索任何记录。有关更多信息,请参阅 Kafka 文档。
从版本 2.1.5 开始,您可以调用 isPauseRequested()
来查看是否已调用 pause()
。但是,消费者可能尚未实际暂停。isConsumerPaused()
如果所有 Consumer
实例都已实际暂停,则返回 true。
此外(自 2.1.5 版本起),ConsumerPausedEvent
和 ConsumerResumedEvent
实例会以容器作为 source
属性,以及涉及的 TopicPartition
实例作为 partitions
属性发布。
从 2.9 版本开始,一个新的容器属性 pauseImmediate
,当设置为 true 时,会导致暂停在当前记录处理完后生效。默认情况下,暂停在所有来自先前轮询的记录处理完后生效。请参阅 pauseImmediate。
以下简单的 Spring Boot 应用程序演示了如何使用容器注册表获取对 @KafkaListener
方法容器的引用,并暂停或恢复其消费者,以及接收相应的事件。
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下清单显示了前面示例的结果。
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2