错误处理

Apache Kafka Streams 提供了从反序列化错误中本地处理异常的功能。有关此支持的详细信息,请参阅 这里。开箱即用,Apache Kafka Streams 提供两种反序列化异常处理程序 - LogAndContinueExceptionHandlerLogAndFailExceptionHandler。顾名思义,前者将记录错误并继续处理下一条记录,而后者将记录错误并失败。LogAndFailExceptionHandler 是默认的反序列化异常处理程序。

在 Binder 中处理反序列化异常

Kafka Streams Binder 允许使用以下属性指定上述反序列化异常处理程序。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

或者

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两种反序列化异常处理程序之外,Binder 还提供第三种处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理程序的方法。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

设置上述属性后,所有反序列化错误中的记录都会自动发送到 DLQ 主题。

您可以按照以下步骤设置 DLQ 消息发布的目标主题名称。

您可以为 DlqDestinationResolver 提供一个实现,它是一个函数式接口。DlqDestinationResolverConsumerRecord 和异常作为输入,然后允许指定一个主题名称作为输出。通过访问 Kafka ConsumerRecord,可以在 BiFunction 的实现中检查头部记录。

以下是如何为 DlqDestinationResolver 提供实现的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在为 DlqDestinationResolver 提供实现时,需要注意的一点是,绑定器中的供应器不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保这些主题已事先创建。

如果 DlqDestinationResolver 作为 bean 存在于应用程序中,则它具有更高的优先级。如果您不想遵循这种方法,而是希望使用配置提供静态 DLQ 名称,则可以设置以下属性。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果设置了此属性,则错误记录将发送到主题 custom-dlq。如果应用程序未使用上述两种策略中的任何一种,则它将创建一个名为 error.<input-topic-name>.<application-id> 的 DLQ 主题。例如,如果您的绑定的目标主题是 inputTopic,应用程序 ID 是 process-applicationId,则默认的 DLQ 主题是 error.inputTopic.process-applicationId。始终建议为每个输入绑定显式创建一个 DLQ 主题,如果您的目的是启用 DLQ。

每个输入消费者绑定的 DLQ

属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着,如果同一个应用程序中有多个函数,则此属性将应用于所有函数。但是,如果您在一个处理器中有多个处理器或多个输入绑定,则可以使用绑定器提供的更细粒度的 DLQ 控制,该控制针对每个输入消费者绑定。

如果您有以下处理器,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且您只想在第一个输入绑定上启用 DLQ,并在第二个绑定上跳过并继续,则可以在消费者上执行以下操作。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

以这种方式设置反序列化异常处理程序的优先级高于在绑定器级别设置。

DLQ 分区

默认情况下,记录将使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少与原始记录具有相同数量的分区。

要更改此行为,请将 DlqPartitionFunction 实现作为 @Bean 添加到应用程序上下文。只能存在一个这样的 bean。该函数将提供消费者组(在大多数情况下与应用程序 ID 相同)、失败的 ConsumerRecord 和异常。例如,如果您始终想要路由到分区 0,则可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定器的 minPartitionCount 等于 1),则无需提供 DlqPartitionFunction;框架将始终使用分区 0。如果将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定器的 minPartitionCount 大于 1),则 **必须** 提供 DlqPartitionFunction bean,即使分区计数与原始主题的相同。

在 Kafka Streams 绑定器中使用异常处理功能时,请牢记以下几点。

  • 属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着,如果同一个应用程序中有多个函数,则此属性将应用于所有函数。

  • 反序列化的异常处理与原生反序列化和框架提供的消息转换一致。

在绑定器中处理生产异常

与上面描述的反序列化异常处理程序支持不同,绑定器没有提供用于处理生产异常的此类一流机制。但是,您仍然可以使用 StreamsBuilderFactoryBean 自定义程序配置生产异常处理程序,您可以在下面部分中找到有关它的更多详细信息。

运行时错误处理

在处理来自应用程序代码(即来自业务逻辑执行)的错误时,通常由应用程序来处理。因为 Kafka Streams 绑定器无法干预应用程序代码。但是,为了使应用程序更容易,绑定器提供了一个方便的 RecordRecoverableProcessor,使用它,您可以指定如何处理应用程序级别的错误。

考虑以下代码。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

如果上面的 map 调用中的业务代码抛出异常,则您有责任处理该错误。这就是 RecordRecoverableProcessor 派上用场的地方。默认情况下,RecordRecoverableProcessor 将简单地记录错误并让应用程序继续执行。假设您想将失败的记录发布到 DLT,而不是在应用程序中处理它。在这种情况下,您必须使用 RecordRecoverableProcessor 的自定义实现,称为 DltAwareProcessor。以下是如何做到这一点。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
					throw new RuntimeException("error");
				}, "hello-dlt-1", dltPublishingContext));
}

来自原始map调用的业务逻辑代码现在已作为KStream#process方法调用的一部分移动,该方法接受一个ProcessorSupplier。然后,我们传入自定义的DltAwareProcessor,它能够发布到 DLT。上面的DltAwareProcessor的构造函数接受三个参数 - 一个Function,它接受输入记录,然后将业务逻辑操作作为Function主体的一部分,DLT 主题,最后是DltPublishingContext。当Function的 lambda 表达式抛出异常时,DltAwareProcessor会将输入记录发送到 DLT。DltPublishingContextDltAwareProcessor提供必要的发布基础设施 bean。DltPublishingContext由绑定器自动配置,因此您可以直接将其注入应用程序。

如果您不希望绑定器将失败的记录发布到 DLT,那么您必须直接使用RecordRecoverableProcessor,而不是DltAwareProcessor。您可以提供自己的恢复器作为BiConsumer,它接受输入Record和异常作为参数。假设一种场景,您不想将记录发送到 DLT,而只是记录消息并继续。下面是如何实现这一点的示例。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
					throw new RuntimeException("error");
				},
                (record, exception) -> {
                  // Handle the record
                }));
}

在这种情况下,当记录失败时,RecordRecoverableProcessor使用用户提供的恢复器,它是一个BiConsumer,它接受失败的记录和抛出的异常作为参数。

在 DltAwareProcessor 中处理记录键

当使用DltAwareProcessor将失败的记录发送到 DLT 时,如果您想将记录键发送到 DLT 主题,那么您需要在 DLT 绑定上设置正确的序列化器。这是因为,DltAwareProcessor使用StreamBridge,它使用常规的 Kafka 绑定器(基于消息通道),默认情况下它使用ByteArraySerializer作为键。在记录值的情况下,Spring Cloud Stream 将有效负载转换为适当的byte[];但是,键的情况并非如此,因为它只是将它在标头中接收到的内容作为键传递。如果您提供非字节数组键,那么这可能会导致类转换异常,为了避免这种情况,您需要在 DLT 绑定上设置序列化器,如下所示。

假设 DLT 目标是hello-dlt-1,记录键的数据类型是 String。

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer