Apache Kafka Streams 支持
从 1.1.4 版本开始,Spring for Apache Kafka 为 Kafka Streams 提供了一流的支持。要在 Spring 应用程序中使用它,必须在类路径中存在 kafka-streams
jar 包。它是 Spring for Apache Kafka 项目的可选依赖项,不会被传递性下载。
基础
Apache Kafka Streams 参考文档建议以下使用 API 的方式
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
因此,我们有两个主要组件
-
StreamsBuilder
:具有用于构建KStream
(或KTable
)实例的 API。 -
KafkaStreams
:用于管理这些实例的生命周期。
由单个 StreamsBuilder 向 KafkaStreams 实例公开的所有 KStream 实例将同时启动和停止,即使它们具有不同的逻辑。换句话说,由 StreamsBuilder 定义的所有流都与单个生命周期控制绑定在一起。一旦 KafkaStreams 实例通过 streams.close() 关闭,它就不能重新启动。相反,必须创建一个新的 KafkaStreams 实例来重新启动流处理。
|
Spring 管理
为了简化从 Spring 应用程序上下文角度使用 Kafka Streams 并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean
。这是一个 AbstractFactoryBean
实现,用于将 StreamsBuilder
单例实例公开为 bean。以下示例创建了这样一个 bean
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从版本 2.2 开始,流配置现在以 KafkaStreamsConfiguration 对象的形式提供,而不是 StreamsConfig 。
|
StreamsBuilderFactoryBean
还实现了 SmartLifecycle
来管理内部 KafkaStreams
实例的生命周期。与 Kafka Streams API 类似,您必须在启动 KafkaStreams
之前定义 KStream
实例。这也适用于 Kafka Streams 的 Spring API。因此,当您在 StreamsBuilderFactoryBean
上使用默认的 autoStartup = true
时,您必须在应用程序上下文刷新之前在 StreamsBuilder
上声明 KStream
实例。例如,KStream
可以是常规的 bean 定义,而 Kafka Streams API 在没有任何影响的情况下使用。以下示例展示了如何做到这一点
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手动控制生命周期(例如,通过某些条件停止和启动),您可以使用工厂 bean (&
) 前缀 直接引用 StreamsBuilderFactoryBean
bean。由于 StreamsBuilderFactoryBean
使用其内部 KafkaStreams
实例,因此可以安全地停止并再次启动它。每次 start()
时都会创建一个新的 KafkaStreams
。如果您想分别控制 KStream
实例的生命周期,您也可以考虑使用不同的 StreamsBuilderFactoryBean
实例。
您还可以指定 KafkaStreams.StateListener
、Thread.UncaughtExceptionHandler
和 StateRestoreListener
选项在 StreamsBuilderFactoryBean
上,这些选项将委托给内部 KafkaStreams
实例。此外,除了在 StreamsBuilderFactoryBean
上间接设置这些选项之外,从版本 2.1.5 开始,您可以使用 KafkaStreamsCustomizer
回调接口来配置内部 KafkaStreams
实例。请注意,KafkaStreamsCustomizer
会覆盖 StreamsBuilderFactoryBean
提供的选项。如果您需要直接执行一些 KafkaStreams
操作,您可以使用 StreamsBuilderFactoryBean.getKafkaStreams()
访问该内部 KafkaStreams
实例。您可以按类型自动装配 StreamsBuilderFactoryBean
bean,但您应该确保在 bean 定义中使用完整类型,如下面的示例所示
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果您使用接口 bean 定义,您可以添加 @Qualifier
以按名称进行注入。以下示例展示了如何做到这一点
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 有一个新的属性 infrastructureCustomizer
,其类型为 KafkaStreamsInfrastructureCustomizer
;这允许在创建流之前自定义 StreamsBuilder
(例如,添加状态存储)和/或 Topology
。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
提供默认的无操作实现,以避免在不需要其中一种方法时必须实现两种方法。
提供了一个 CompositeKafkaStreamsInfrastructureCustomizer
,用于需要应用多个自定义器的情况。
KafkaStreams Micrometer 支持
从 2.5.3 版本开始,您可以配置 KafkaStreamsMicrometerListener
以自动为工厂 Bean 管理的 KafkaStreams
对象注册 Micrometer 指标。
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
Streams JSON 序列化和反序列化
对于以 JSON 格式读取或写入主题或状态存储时数据的序列化和反序列化,Spring for Apache Kafka 提供了一个 JsonSerde
实现,它使用 JSON,并委托给 序列化、反序列化和消息转换 中描述的 JsonSerializer
和 JsonDeserializer
。JsonSerde
实现通过其构造函数(目标类型或 ObjectMapper
)提供相同的配置选项。在以下示例中,我们使用 JsonSerde
来序列化和反序列化 Kafka 流的 Cat
负载(JsonSerde
可以以类似的方式在需要实例的任何地方使用)。
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
从 2.3 版本开始,在以编程方式构建生产者/消费者工厂中使用的序列化器/反序列化器时,可以使用流畅的 API,这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
使用 KafkaStreamBrancher
KafkaStreamBrancher
类引入了在 KStream
之上构建条件分支的更便捷方式。
考虑以下不使用 KafkaStreamBrancher
的示例。
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 KafkaStreamBrancher
。
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置 Kafka Streams 环境,StreamsBuilderFactoryBean
需要一个 KafkaStreamsConfiguration
实例。有关所有可能的选项,请参阅 Apache Kafka 文档。
从 2.2 版本开始,流配置现在以 KafkaStreamsConfiguration 对象的形式提供,而不是以 StreamsConfig 的形式提供。
|
为了避免大多数情况下,尤其是开发微服务时,编写样板代码,Spring for Apache Kafka 提供了 `@EnableKafkaStreams` 注解,您应该将其放在 `@Configuration` 类上。您只需要声明一个名为 `defaultKafkaStreamsConfig` 的 `KafkaStreamsConfiguration` bean。一个名为 `defaultKafkaStreamsBuilder` 的 `StreamsBuilderFactoryBean` bean 会自动在应用程序上下文中声明。您也可以声明和使用任何其他 `StreamsBuilderFactoryBean` bean。您可以通过提供一个实现 `StreamsBuilderFactoryBeanConfigurer` 的 bean 来对该 bean 进行额外的自定义。如果有多个这样的 bean,它们将根据其 `Ordered.order` 属性应用。
清理和停止配置
当工厂停止时,会调用 `KafkaStreams.close()` 方法,并带两个参数
-
closeTimeout:等待线程关闭的时间(默认为 `DEFAULT_CLOSE_TIMEOUT`,设置为 10 秒)。可以使用 `StreamsBuilderFactoryBean.setCloseTimeout()` 进行配置。
-
leaveGroupOnClose:触发组中的消费者离开调用(默认为 `false`)。可以使用 `StreamsBuilderFactoryBean.setLeaveGroupOnClose()` 进行配置。
默认情况下,当工厂 bean 停止时,会调用 `KafkaStreams.cleanUp()` 方法。从版本 2.1.2 开始,工厂 bean 有额外的构造函数,接受一个 `CleanupConfig` 对象,该对象具有属性,允许您控制在 `start()` 或 `stop()` 期间是否调用 `cleanUp()` 方法,或者两者都不调用。从版本 2.7 开始,默认情况下永远不会清理本地状态。
头信息增强器
版本 3.0 添加了 `HeaderEnricherProcessor` 扩展,它是 `ContextualProcessor` 的扩展;提供与已弃用的 `HeaderEnricher` 相同的功能,该 `HeaderEnricher` 实现已弃用的 `Transformer` 接口。这可以用于在流处理中添加头信息;头信息的值是 SpEL 表达式;表达式评估的根对象具有 3 个属性
-
record
- `org.apache.kafka.streams.processor.api.Record`(`key`、`value`、`timestamp`、`headers`) -
key
- 当前记录的键 -
value
- 当前记录的值 -
context
-ProcessorContext
,允许访问当前记录的元数据
表达式必须返回一个 byte[]
或 String
(它将使用 UTF-8
转换为 byte[]
)。
要在流中使用 enricher
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改 key
或 value
;它只是添加头信息。
每条记录都需要一个新的实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
以下是一个简单的示例,添加一个文字头信息和一个变量
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
版本 3.0 添加了 MessagingProcessor
,它是 ContextualProcessor
的扩展,提供与已弃用的 MessagingTransformer
相同的功能,后者实现了已弃用的 Transformer
接口。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。转换器需要 MessagingFunction
的实现。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 使用其 GatewayProxyFactoryBean
自动提供实现。它还需要一个 MessagingMessageConverter
来将键、值和元数据(包括头信息)转换为/从 Spring Messaging Message<?>
转换。有关更多信息,请参阅 [从 KStream
调用 Spring Integration 流]。
从反序列化异常中恢复
版本 2.3 引入了 RecoveringDeserializationExceptionHandler
,它可以在发生反序列化异常时采取一些措施。请参阅 Kafka 文档中有关 DeserializationExceptionHandler
的内容,RecoveringDeserializationExceptionHandler
是其一个实现。RecoveringDeserializationExceptionHandler
使用 ConsumerRecordRecoverer
实现进行配置。框架提供了 DeadLetterPublishingRecoverer
,它将失败的记录发送到死信主题。有关此恢复器的更多信息,请参阅 发布死信记录。
要配置恢复器,请将以下属性添加到您的流配置中
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer()
bean 可以是您自己的 ConsumerRecordRecoverer
实现。
交互式查询支持
从 3.2 版本开始,Spring for Apache Kafka 提供了 Kafka Streams 中交互式查询所需的基本设施。交互式查询在有状态的 Kafka Streams 应用程序中很有用,因为它们提供了一种持续查询应用程序中有状态存储的方法。因此,如果应用程序想要物化正在考虑的系统的当前视图,交互式查询提供了一种方法。要了解有关交互式查询的更多信息,请参阅此 文章。Spring for Apache Kafka 中的支持围绕一个名为 KafkaStreamsInteractiveQueryService
的 API 展开,它是 Kafka Streams 库中交互式查询 API 的一个门面。应用程序可以创建此服务的实例作为 bean,然后稍后使用它通过名称检索状态存储。
以下代码片段显示了一个示例。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
return kafkaStreamsInteractiveQueryService;
}
假设一个 Kafka Streams 应用程序有一个名为 app-store
的状态存储,那么可以通过 KafkStreamsInteractiveQuery
API 检索该存储,如下所示。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
ReadOnlyKeyValueStore<Object, Object> appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());
一旦应用程序获得了对状态存储的访问权限,它就可以从该存储中查询键值信息。
在这种情况下,应用程序使用的状态存储是一个只读键值存储。Kafka Streams 应用程序可以使用其他类型的状态存储。例如,如果应用程序希望查询基于窗口的存储,它可以在 Kafka Streams 应用程序业务逻辑中构建该存储,然后稍后检索它。由于这个原因,KafkaStreamsInteractiveQueryService
中检索可查询存储的 API 具有通用的存储类型签名,以便最终用户可以分配适当的类型。
以下是 API 的类型签名。
public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)
在调用此方法时,用户可以专门请求适当的状态存储类型,就像我们在上面的示例中所做的那样。
重试状态存储检索
当尝试使用 KafkaStreamsInteractiveQueryService
检索状态存储时,可能会由于各种原因而找不到状态存储。如果这些原因是暂时的,KafkaStreamsInteractiveQueryService
提供了一个选项,通过允许注入自定义 RetryTemplate
来重试状态存储的检索。默认情况下,KafkaStreamsInteractiveQueryService
中使用的 RetryTemmplate
使用最大尝试次数为 3,固定回退时间为 1 秒。
以下是如何将自定义 RetryTemmplate
注入 KafkaStreamsInteractiveQueryService
,最大尝试次数为 10。
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
retryTemplate.setRetryPolicy(retryPolicy);
kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
return kafkaStreamsInteractiveQueryService;
}
查询远程状态存储
上面展示的用于检索状态存储的 API - `retrieveQueryableStore` 旨在用于本地可用的键值状态存储。在生产环境中,Kafka Streams 应用程序最有可能根据分区数量进行分布。如果一个主题有四个分区,并且有四个相同 Kafka Streams 处理器的实例在运行,那么每个实例可能负责处理来自该主题的一个分区。在这种情况下,调用 `retrieveQueryableStore` 可能不会返回实例正在寻找的正确结果,尽管它可能会返回一个有效的存储。假设这个有四个分区的主题包含关于各种键的数据,并且一个分区始终负责一个特定的键。如果调用 `retrieveQueryableStore` 的实例正在查找关于该实例没有托管的键的信息,那么它将不会收到任何数据。这是因为当前的 Kafka Streams 实例对这个键一无所知。为了解决这个问题,调用实例首先需要确保他们拥有托管特定键的 Kafka Streams 处理器实例的主机信息。这可以通过以下方式从任何具有相同 `application.id` 的 Kafka Streams 实例中获取。
@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;
HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());
在上面的示例代码中,调用实例正在从名为 `app-store` 的状态存储中查询特定键 `12345`。该 API 还需要一个相应的键序列化器,在本例中为 `IntegerSerializer`。Kafka Streams 会遍历所有具有相同 `application.id` 的实例,并尝试找到哪个实例托管了这个特定的键。一旦找到,它就会将该主机信息作为 `HostInfo` 对象返回。
该 API 的外观如下所示
public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)
当以这种分布式方式使用多个具有相同application.id
的 Kafka Streams 处理器实例时,应用程序应该提供一个 RPC 层,以便可以通过 RPC 端点(例如 REST 端点)查询状态存储。有关此内容的更多详细信息,请参阅此文章。当使用 Spring for Apache Kafka 时,使用 spring-web 技术非常容易添加基于 Spring 的 REST 端点。一旦有了 REST 端点,就可以使用它从任何 Kafka Streams 实例查询状态存储,前提是实例知道HostInfo
(其中包含密钥的主机信息)。
如果包含密钥的实例是当前实例,则应用程序不需要调用 RPC 机制,而是进行 JVM 内调用。然而,问题在于,应用程序可能不知道进行调用的实例是否包含密钥,因为某个特定服务器可能会由于消费者重新平衡而丢失分区。为了解决这个问题,KafkaStreamsInteractiveQueryService
提供了一个方便的 API,用于通过 API 方法getCurrentKafkaStreamsApplicationHostInfo()
查询当前主机信息,该方法返回当前HostInfo
。其思路是,应用程序可以首先获取有关密钥所在位置的信息,然后将HostInfo
与当前实例的HostInfo
进行比较。如果HostInfo
数据匹配,则可以通过retrieveQueryableStore
进行简单的 JVM 调用,否则使用 RPC 选项。
Kafka Streams 示例
以下示例结合了我们在本章中介绍的各种主题
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}