状态存储
当使用高级 DSL 并进行适当的调用以触发状态存储时,Kafka Streams 会自动创建状态存储。
如果您想将传入的 `KTable` 绑定物料化为命名状态存储,可以使用以下策略。
假设您有以下函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的 `KTable` 数据将被物料化为命名状态存储。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以在应用程序中将自定义状态存储定义为 bean,这些 bean 将被绑定程序检测到并添加到 Kafka Streams 构建器中。特别是在使用处理器 API 时,您需要手动注册状态存储。为此,您可以在应用程序中创建 StateStore 作为 bean。以下是如何定义此类 bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后,应用程序可以直接访问这些状态存储。
在引导过程中,绑定程序将处理上述 bean 并将其传递给 Streams 构建器对象。
访问状态存储
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
这对于注册全局状态存储将不起作用。要注册全局状态存储,请参阅下面关于自定义 `StreamsBuilderFactoryBean` 的部分。