编程模型

当使用 Kafka Streams 绑定器提供的编程模型时,既可以使用高级的 Streams DSL,也可以混合使用高级和低级 Processor-API。当混合使用高级和低级 API 时,通常通过在 KStream 上调用 transformprocess API 方法来实现。

函数式风格

从 Spring Cloud Stream 3.0.0 开始,Kafka Streams 绑定器允许应用程序使用 Java 8 中提供的函数式编程风格进行设计和开发。这意味着应用程序可以用 java.util.function.Functionjava.util.function.Consumer 类型的 Lambda 表达式简洁地表示。

让我们来看一个非常简单的例子。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

尽管很简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有输出绑定的消费者应用程序,只有一个输入绑定。应用程序消费数据,并在标准输出上简单地记录来自 KStream 的键和值的信息。应用程序包含 SpringBootApplication 注解和一个标记为 Bean 的方法。bean 方法的类型为 java.util.function.Consumer,它使用 KStream 参数化。然后在实现中,我们返回一个本质上是 Lambda 表达式的 Consumer 对象。在 Lambda 表达式内部,提供了处理数据的代码。

在这个应用程序中,有一个名为 KStream 的单个输入绑定。绑定器为应用程序创建此绑定,其名称为 process-in-0,即函数 Bean 名称后跟一个连字符 (-),然后是文字 in,再跟另一个连字符,最后是参数的序号位置。您可以使用此绑定名称设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

如果在绑定上未设置目标属性,则会创建一个与绑定名称相同的主题(如果应用程序具有足够的权限)或预期该主题已存在。

构建为 uber-jar(例如,kstream-consumer-app.jar)后,您可以像下面这样运行上述示例。

如果应用程序选择使用 Spring 的 Component 注解定义函数 Bean,绑定器也支持该模型。上面的函数 Bean 可以改写如下。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这是一个其他的例子,它是一个包含输入和输出绑定的完整处理器。这是一个经典的单词计数示例,应用程序从主题接收数据,然后在滚动时间窗口中计算每个单词出现的次数。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

这里同样是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型为java.util.function.FunctionFunction的第一个参数化类型用于输入KStream,第二个用于输出。在方法体中,提供了一个类型为Function的 lambda 表达式,并作为实现给出了实际的业务逻辑。类似于前面讨论的基于 Consumer 的应用程序,这里的输入绑定默认命名为process-in-0。对于输出,绑定名称也自动设置为process-out-0

构建为 uber-jar(例如,wordcount-processor.jar)后,您可以像下面这样运行上面的示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将从 Kafka 主题words消费消息,并将计算结果发布到输出主题counts

Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器所需的逻辑。框架会自动处理 Kafka Streams 基础设施所需的 Kafka Streams 特定配置的设置。

我们上面看到的两个示例都有一个单一的KStream输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果您想将多个主题多路复用到单个KStream绑定中,可以在下面提供逗号分隔的 Kafka 主题作为目标。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果您想根据正则表达式匹配主题,还可以提供主题模式作为目标。

spring.cloud.stream.bindings.process-in-0.destination=input.*

多个输入绑定

许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题消费数据。例如,一个主题作为Kstream消费,另一个作为KTableGlobalKTable消费。应用程序可能希望将数据接收为表类型的原因有很多。考虑一个用例,其中底层主题通过来自数据库的更改数据捕获 (CDC) 机制填充,或者应用程序只关心最新更新以进行下游处理。如果应用程序指定数据需要绑定为KTableGlobalKTable,那么 Kafka Streams 绑定器将正确地将目标绑定到KTableGlobalKTable,并使它们可供应用程序操作。我们将了解 Kafka Streams 绑定器中如何处理多个输入绑定的几种不同场景。

Kafka Streams 绑定器中的 BiFunction

这是一个我们有两个输入和一个输出的示例。在这种情况下,应用程序可以利用java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

这里再次与前面的示例基本相同,但这里我们有两个输入。Java 的BiFunction支持用于将输入绑定到所需的目标。绑定器为输入生成的默认绑定名称分别为process-in-0process-in-1。默认输出绑定为process-out-0。在此示例中,BiFunction的第一个参数绑定为第一个输入的KStream,第二个参数绑定为第二个输入的KTable

Kafka Streams 绑定器中的 BiConsumer

如果有两个输入但没有输出,在这种情况下,我们可以使用java.util.function.BiConsumer,如下所示。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

超过两个输入

如果您有多于两个输入怎么办?在某些情况下,您可能需要多于两个输入。在这种情况下,绑定器允许您链接部分函数。在函数式编程术语中,此技术通常称为柯里化。随着 Java 8 中添加的函数式编程支持,Java 现在使您能够编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。

让我们来看一个例子。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面介绍的绑定模型的细节。在此模型中,我们在入站方向有 3 个部分应用的函数。让我们将它们称为f(x)f(y)f(z)。如果我们从真正数学函数的角度扩展这些函数,它将如下所示:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>x变量代表KStream<Long, Order>y变量代表GlobalKTable<Long, Customer>z变量代表GlobalKTable<Long, Product>。第一个函数f(x)具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是函数 f(y)。函数f(y)具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>),其输出是另一个函数f(z)。函数f(z)的输入是应用程序的第三个输入 (GlobalKTable<Long, Product>),其输出是KStream<Long, EnrichedOrder>,它是应用程序的最终输出绑定。来自三个部分函数的输入,分别是KStreamGlobalKTableGlobalKTable,可在方法体中使用,用于作为 lambda 表达式的一部分实现业务逻辑。

输入绑定分别命名为enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。输出绑定命名为enrichOrder-out-0

使用柯里化函数,您可以拥有任意数量的输入。但是,请记住,在 Java 中,超过少量输入及其部分应用函数可能会导致代码难以阅读。因此,如果您的 Kafka Streams 应用程序需要多于合理数量的输入绑定,并且您希望使用此函数模型,则可能需要重新考虑您的设计并适当地分解应用程序。

输出绑定

Kafka Streams 绑定器允许KStreamKTable作为输出绑定类型。在后台,绑定器使用KStream上的to方法将结果记录发送到输出主题。如果应用程序在函数中提供KTable作为输出,则绑定器仍将通过委托给KStreamto方法来使用此技术。

例如,下面的两个函数都可以工作

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

多个输出绑定

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。使用多个输出绑定时,需要提供一个 KStream 数组 (KStream[]) 作为出站返回类型。

这是一个例子

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但是出站参数化类型为KStream[]。上面函数的默认输出绑定名称分别为process-out-0process-out-1process-out-2。绑定器生成三个输出绑定的原因是它检测到返回的KStream数组的长度为三个。请注意,在此示例中,我们提供了一个noDefaultBranch();如果我们改用defaultBranch(),则需要一个额外的输出绑定,本质上返回长度为四的KStream数组。

Kafka Streams 函数式编程风格总结

总之,下表显示了函数范式中可用的各种选项。

输入数量 输出数量 要使用的组件

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

>= 3

0..n

使用柯里化函数

  • 在此表中,如果有多个输出,类型将简单地变为KStream[]

Kafka Streams 绑定器中的函数组合

Kafka Streams 绑定器支持线性拓扑的最小形式的函数组合。使用 Java 函数式 API 支持,您可以编写多个函数,然后使用andThen方法自行组合它们。例如,假设您有以下两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使绑定器中没有函数组合支持,您也可以像下面这样组合这两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然后,您可以提供表单spring.cloud.function.definition=foo;bar;composed的定义。使用绑定器中的函数组合支持,您无需编写此第三个函数,在其中进行显式函数组合。

您可以简单地这样做

spring.cloud.function.definition=foo|bar

您甚至可以这样做

spring.cloud.function.definition=foo|bar;foo;bar

在此示例中,组合函数的默认绑定名称变为foobar-in-0foobar-out-0

Kafka Streams 绑定器中函数组合的限制

当您拥有java.util.function.Function bean 时,它可以与另一个函数或多个函数组合。同一个函数 bean 也可以与java.util.function.Consumer组合。在这种情况下,consumer 是组合的最后一个组件。一个函数可以与多个函数组合,然后以java.util.function.Consumer bean 结尾。

组合类型为java.util.function.BiFunction的 bean 时,BiFunction必须是定义中的第一个函数。组合的实体必须是java.util.function.Functionjava.util.funciton.Consumer类型。换句话说,您不能获取一个BiFunction bean,然后与另一个BiFunction组合。

您不能与BiConsumer类型或Consumer作为第一个组件的定义组合。您也不能与输出为数组(分支的KStream[])的函数组合,除非这是定义中的最后一个组件。

函数定义中的第一个FunctionBiFunction也可以使用柯里化形式。例如,以下是可能的。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可以是curriedFoo|bar。在后台,绑定器将为柯里化函数创建两个输入绑定,以及基于定义中最终函数的输出绑定。在这种情况下,默认输入绑定将为curriedFoobar-in-0curriedFoobar-in-1。此示例的默认输出绑定变为curriedFoobar-out-0

关于在函数组合中使用KTable作为输出的特殊说明

假设您有以下两个函数。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以将它们组合为foo|bar,但请记住,第二个函数 (此情况下的bar) 必须将KTable作为输入,因为第一个函数 (foo) 将KTable作为输出。