Apache Kafka Streams 支持

从 1.1.4 版本开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。要在 Spring 应用程序中使用它,必须在类路径中存在 kafka-streams jar 包。它是 Spring for Apache Kafka 项目的可选依赖项,不会被传递性下载。

基础

Apache Kafka Streams 参考文档建议以下使用 API 的方式

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

因此,我们有两个主要组件

  • StreamsBuilder:具有用于构建 KStream(或 KTable)实例的 API。

  • KafkaStreams:用于管理这些实例的生命周期。

由单个 StreamsBuilderKafkaStreams 实例公开的所有 KStream 实例将同时启动和停止,即使它们具有不同的逻辑。换句话说,由 StreamsBuilder 定义的所有流都与单个生命周期控制绑定在一起。一旦 KafkaStreams 实例通过 streams.close() 关闭,它就不能重新启动。相反,必须创建一个新的 KafkaStreams 实例来重新启动流处理。

Spring 管理

为了简化从 Spring 应用程序上下文角度使用 Kafka Streams 并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。这是一个 AbstractFactoryBean 实现,用于将 StreamsBuilder 单例实例公开为 bean。以下示例创建了这样一个 bean

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在以 KafkaStreamsConfiguration 对象的形式提供,而不是 StreamsConfig

StreamsBuilderFactoryBean 还实现了 SmartLifecycle 来管理内部 KafkaStreams 实例的生命周期。与 Kafka Streams API 类似,您必须在启动 KafkaStreams 之前定义 KStream 实例。这也适用于 Kafka Streams 的 Spring API。因此,当您在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,您必须在应用程序上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。例如,KStream 可以是常规的 bean 定义,而 Kafka Streams API 在没有任何影响的情况下使用。以下示例展示了如何做到这一点

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果您想手动控制生命周期(例如,通过某些条件停止和启动),您可以使用工厂 bean (&) 前缀 直接引用 StreamsBuilderFactoryBean bean。由于 StreamsBuilderFactoryBean 使用其内部 KafkaStreams 实例,因此可以安全地停止并再次启动它。每次 start() 时都会创建一个新的 KafkaStreams。如果您想分别控制 KStream 实例的生命周期,您也可以考虑使用不同的 StreamsBuilderFactoryBean 实例。

您还可以指定 KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener 选项在 StreamsBuilderFactoryBean 上,这些选项将委托给内部 KafkaStreams 实例。此外,除了在 StreamsBuilderFactoryBean 上间接设置这些选项之外,从版本 2.1.5 开始,您可以使用 KafkaStreamsCustomizer 回调接口来配置内部 KafkaStreams 实例。请注意,KafkaStreamsCustomizer 会覆盖 StreamsBuilderFactoryBean 提供的选项。如果您需要直接执行一些 KafkaStreams 操作,您可以使用 StreamsBuilderFactoryBean.getKafkaStreams() 访问该内部 KafkaStreams 实例。您可以按类型自动装配 StreamsBuilderFactoryBean bean,但您应该确保在 bean 定义中使用完整类型,如下面的示例所示

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果您使用接口 bean 定义,您可以添加 @Qualifier 以按名称进行注入。以下示例展示了如何做到这一点

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从版本 2.4.1 开始,工厂 bean 有一个新的属性 infrastructureCustomizer,其类型为 KafkaStreamsInfrastructureCustomizer;这允许在创建流之前自定义 StreamsBuilder(例如,添加状态存储)和/或 Topology

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

提供默认的无操作实现,以避免在不需要其中一种方法时必须实现两种方法。

提供了一个 CompositeKafkaStreamsInfrastructureCustomizer,用于需要应用多个自定义器的情况。

KafkaStreams Micrometer 支持

从 2.5.3 版本开始,您可以配置 KafkaStreamsMicrometerListener 以自动为工厂 Bean 管理的 KafkaStreams 对象注册 Micrometer 指标。

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

Streams JSON 序列化和反序列化

对于以 JSON 格式读取或写入主题或状态存储时数据的序列化和反序列化,Spring for Apache Kafka 提供了一个 JsonSerde 实现,它使用 JSON,并委托给 序列化、反序列化和消息转换 中描述的 JsonSerializerJsonDeserializerJsonSerde 实现通过其构造函数(目标类型或 ObjectMapper)提供相同的配置选项。在以下示例中,我们使用 JsonSerde 来序列化和反序列化 Kafka 流的 Cat 负载(JsonSerde 可以以类似的方式在需要实例的任何地方使用)。

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

从 2.3 版本开始,在以编程方式构建生产者/消费者工厂中使用的序列化器/反序列化器时,可以使用流畅的 API,这简化了配置。

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用 KafkaStreamBrancher

KafkaStreamBrancher 类引入了在 KStream 之上构建条件分支的更便捷方式。

