@KafkaListener
生命周期管理
为@KafkaListener
注解创建的监听器容器不是应用程序上下文中的 Bean。相反,它们是在类型为KafkaListenerEndpointRegistry
的基础设施 Bean 中注册的。此 Bean 由框架自动声明,并管理容器的生命周期;它会自动启动所有autoStartup
设置为true
的容器。所有容器工厂创建的所有容器必须处于相同的phase
。有关更多信息,请参见监听器容器自动启动。您可以使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有注册的容器。或者,您可以使用其id
属性获取对单个容器的引用。您可以在注解上设置autoStartup
,它会覆盖配置到容器工厂中的默认设置。您可以从应用程序上下文(例如自动装配)获取对 Bean 的引用,以管理其注册的容器。以下示例展示了如何执行此操作
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表仅维护其管理的容器的生命周期;声明为 Bean 的容器不受注册表管理,可以从应用程序上下文获取。可以通过调用注册表的getListenerContainers()
方法获取已管理容器的集合。版本 2.2.5 添加了一个便捷方法getAllListenerContainers()
,它返回所有容器的集合,包括由注册表管理的容器和声明为 Bean 的容器。返回的集合将包括已初始化的任何原型 Bean,但不会初始化任何延迟 Bean 声明。
在应用程序上下文刷新后注册的端点将立即启动,无论其autoStartup 属性如何,以符合SmartLifecycle 契约,其中autoStartup 仅在应用程序上下文初始化期间考虑。延迟注册的一个示例是具有原型范围的@KafkaListener 的 Bean,其中实例在上下文初始化后创建。从版本 2.8.7 开始,您可以将注册表的alwaysStartAfterRefresh 属性设置为false ,然后容器的autoStartup 属性将定义容器是否启动。
|
从 KafkaListenerEndpointRegistry 获取 MessageListenerContainer
KafkaListenerEndpointRegistry
提供用于检索MessageListenerContainer
实例的方法,以适应各种管理场景
所有容器:对于涵盖所有监听器容器的操作,请使用 getListenerContainers()
获取完整的集合。
Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();
通过 ID 获取特定容器:要管理单个容器,getListenerContainer(String id)
允许通过其 ID 进行检索。
MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");
动态容器过滤:从 3.2 版本开始,两个重载的 getListenerContainersMatching
方法允许对容器进行更精细的选择。一种方法使用 Predicate<String>
作为参数进行基于 ID 的过滤,而另一种方法使用 BiPredicate<String, MessageListenerContainer>
作为参数进行更高级的条件,这些条件可能包括容器属性或状态。
// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));
// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
registry.getListenerContainersMatching(myPattern::matches);
// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
registry.getListenerContainersMatching(myIdSet::contains);
// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
registry.getListenerContainersMatching(
(id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
);
使用这些方法可以有效地管理和查询应用程序中的 MessageListenerContainer
实例。