按顺序启动 @KafkaListeners

一种常见的用例是在一个监听器消费完主题中的所有记录后启动另一个监听器。例如,您可能希望在处理来自其他主题的记录之前,将一个或多个压缩主题的内容加载到内存中。从版本2.7.3开始,引入了一个新的组件ContainerGroupSequencer。它使用@KafkaListenercontainerGroup属性将容器分组在一起,并在当前组中的所有容器都空闲后启动下一个组中的容器。

通过一个例子可以更好地说明。

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}

这里,我们在两个组g1g2中有4个监听器。

在应用程序上下文初始化期间,序列器将所提供组中所有容器的autoStartup属性设置为false。它还为任何容器(如果尚未设置)设置idleEventInterval为提供的值(在本例中为5000ms)。然后,当序列器由应用程序上下文启动时,将启动第一个组中的容器。当接收到ListenerContainerIdleEvent时,每个容器中的每个单独的子容器都会停止。当ConcurrentMessageListenerContainer中的所有子容器都停止时,父容器停止。当一个组中的所有容器都停止后,将启动下一个组中的容器。组的数量或组中容器的数量没有限制。

默认情况下,最终组(上面是g2)中的容器在空闲时不会停止。要修改该行为,请在序列器上将stopLastGroupWhenIdle设置为true

另外,之前每个组中的容器都被添加到类型为Collection<MessageListenerContainer>的bean中,bean名称为containerGroup。这些集合现在已弃用,取而代之的是类型为ContainerGroup的bean,其bean名称是组名,后缀为.group;在上面的示例中,将有两个bean g1.groupg2.groupCollection bean将在未来的版本中移除。

© . This site is unofficial and not affiliated with VMware.