手动启动 Kafka Streams 处理器
Spring Cloud Stream Kafka Streams Binder 在 Spring for Apache Kafka 的 `StreamsBuilderFactoryBean` 之上提供了一个名为 `StreamsBuilderFactoryManager` 的抽象。此管理器 API 用于控制基于 Binder 的应用程序中每个处理器中的多个 `StreamsBuilderFactoryBean`。因此,在使用 Binder 时,如果您想手动控制应用程序中各种 `StreamsBuilderFactoryBean` 对象的自动启动,则需要使用 `StreamsBuilderFactoryManager`。您可以使用属性 `spring.kafka.streams.auto-startup` 并将其设置为 `false` 以关闭处理器的自动启动。然后,在应用程序中,您可以使用如下所示的方法来使用 `StreamsBuilderFactoryManager` 启动处理器。
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
当您希望应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常有用。例如,当您有一个需要恢复的大型状态存储时,如果处理器像默认情况下那样正常启动,这可能会阻止您的应用程序启动。如果您正在使用某种存活探测机制(例如在 Kubernetes 上),它可能会认为应用程序已关闭并尝试重新启动。为了纠正这一点,您可以将 `spring.kafka.streams.auto-startup` 设置为 `false` 并遵循上述方法。
请记住,在使用 Spring Cloud Stream Binder 时,您不会直接处理来自 Spring for Apache Kafka 的 `StreamsBuilderFactoryBean`,而是 `StreamsBuilderFactoryManager`,因为 `StreamsBuilderFactoryBean` 对象由 Binder 内部管理。