编程模型的辅助工具
单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中使用多个 Kafka Streams 处理器。您可以拥有如下应用程序。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,Binder 将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(更多内容见下文)。但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream 哪些函数需要激活。以下是激活函数的方法。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果您希望某些函数不会立即激活,则可以从该列表中将其移除。
当您在同一应用程序中拥有单个 Kafka Streams 处理器和其他类型的 Function
Bean(通过不同的 Binder 处理,例如基于常规 Kafka 消息通道 Binder 的函数 Bean)时,这也是正确的
Kafka Streams 应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的强制性属性。Spring Cloud Stream Kafka Streams Binder 允许您通过多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器,则可以使用以下属性在绑定级别设置此属性
spring.cloud.stream.kafka.streams.binder.applicationId
.
为了方便起见,如果只有一个处理器,还可以使用 spring.application.name
作为属性来委派应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。在函数模型的情况下,可以将其作为属性附加到每个函数。
例如,假设您有以下函数。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
and
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后可以使用以下绑定级别属性为每个函数设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
and
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也会起作用。但是,如果您使用的是函数模型,则如上所见,在绑定级别为每个函数设置会容易得多。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您正在自动扩展应用程序,这尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,则在这种情况下,绑定程序将自动为您生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重新启动时将保持静态。在函数模型的情况下,生成的应用程序 ID 将是函数 bean 名称,后跟文字 applicationID
,例如,如果 process
是函数 bean 名称,则为 process-applicationID
。
设置应用程序 ID 的摘要
-
默认情况下,绑定程序将为每个函数方法自动生成应用程序 ID。
-
如果您有一个处理器,则可以使用
spring.kafka.streams.applicationId
、spring.application.name
或spring.cloud.stream.kafka.streams.binder.applicationId
。 -
如果您有多个处理器,则可以使用属性
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId
为每个函数设置应用程序 ID。
使用函数样式覆盖绑定程序生成的默认绑定名称
默认情况下,绑定程序使用上述策略在使用函数样式时生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<默认绑定名称>
。默认绑定名称是绑定器生成的原始绑定名称。
例如,假设您有此函数。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
绑定器将生成具有名称 process-in-0
、process-in-1
和 process-out-0
的绑定。现在,如果您想将它们完全更改为其他内容,可能是更特定于域的绑定名称,那么您可以按如下方式进行操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
and
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新绑定名称上设置所有绑定级别属性。
请记住,对于上面描述的功能编程模型,在大多数情况下遵循默认绑定名称是有意义的。您可能仍然希望执行此覆盖的唯一原因是当您有大量配置属性并且希望将绑定映射到更友好的域时。
设置引导服务器配置
在运行 Kafka Streams 应用程序时,您必须提供 Kafka 代理服务器信息。如果您不提供此信息,则绑定器期望您在默认 localhost:9092
处运行代理。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。
-
使用引导属性 -
spring.kafka.bootstrapServers
-
绑定器级别属性 -
spring.cloud.stream.kafka.streams.binder.brokers
对于绑定器级别属性,如果您使用通过常规 Kafka 绑定器提供的代理属性 spring.cloud.stream.kafka.binder.brokers
,则无关紧要。Kafka Streams 绑定器将首先检查是否设置了 Kafka Streams 绑定器特定的代理属性 (spring.cloud.stream.kafka.streams.binder.brokers
),如果没有找到,它将查找 spring.cloud.stream.kafka.binder.brokers
。