考虑以下不使用 KafkaStreamBrancher 的示例。

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 KafkaStreamBrancher

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置 Kafka Streams 环境,StreamsBuilderFactoryBean 需要一个 KafkaStreamsConfiguration 实例。有关所有可能的选项,请参阅 Apache Kafka 文档

从 2.2 版本开始,流配置现在以 KafkaStreamsConfiguration 对象的形式提供,而不是以 StreamsConfig 的形式提供。

为了避免大多数情况下,尤其是开发微服务时,编写样板代码,Spring for Apache Kafka 提供了 `@EnableKafkaStreams` 注解,您应该将其放在 `@Configuration` 类上。您只需要声明一个名为 `defaultKafkaStreamsConfig` 的 `KafkaStreamsConfiguration` bean。一个名为 `defaultKafkaStreamsBuilder` 的 `StreamsBuilderFactoryBean` bean 会自动在应用程序上下文中声明。您也可以声明和使用任何其他 `StreamsBuilderFactoryBean` bean。您可以通过提供一个实现 `StreamsBuilderFactoryBeanConfigurer` 的 bean 来对该 bean 进行额外的自定义。如果有多个这样的 bean,它们将根据其 `Ordered.order` 属性应用。

清理和停止配置

当工厂停止时,会调用 `KafkaStreams.close()` 方法,并带两个参数

  • closeTimeout:等待线程关闭的时间(默认为 `DEFAULT_CLOSE_TIMEOUT`,设置为 10 秒)。可以使用 `StreamsBuilderFactoryBean.setCloseTimeout()` 进行配置。

  • leaveGroupOnClose:触发组中的消费者离开调用(默认为 `false`)。可以使用 `StreamsBuilderFactoryBean.setLeaveGroupOnClose()` 进行配置。

默认情况下,当工厂 bean 停止时,会调用 `KafkaStreams.cleanUp()` 方法。从版本 2.1.2 开始,工厂 bean 有额外的构造函数,接受一个 `CleanupConfig` 对象,该对象具有属性,允许您控制在 `start()` 或 `stop()` 期间是否调用 `cleanUp()` 方法,或者两者都不调用。从版本 2.7 开始,默认情况下永远不会清理本地状态。

头信息增强器

版本 3.0 添加了 `HeaderEnricherProcessor` 扩展,它是 `ContextualProcessor` 的扩展;提供与已弃用的 `HeaderEnricher` 相同的功能,该 `HeaderEnricher` 实现已弃用的 `Transformer` 接口。这可以用于在流处理中添加头信息;头信息的值是 SpEL 表达式;表达式评估的根对象具有 3 个属性

  • record - `org.apache.kafka.streams.processor.api.Record`(`key`、`value`、`timestamp`、`headers`)

  • key - 当前记录的键

  • value - 当前记录的值

  • context - ProcessorContext,允许访问当前记录的元数据

表达式必须返回一个 byte[]String(它将使用 UTF-8 转换为 byte[])。

要在流中使用 enricher

.process(() -> new HeaderEnricherProcessor(expressions))

处理器不会更改 keyvalue;它只是添加头信息。

每条记录都需要一个新的实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

以下是一个简单的示例,添加一个文字头信息和一个变量

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

版本 3.0 添加了 MessagingProcessor,它是 ContextualProcessor 的扩展,提供与已弃用的 MessagingTransformer 相同的功能,后者实现了已弃用的 Transformer 接口。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。转换器需要 MessagingFunction 的实现。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 使用其 GatewayProxyFactoryBean 自动提供实现。它还需要一个 MessagingMessageConverter 来将键、值和元数据(包括头信息)转换为/从 Spring Messaging Message<?> 转换。有关更多信息,请参阅 [从 KStream 调用 Spring Integration 流]

从反序列化异常中恢复

版本 2.3 引入了 RecoveringDeserializationExceptionHandler,它可以在发生反序列化异常时采取一些措施。请参阅 Kafka 文档中有关 DeserializationExceptionHandler 的内容,RecoveringDeserializationExceptionHandler 是其一个实现。RecoveringDeserializationExceptionHandler 使用 ConsumerRecordRecoverer 实现进行配置。框架提供了 DeadLetterPublishingRecoverer,它将失败的记录发送到死信主题。有关此恢复器的更多信息,请参阅 发布死信记录

要配置恢复器,请将以下属性添加到您的流配置中

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer() bean 可以是您自己的 ConsumerRecordRecoverer 实现。

交互式查询支持

从 3.2 版本开始,Spring for Apache Kafka 提供了 Kafka Streams 中交互式查询所需的基本设施。交互式查询在有状态的 Kafka Streams 应用程序中很有用,因为它们提供了一种持续查询应用程序中有状态存储的方法。因此,如果应用程序想要物化正在考虑的系统的当前视图,交互式查询提供了一种方法。要了解有关交互式查询的更多信息,请参阅此 文章。Spring for Apache Kafka 中的支持围绕一个名为 KafkaStreamsInteractiveQueryService 的 API 展开,它是 Kafka Streams 库中交互式查询 API 的一个门面。应用程序可以创建此服务的实例作为 bean,然后稍后使用它通过名称检索状态存储。

以下代码片段显示了一个示例。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假设一个 Kafka Streams 应用程序有一个名为 app-store 的状态存储,那么可以通过 KafkStreamsInteractiveQuery API 检索该存储,如下所示。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦应用程序获得了对状态存储的访问权限,它就可以从该存储中查询键值信息。

在这种情况下,应用程序使用的状态存储是一个只读键值存储。Kafka Streams 应用程序可以使用其他类型的状态存储。例如,如果应用程序希望查询基于窗口的存储,它可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后稍后检索它。由于这个原因,KafkaStreamsInteractiveQueryService 中检索可查询存储的 API 具有通用的存储类型签名,以便最终用户可以分配适当的类型。

以下是 API 的类型签名。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

在调用此方法时,用户可以专门请求适当的状态存储类型,就像我们在上面的示例中所做的那样。

重试状态存储检索

当尝试使用 KafkaStreamsInteractiveQueryService 检索状态存储时,可能会由于各种原因而找不到状态存储。如果这些原因是暂时的,KafkaStreamsInteractiveQueryService 提供了一个选项,通过允许注入自定义 RetryTemplate 来重试状态存储的检索。默认情况下,KafkaStreamsInteractiveQueryService 中使用的 RetryTemmplate 使用最大尝试次数为 3,固定回退时间为 1 秒。

以下是如何将自定义 RetryTemmplate 注入 KafkaStreamsInteractiveQueryService,最大尝试次数为 10。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查询远程状态存储

上面展示的用于检索状态存储的 API - `retrieveQueryableStore` 旨在用于本地可用的键值状态存储。在生产环境中,Kafka Streams 应用程序最有可能根据分区数量进行分布。如果一个主题有四个分区,并且有四个相同 Kafka Streams 处理器的实例在运行,那么每个实例可能负责处理来自该主题的一个分区。在这种情况下,调用 `retrieveQueryableStore` 可能不会返回实例正在寻找的正确结果,尽管它可能会返回一个有效的存储。假设这个有四个分区的主题包含关于各种键的数据,并且一个分区始终负责一个特定的键。如果调用 `retrieveQueryableStore` 的实例正在查找关于该实例没有托管的键的信息,那么它将不会收到任何数据。这是因为当前的 Kafka Streams 实例对这个键一无所知。为了解决这个问题,调用实例首先需要确保他们拥有托管特定键的 Kafka Streams 处理器实例的主机信息。这可以通过以下方式从任何具有相同 `application.id` 的 Kafka Streams 实例中获取。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例代码中,调用实例正在从名为 `app-store` 的状态存储中查询特定键 `12345`。该 API 还需要一个相应的键序列化器,在本例中为 `IntegerSerializer`。Kafka Streams 会遍历所有具有相同 `application.id` 的实例,并尝试找到哪个实例托管了这个特定的键。一旦找到,它就会将该主机信息作为 `HostInfo` 对象返回。

该 API 的外观如下所示

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

当以这种分布式方式使用多个具有相同application.id的 Kafka Streams 处理器实例时,应用程序应该提供一个 RPC 层,以便可以通过 RPC 端点(例如 REST 端点)查询状态存储。有关此内容的更多详细信息,请参阅此文章。当使用 Spring for Apache Kafka 时,使用 spring-web 技术非常容易添加基于 Spring 的 REST 端点。一旦有了 REST 端点,就可以使用它从任何 Kafka Streams 实例查询状态存储,前提是实例知道HostInfo(其中包含密钥的主机信息)。

如果包含密钥的实例是当前实例,则应用程序不需要调用 RPC 机制,而是进行 JVM 内调用。然而,问题在于,应用程序可能不知道进行调用的实例是否包含密钥,因为某个特定服务器可能会由于消费者重新平衡而丢失分区。为了解决这个问题,KafkaStreamsInteractiveQueryService 提供了一个方便的 API,用于通过 API 方法getCurrentKafkaStreamsApplicationHostInfo() 查询当前主机信息,该方法返回当前HostInfo。其思路是,应用程序可以首先获取有关密钥所在位置的信息,然后将HostInfo与当前实例的HostInfo进行比较。如果HostInfo数据匹配,则可以通过retrieveQueryableStore进行简单的 JVM 调用,否则使用 RPC 选项。

Kafka Streams 示例

以下示例结合了我们在本章中介绍的各种主题

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}