@KafkaListener 生命周期管理

为@KafkaListener 注解创建的监听器容器不是应用程序上下文中的 bean。相反,它们是用类型为KafkaListenerEndpointRegistry的基础架构 bean注册的。此 bean 由框架自动声明并管理容器的生命周期;它将自动启动autoStartup设置为true的任何容器。所有容器工厂创建的所有容器必须处于相同的phase中。有关更多信息,请参见监听器容器自动启动。您可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。或者,您可以使用其id属性获取对单个容器的引用。您可以设置注解上的autoStartup,这将覆盖配置到容器工厂中的默认设置。您可以从应用程序上下文(例如自动装配)获取对 bean 的引用,以管理其已注册的容器。以下示例说明了如何操作

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表仅维护其管理的容器的生命周期;声明为 bean 的容器不受注册表管理,可以从应用程序上下文获取。可以通过调用注册表的getListenerContainers()方法获取已管理容器的集合。2.2.5 版本添加了一个便捷方法getAllListenerContainers(),它返回所有容器的集合,包括由注册表管理的容器和声明为 bean 的容器。返回的集合将包括任何已初始化的原型 bean,但不会初始化任何延迟 bean 声明。

在应用程序上下文刷新后注册的端点将立即启动,无论其autoStartup属性如何,这符合SmartLifecycle契约,其中autoStartup仅在应用程序上下文初始化期间考虑。延迟注册的一个示例是具有原型范围的@KafkaListener的 bean,其中在上下文初始化后创建了一个实例。从 2.8.7 版本开始,您可以将注册表的alwaysStartAfterRefresh属性设置为false,然后容器的autoStartup属性将定义是否启动容器。

从 KafkaListenerEndpointRegistry 获取 MessageListenerContainers

KafkaListenerEndpointRegistry 提供了用于检索MessageListenerContainer实例的方法,以适应各种管理场景

**所有容器**: 对于涵盖所有监听器容器的操作,请使用getListenerContainers()检索完整的集合。

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

**按 ID 指定容器**: 要管理单个容器,可以使用getListenerContainer(String id)按其 ID 进行检索。

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

**动态容器过滤**: 3.2 版本中引入了两个重载的getListenerContainersMatching方法,可以精确选择容器。一种方法采用Predicate<String>作为参数进行基于 ID 的过滤,另一种方法采用BiPredicate<String, MessageListenerContainer>进行更高级的条件判断,该条件可能包括容器属性或状态作为参数。

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

利用这些方法可以有效地管理和查询应用程序中的MessageListenerContainer实例。