Kafka Streams 应用中的基于事件类型的路由

Kafka Streams Binder 不支持常规基于消息通道的 Binder 中可用的路由函数。但是,Kafka Streams Binder 仍然通过入站记录上的事件类型记录头提供路由功能。

要启用基于事件类型的路由,应用程序必须提供以下属性。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.

这可以是一个逗号分隔的值。

例如,假设我们有这个函数

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

让我们假设我们只想执行此函数中的业务逻辑,如果传入的记录的事件类型为foobar。可以使用绑定上的eventTypes属性如下表达。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

现在,当应用程序运行时,Binder 会检查每个传入记录的event_type头,并查看其值是否设置为foobar。如果找不到其中任何一个,则函数执行将被跳过。

默认情况下,Binder 期望记录头键为event_type,但这可以根据绑定进行更改。例如,如果我们想将此绑定的头键更改为my_event而不是默认值,则可以如下更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

在 Kafka Streams Binder 中使用事件路由功能时,它使用字节数组 `Serde` 来反序列化所有传入的记录。只有当记录头与事件类型匹配时,它才会使用实际的 `Serde` 使用配置的或推断的 `Serde` 进行正确的反序列化。如果在绑定上设置了反序列化异常处理程序,这会导致问题,因为预期的反序列化仅发生在堆栈下游,从而导致意外错误。为了解决这个问题,可以在绑定上设置以下属性,以强制 Binder 使用配置的或推断的 `Serde` 而不是字节数组 `Serde`。

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

这样,应用程序就可以在使用事件路由功能时立即检测到反序列化问题,并可以做出相应的处理决策。