编程模型
使用 Kafka Streams Binder 提供的编程模型时,可以使用高级的 Streams DSL 以及高级和低级 Processor-API 的混合作为选项。当混合使用高级和低级 API 时,通常通过在 KStream
上调用 transform
或 process
API 方法来实现。
函数式风格
从 Spring Cloud Stream 3.0.0
开始,Kafka Streams Binder 允许应用程序使用 Java 8 中提供的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为 java.util.function.Function
或 java.util.function.Consumer
类型的 lambda 表达式。
让我们来看一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有出站绑定的消费者应用程序,只有一个入站绑定。应用程序消费数据,并简单地在标准输出上记录来自 KStream
键和值的的信息。应用程序包含 SpringBootApplication
注解和一个标记为 Bean
的方法。bean 方法的类型为 java.util.function.Consumer
,它用 KStream
参数化。然后在实现中,我们返回一个本质上是 lambda 表达式的 Consumer 对象。在 lambda 表达式中,提供了处理数据的代码。
在这个应用程序中,只有一个类型为 KStream
的输入绑定。Binder 为应用程序创建了这个绑定,名称为 process-in-0
,即函数 bean 名称后跟一个连字符 (-
),然后是文字 in
,再跟一个连字符,最后是参数的序号。使用此绑定名称设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic
。
如果绑定上未设置目标属性,则会创建一个与绑定名称相同的主题(如果应用程序具有足够的权限),或者预期该主题已存在。 |
构建为 uber-jar(例如,kstream-consumer-app.jar
)后,您可以像下面这样运行上面的示例。
如果应用程序选择使用 Spring 的 Component
注解定义功能 Bean,则绑定器也支持该模型。上面的功能 Bean 可以改写如下。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
以下是一个完整的处理器示例,它同时具有输入和输出绑定。这是经典的词频统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个词的出现次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
同样,这是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,Bean 方法的类型为 java.util.function.Function
。Function
的第一个参数化类型用于输入 KStream
,第二个参数化类型用于输出。在方法体中,提供了一个类型为 Function
的 lambda 表达式,作为实现,给出了实际的业务逻辑。类似于之前讨论的基于 Consumer 的应用程序,这里的输入绑定默认命名为 process-in-0
。对于输出,绑定名称也会自动设置为 process-out-0
。
构建为 uber-jar(例如,wordcount-processor.jar
)后,您可以像下面这样运行上面的示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
此应用程序将从 Kafka 主题 words
中消费消息,并将计算结果发布到输出主题 counts
。
Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。框架会自动处理 Kafka Streams 基础设施所需的 Kafka Streams 特定配置。
我们上面看到的两个示例只有一个 KStream
输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果您想将多个主题多路复用到单个 KStream
绑定中,您可以在下面提供以逗号分隔的 Kafka 主题作为目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果您想根据正则表达式匹配主题,您也可以提供主题模式作为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题消费数据。例如,一个主题作为 Kstream
消费,另一个主题作为 KTable
或 GlobalKTable
消费。应用程序可能希望以表格类型接收数据的许多原因。考虑一个用例,其中基础主题通过来自数据库的变更数据捕获 (CDC) 机制填充,或者应用程序只关心最新更新以进行下游处理。如果应用程序指定数据需要绑定为 KTable
或 GlobalKTable
,那么 Kafka Streams 绑定器将正确地将目标绑定到 KTable
或 GlobalKTable
,并使它们可供应用程序操作。我们将研究几种不同的场景,说明 Kafka Streams 绑定器如何处理多个输入绑定。
Kafka Streams 绑定器中的 BiFunction
这是一个示例,其中有两个输入和一个输出。在这种情况下,应用程序可以利用java.util.function.BiFunction
。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
同样,基本主题与前面的示例相同,但这里有两个输入。Java 的BiFunction
支持用于将输入绑定到所需的目的地。绑定器为输入生成的默认绑定名称分别是process-in-0
和 process-in-1
。默认输出绑定是process-out-0
。在本例中,BiFunction
的第一个参数绑定为第一个输入的KStream
,第二个参数绑定为第二个输入的KTable
。
Kafka Streams 绑定器中的 BiConsumer
如果有两个输入,但没有输出,在这种情况下,我们可以使用java.util.function.BiConsumer
,如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果你有多于两个输入怎么办?在某些情况下,你需要超过两个输入。在这种情况下,绑定器允许你链接部分函数。在函数式编程术语中,这种技术通常被称为柯里化。随着 Java 8 中添加的函数式编程支持,Java 现在使你能够编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面介绍的绑定模型的细节。在这个模型中,我们在入站有 3 个部分应用的函数。让我们将它们称为f(x)
、f(y)
和 f(z)
。如果我们从真正的数学函数的角度扩展这些函数,它将看起来像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>
。x
变量代表KStream<Long, Order>
,y
变量代表GlobalKTable<Long, Customer>
,z
变量代表GlobalKTable<Long, Product>
。第一个函数f(x)
具有应用程序的第一个输入绑定 (KStream<Long, Order>
),其输出是函数 f(y)。函数f(y)
具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>
),其输出是另一个函数f(z)
。函数f(z)
的输入是应用程序的第三个输入 (GlobalKTable<Long, Product>
),其输出是KStream<Long, EnrichedOrder>
,它是应用程序的最终输出绑定。来自三个部分函数的输入,分别是KStream
、GlobalKTable
、GlobalKTable
,在方法体中可用于实现作为 lambda 表达式一部分的业务逻辑。
输入绑定分别命名为enrichOrder-in-0
、enrichOrder-in-1
和 enrichOrder-in-2
。输出绑定命名为enrichOrder-out-0
。
使用柯里化函数,您可以拥有任意数量的输入。但是,请记住,如果输入数量过多,并且像上面在 Java 中那样使用部分应用函数,可能会导致代码难以阅读。因此,如果您的 Kafka Streams 应用程序需要超过合理数量的输入绑定,并且您想使用这种函数式模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。
输出绑定
Kafka Streams 绑定器允许将 KStream
或 KTable
类型的任何一种作为输出绑定。在幕后,绑定器使用 KStream
上的 to
方法将结果记录发送到输出主题。如果应用程序在函数中提供 KTable
作为输出,绑定器仍然会通过委托给 KStream
的 to
方法来使用此技术。
例如,下面的两个函数都将起作用
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多个输出绑定
Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。使用多个输出绑定时,您需要提供一个 KStream
数组 (KStream[]
) 作为出站返回类型。
以下是一个示例
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
编程模型保持不变,但是出站参数化类型是 KStream[]
。默认输出绑定名称分别为 process-out-0
、process-out-1
、process-out-2
,用于上面的函数。绑定器生成三个输出绑定的原因是它检测到返回的 KStream
数组的长度为三个。请注意,在此示例中,我们提供了 noDefaultBranch()
;如果我们使用 defaultBranch()
代替,则需要额外的输出绑定,本质上返回长度为四的 KStream
数组。
Kafka Streams 函数式编程风格总结
总之,下表显示了可以在函数式范式中使用的各种选项。
输入数量 | 输出数量 | 要使用的组件 |
---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
在本表中,如果有多个输出,类型将简单地变为
KStream[]
。
Kafka Streams 绑定器中的函数组合
Kafka Streams 绑定器支持线性拓扑的最小形式的函数组合。使用 Java 函数式 API 支持,您可以编写多个函数,然后使用 andThen
方法自行组合它们。例如,假设您有以下两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使没有绑定器中的函数组合支持,您也可以像下面这样组合这两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然后,您可以提供 spring.cloud.function.definition=foo;bar;composed
形式的定义。使用绑定器中的函数组合支持,您不需要编写此第三个函数,在其中进行显式函数组合。
你可以简单地这样做
spring.cloud.function.definition=foo|bar
你甚至可以这样做
spring.cloud.function.definition=foo|bar;foo;bar
在这个例子中,组合函数的默认绑定名称变为foobar-in-0
和foobar-out-0
。
Kafka Streams bincer 中函数组合的限制
当你有一个java.util.function.Function
bean 时,它可以与另一个函数或多个函数组合。同一个函数 bean 也可以与一个java.util.function.Consumer
组合。在这种情况下,consumer 是最后一个组合的组件。一个函数可以与多个函数组合,然后以一个java.util.function.Consumer
bean 结束。
当组合类型为java.util.function.BiFunction
的 bean 时,BiFunction
必须是定义中的第一个函数。组合的实体必须是java.util.function.Function
或java.util.funciton.Consumer
类型。换句话说,你不能使用一个BiFunction
bean 然后与另一个BiFunction
组合。
你不能与BiConsumer
类型或Consumer
是第一个组件的定义组合。你也不能与输出为数组(KStream[]
用于分支)的函数组合,除非这是定义中的最后一个组件。
函数定义中的第一个Function
或BiFunction
可以使用柯里化形式。例如,以下操作是可能的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函数定义可以是curriedFoo|bar
。在幕后,绑定器将为柯里化函数创建两个输入绑定,以及一个基于定义中最终函数的输出绑定。在这种情况下,默认输入绑定将是curriedFoobar-in-0
和curriedFoobar-in-1
。此示例的默认输出绑定变为curriedFoobar-out-0
。
关于在函数组合中使用KTable
作为输出的特殊说明
假设你有以下两个函数。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
你可以将它们组合为foo|bar
,但请记住,第二个函数(在本例中为bar
)必须具有KTable
作为输入,因为第一个函数(foo
)具有KTable
作为输出。