Kafka Streams 应用中基于事件类型的路由
Kafka Streams 绑定器不支持常规消息通道绑定器中可用的路由函数。但是,Kafka Streams 绑定器仍然通过入站记录上的事件类型记录头提供路由功能。
要启用基于事件类型的路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.
这可以是一个逗号分隔的值。
例如,假设我们有以下函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我们还假设,我们只希望在此函数中执行业务逻辑,如果传入记录的事件类型为 foo 或 bar。这可以使用绑定上的 eventTypes 属性表示如下。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,绑定器会检查每个传入记录的 event_type 头,并查看其值是否设置为 foo 或 bar。如果它没有找到其中任何一个,则将跳过函数执行。
默认情况下,绑定器期望记录头键为 event_type,但这可以按绑定更改。例如,如果我们要将此绑定上的头键更改为 my_event 而不是默认值,可以按如下方式更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.
当在 Kafka Streams 绑定器中使用事件路由功能时,它使用字节数组 Serde 反序列化所有传入记录。如果记录头匹配事件类型,则它只使用实际的 Serde 通过配置的或推断的 Serde 进行正确的反序列化。如果您在绑定上设置了反序列化异常处理程序,这会引入问题,因为预期的反序列化只会发生在堆栈下方,从而导致意外错误。为了解决这个问题,您可以在绑定上设置以下属性,以强制绑定器使用配置的或推断的 Serde,而不是字节数组 Serde。
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
通过这种方式,应用程序在使用事件路由功能时可以立即检测到反序列化问题,并可以采取适当的处理决策。