处理异常

本节介绍如何处理使用 Spring for Apache Kafka 时可能出现的各种异常。

监听器错误处理程序

从 2.0 版本开始,@KafkaListener 注解新增了一个属性:errorHandler

您可以使用errorHandler来提供KafkaListenerErrorHandler实现的bean名称。此函数接口有一个方法,如下所示

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问由消息转换器生成的spring-messaging Message<?>对象和监听器抛出的异常,该异常包装在ListenerExecutionFailedException中。错误处理程序可以抛出原始异常或新异常,该异常将抛出到容器。错误处理程序返回的任何内容都会被忽略。

从版本 2.7 开始,您可以在MessagingMessageConverterBatchMessagingMessageConverter上设置rawRecordHeader属性,这会导致将原始ConsumerRecord添加到KafkaHeaders.RAW_DATA标头中的已转换Message<?>中。例如,如果您希望在监听器错误处理程序中使用DeadLetterPublishingRecoverer,这很有用。它可能用于请求/回复场景,您希望在捕获死信主题中的失败记录后,在经过一定次数的重试后将失败结果发送给发送方。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口(ConsumerAwareListenerErrorHandler),它可以通过以下方法访问消费者对象

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口(ManualAckListenerErrorHandler)在使用手动AckMode时提供对Acknowledgment对象的访问。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在任何情况下,您都不要对消费者执行任何查找,因为容器将不知道它们。

容器错误处理程序

从版本 2.8 开始,传统的ErrorHandlerBatchErrorHandler接口已被新的CommonErrorHandler取代。这些错误处理程序可以处理记录和批处理监听器的错误,允许单个监听器容器工厂为两种类型的监听器创建容器。提供了CommonErrorHandler实现来替换大多数传统的框架错误处理程序实现。

请参阅将自定义传统错误处理程序实现迁移到CommonErrorHandler,以获取有关将自定义错误处理程序迁移到CommonErrorHandler的信息。

当使用事务时,默认情况下不会配置任何错误处理程序,因此异常将回滚事务。事务容器的错误处理由 AfterRollbackProcessor 处理。如果您在使用事务时提供自定义错误处理程序,则必须抛出异常才能回滚事务。

此接口有一个默认方法 isAckAfterHandle(),容器会调用该方法来确定如果错误处理程序在不抛出异常的情况下返回,是否应该提交偏移量;默认情况下它返回 true。

通常,框架提供的错误处理程序会在错误未被“处理”时抛出异常(例如,在执行 seek 操作之后)。默认情况下,容器会在 ERROR 级别记录此类异常。所有框架错误处理程序都扩展了 KafkaExceptionLogLevelAware,它允许您控制记录这些异常的级别。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理程序,用于容器工厂中的所有监听器。以下示例展示了如何做到这一点

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带注释的监听器方法抛出异常,则会将其抛出到容器,并且消息将根据容器配置进行处理。

容器会在调用错误处理程序之前提交所有挂起的偏移量提交。

如果您使用的是 Spring Boot,您只需要将错误处理程序添加为 @Bean,Boot 就会将其添加到自动配置的工厂中。

回退处理程序

错误处理程序(例如 DefaultErrorHandler)使用 BackOff 来确定在重试传递之前等待多长时间。从 2.9 版本开始,您可以配置自定义 BackOffHandler。默认处理程序只是挂起线程,直到回退时间过去(或容器停止)。框架还提供了 ContainerPausingBackOffHandler,它会暂停监听器容器,直到回退时间过去,然后恢复容器。当延迟时间长于 max.poll.interval.ms 消费者属性时,这很有用。请注意,实际回退时间的精度将受到 pollTimeout 容器属性的影响。

DefaultErrorHandler

此新的错误处理程序替换了 SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们是几个版本以来的默认错误处理程序。一个区别是,批处理监听器(当抛出除 BatchListenerFailedException 之外的异常时)的回退行为等同于 重试完整批次

从 2.9 版本开始,可以配置 DefaultErrorHandler 以提供与下面讨论的寻求未处理记录偏移量相同的语义,但实际上并没有寻求。相反,这些记录会被监听器容器保留,并在错误处理程序退出后(以及在执行一次暂停的 poll() 后,以保持消费者存活;如果使用 非阻塞重试ContainerPausingBackOffHandler,暂停可能会扩展到多个轮询)重新提交给监听器。错误处理程序会向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者它是否已恢复,然后它将不再发送给监听器。要启用此模式,请将属性 seekAfterError 设置为 false

错误处理程序可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录将被记录(在ERROR级别)。您可以使用自定义恢复器(BiConsumer)和BackOff来配置处理程序,它控制传递尝试和每次尝试之间的延迟。使用具有FixedBackOff.UNLIMITED_ATTEMPTSFixedBackOff会导致(实际上)无限重试。以下示例配置了三次尝试后的恢复

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。

例如,使用@KafkaListener容器工厂,您可以添加DefaultErrorHandler,如下所示

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录侦听器,这将重试传递最多 2 次(3 次传递尝试),后退时间为 1 秒,而不是默认配置(FixedBackOff(0L, 9))。重试耗尽后,失败将简单地记录下来。

例如,如果poll返回六条记录(每个分区 0、1、2 返回两条),并且侦听器在第四条记录上抛出异常,则容器将通过提交其偏移量来确认前三条消息。DefaultErrorHandler 将分区 1 的偏移量设置为 1,分区 2 的偏移量设置为 0。下一个poll()将返回三条未处理的记录。

如果AckModeBATCH,则容器将在调用错误处理程序之前提交前两个分区的偏移量。

对于批处理侦听器,侦听器必须抛出BatchListenerFailedException,指示批处理中哪些记录失败。

事件顺序如下

  • 提交索引之前的记录的偏移量。

  • 如果重试未耗尽,请执行查找,以便所有剩余的记录(包括失败的记录)将被重新传递。

  • 如果重试已耗尽,请尝试恢复失败的记录(默认情况下仅记录)并执行查找,以便所有剩余的记录(不包括失败的记录)将被重新传递。已恢复记录的偏移量将被提交。

  • 如果重试已耗尽且恢复失败,则执行查找,就像重试未耗尽一样。

从版本 2.9 开始,可以配置DefaultErrorHandler以提供与上面讨论的查找未处理记录偏移量相同的语义,但实际上不进行查找。相反,错误处理程序会创建一个新的ConsumerRecords<?, ?>,其中仅包含未处理的记录,这些记录将在提交给侦听器后(在执行一次暂停的poll()后,以保持消费者处于活动状态)提交。要启用此模式,请将属性seekAfterError设置为false

默认恢复器在重试耗尽后记录失败的记录。您可以使用自定义恢复器,或使用框架提供的恢复器,例如DeadLetterPublishingRecoverer

当使用 POJO 批量监听器(例如 List<Thing>)时,如果无法将完整的消费者记录添加到异常中,则可以添加失败记录的索引。

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为 AckMode.MANUAL_IMMEDIATE 时,可以配置错误处理程序以提交恢复记录的偏移量;将 commitRecovered 属性设置为 true

另请参阅 发布死信记录

当使用事务时,DefaultAfterRollbackProcessor 提供了类似的功能。请参阅 回滚后处理器

DefaultErrorHandler 将某些异常视为致命异常,并跳过对这些异常的重试;恢复器在第一次失败时被调用。默认情况下,被视为致命的异常是

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常不太可能在重试传递时解决。

您可以将更多异常类型添加到不可重试类别中,或完全替换分类异常的映射。有关更多信息,请参阅 DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications() 的 Javadoc,以及 spring-retry BinaryExceptionClassifier 的 Javadoc。

以下示例将 IllegalArgumentException 添加到不可重试异常中

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

错误处理程序可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。从版本 2.8.10 开始,添加了针对批量监听器的方法。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参阅 JavaDocs。

如果恢复器失败(抛出异常),则失败的记录将包含在搜索中。如果恢复器失败,BackOff 将默认情况下重置,并且重新传递将再次经过回退,然后再尝试恢复。要跳过恢复失败后的重试,请将错误处理程序的 resetStateOnRecoveryFailure 设置为 false

您可以为错误处理程序提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以根据失败的记录和/或异常确定要使用的 BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,则将使用处理程序的默认 BackOff

resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新启动(包括选择新的 BackOff,如果已配置)。当为 false(版本 2.9 之前的默认值)时,不会考虑异常类型。

从版本 2.9 开始,此值现在默认情况下为 true

另请参阅 投递尝试标题

批处理监听器转换错误

从 2.8 版本开始,批处理监听器现在可以使用 MessageConverterByteArrayDeserializerBytesDeserializerStringDeserializer 以及 DefaultErrorHandler 正确处理转换错误。当发生转换错误时,有效负载将设置为 null,并且反序列化异常将添加到记录标头中,类似于 ErrorHandlingDeserializer。监听器中提供了一个 ConversionException 列表,以便监听器可以抛出 BatchListenerFailedException,指示发生转换异常的第一个索引。

示例

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重试完整批次

这现在是 DefaultErrorHandler 用于批处理监听器的回退行为,其中监听器抛出除 BatchListenerFailedException 之外的异常。

无法保证,当重新投递批次时,批次具有相同数量的记录和/或重新投递的记录顺序相同。因此,无法轻松维护批次的重试状态。FallbackBatchErrorHandler 采用以下方法。如果批处理监听器抛出不是 BatchListenerFailedException 的异常,则将从内存中的记录批次执行重试。为了避免在扩展的重试序列期间重新平衡,错误处理程序会暂停消费者,在每次重试之前轮询它,然后休眠以进行回退,并再次调用监听器。如果/当重试耗尽时,将为批次中的每条记录调用 ConsumerRecordRecoverer。如果恢复程序抛出异常,或者线程在其休眠期间被中断,则记录批次将在下次轮询时重新投递。在退出之前,无论结果如何,都会恢复消费者。

此机制不能与事务一起使用。

在等待 BackOff 间隔时,错误处理程序将循环使用短暂的休眠,直到达到所需的延迟,同时检查容器是否已停止,允许休眠在 stop() 之后尽快退出,而不是导致延迟。

容器停止错误处理程序

如果监听器抛出异常,CommonContainerStoppingErrorHandler 将停止容器。对于记录监听器,当 AckModeRECORD 时,将提交已处理记录的偏移量。对于记录监听器,当 AckMode 为任何手动值时,将提交已确认记录的偏移量。对于记录监听器,当 AckModeBATCH 时,或者对于批处理监听器,当容器重新启动时,将重播整个批次。

容器停止后,将抛出包装 ListenerExecutionFailedException 的异常。这是为了导致事务回滚(如果启用了事务)。

委托错误处理程序

CommonDelegatingErrorHandler 可以根据异常类型委托给不同的错误处理程序。例如,您可能希望为大多数异常调用 DefaultErrorHandler,或为其他异常调用 CommonContainerStoppingErrorHandler

所有委托必须共享相同的兼容属性(ackAfterHandleseekAfterError …​)。

日志记录错误处理程序

CommonLoggingErrorHandler 只是记录异常;使用记录监听器,来自先前轮询的剩余记录将传递给监听器。对于批处理监听器,将记录批处理中的所有记录。

对记录和批处理监听器使用不同的通用错误处理程序

如果您希望对记录和批处理监听器使用不同的错误处理策略,则提供 CommonMixedErrorHandler,允许为每种监听器类型配置特定的错误处理程序。

通用错误处理程序摘要

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

旧版错误处理程序及其替换

旧版错误处理程序 替换

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

没有替换,使用 DefaultErrorHandler 以及无限 BackOff

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

没有替换,使用 DefaultErrorHandler 并抛出除 BatchListenerFailedException 之外的异常。

将自定义旧版错误处理程序实现迁移到 CommonErrorHandler

请参阅 CommonErrorHandler 中的 JavaDocs。

要替换 ErrorHandlerConsumerAwareErrorHandler 实现,您应该实现 handleOne() 并让 seeksAfterHandle() 返回 false(默认值)。您还应该实现 handleOtherException() 来处理在记录处理范围之外发生的异常(例如,消费者错误)。

要替换 RemainingRecordsErrorHandler 实现,您应该实现 handleRemaining() 并覆盖 seeksAfterHandle() 以返回 true(错误处理程序必须执行必要的查找)。您还应该实现 handleOtherException() - 处理在记录处理范围之外发生的异常(例如,消费者错误)。

要替换任何 BatchErrorHandler 实现,您应该实现 handleBatch()。您还应该实现 handleOtherException() - 用于处理在记录处理范围之外发生的异常(例如,消费者错误)。

回滚处理器

在使用事务时,如果监听器抛出异常(并且错误处理程序(如果存在)也抛出异常),则事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)将在下次轮询时重新获取。这是通过在 DefaultAfterRollbackProcessor 中执行 seek 操作来实现的。对于批处理监听器,整个批次记录将被重新处理(容器不知道批次中哪个记录失败)。要修改此行为,您可以使用自定义 AfterRollbackProcessor 配置监听器容器。例如,对于基于记录的监听器,您可能希望跟踪失败的记录并在尝试一定次数后放弃,也许是通过将其发布到死信主题。

从版本 2.2 开始,DefaultAfterRollbackProcessor 现在可以恢复(跳过)一直失败的记录。默认情况下,在十次失败后,失败的记录将被记录(在 ERROR 级别)。您可以使用自定义恢复器(BiConsumer)和最大失败次数配置处理器。将 maxFailures 属性设置为负数会导致无限次重试。以下示例配置了三次尝试后的恢复

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当您不使用事务时,可以通过配置 DefaultErrorHandler 来实现类似的功能。请参阅 容器错误处理程序

从版本 3.2 开始,恢复现在可以恢复(跳过)一直失败的整个批次记录。设置 ContainerProperties.setBatchRecoverAfterRollback(true) 以启用此功能。

默认行为,对于批处理监听器,恢复是不可能的,因为框架不知道批次中哪个记录一直失败。在这种情况下,应用程序监听器必须处理一直失败的记录。

另请参阅 发布死信记录

从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在一个新的事务中被调用(在失败的事务回滚后开始)。然后,如果您使用 DeadLetterPublishingRecoverer 来发布失败的记录,处理器将发送恢复记录在原始主题/分区中的偏移量到事务中。要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecoveredkafkaTemplate 属性。

如果恢复器失败(抛出异常),失败的记录将被包含在搜索中。从 2.5.5 版本开始,如果恢复器失败,BackOff 将默认重置,并且重新传递将再次经过回退,然后再尝试恢复。在早期版本中,BackOff 不会重置,并且在下次失败时会重新尝试恢复。要恢复到之前的行为,请将处理器的 resetStateOnRecoveryFailure 属性设置为 false

从 2.6 版本开始,您现在可以为处理器提供一个 BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> 来确定要使用的 BackOff,基于失败的记录和/或异常。

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,将使用处理器的默认 BackOff

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新开始(包括选择新的 BackOff,如果已配置)。默认情况下,不会考虑异常类型。

从 2.3.1 版本开始,类似于 DefaultErrorHandlerDefaultAfterRollbackProcessor 将某些异常视为致命异常,并且对于此类异常将跳过重试;恢复器在第一次失败时被调用。默认情况下,被视为致命的异常是

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常不太可能在重试传递时解决。

您可以将更多异常类型添加到不可重试类别中,或者完全替换分类异常的映射。有关更多信息,请参阅 DefaultAfterRollbackProcessor.setClassifications() 的 Javadocs,以及 spring-retry BinaryExceptionClassifier 的 Javadocs。

以下示例将 IllegalArgumentException 添加到不可重试异常中

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

另请参阅 投递尝试标题

使用当前的 kafka-clients,容器无法检测到 ProducerFencedException 是由重新平衡引起,还是由于超时或过期导致生产者的 transactional.id 被撤销。因为,在大多数情况下,它是由于重新平衡引起的,所以容器不会调用 AfterRollbackProcessor(因为它不适合搜索分区,因为我们不再被分配它们)。如果您确保超时时间足够长以处理每个事务并定期执行“空”事务(例如,通过 ListenerContainerIdleEvent),您可以避免由于超时和过期而导致的围栏。或者,您可以将 stopContainerWhenFenced 容器属性设置为 true,容器将停止,避免记录丢失。您可以使用 ConsumerStoppedEvent 并检查 Reason 属性以获取 FENCED 来检测此条件。由于事件还包含对容器的引用,因此您可以使用此事件重新启动容器。

从 2.7 版本开始,在等待 BackOff 间隔时,错误处理程序将循环执行短暂的休眠,直到达到所需的延迟,同时检查容器是否已停止,允许休眠在 stop() 之后立即退出,而不是造成延迟。

从 2.7 版本开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参阅 JavaDocs。

投递尝试标头

以下内容仅适用于记录监听器,不适用于批处理监听器。

从 2.5 版本开始,当使用实现 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 时,可以启用将 KafkaHeaders.DELIVERY_ATTEMPT 标头 (kafka_deliveryAttempt) 添加到记录中。此标头的值为从 1 开始递增的整数。当接收原始 ConsumerRecord<?, ?> 时,整数位于 byte[4] 中。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

当使用 @KafkaListener 以及 DefaultKafkaHeaderMapperSimpleKafkaHeaderMapper 时,可以通过将 @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery 作为参数添加到监听器方法中来获取它。

要启用此标头的填充,请将容器属性 deliveryAttemptHeader 设置为 true。默认情况下它被禁用,以避免为每个记录查找状态并添加标头的(很小的)开销。

DefaultErrorHandlerDefaultAfterRollbackProcessor 支持此功能。

监听器信息标头

在某些情况下,能够知道监听器在哪个容器中运行非常有用。

从 2.8.4 版本开始,您现在可以在监听器容器上设置 listenerInfo 属性,或在 @KafkaListener 注解上设置 info 属性。然后,容器将将其添加到所有传入消息的 KafkaListener.LISTENER_INFO 标头中;然后可以在记录拦截器、过滤器等中使用它,或者在监听器本身中使用它。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

当在 RecordInterceptorRecordFilterStrategy 实现中使用时,标头在消费者记录中以字节数组形式存在,使用 KafkaListenerAnnotationBeanPostProcessorcharSet 属性进行转换。

标头映射器在从消费者记录创建 MessageHeaders 时也转换为 String,并且永远不会在出站记录上映射此标头。

对于 POJO 批处理监听器,从 2.8.6 版本开始,标头被复制到批处理的每个成员中,并且在转换后也作为单个 String 参数可用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理监听器有过滤器,并且过滤器导致批处理为空,则需要在@Header参数中添加required = false,因为空批处理没有可用信息。

如果收到List<Message<Thing>>,则信息位于每个Message<?>KafkaHeaders.LISTENER_INFO头中。

有关消费批处理的更多信息,请参见批处理监听器

发布死信记录

当记录达到最大失败次数时,您可以使用记录恢复器配置DefaultErrorHandlerDefaultAfterRollbackProcessor。框架提供了DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。恢复器需要一个KafkaTemplate<Object, Object>,用于发送记录。您还可以选择使用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>进行配置,该函数用于解析目标主题和分区。

默认情况下,死信记录将发送到名为<originalTopic>.DLT(原始主题名称后缀为.DLT)的主题,并发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题**必须至少与原始主题具有相同数量的分区**。

如果返回的TopicPartition的分区为负数,则不会在ProducerRecord中设置分区,因此分区由 Kafka 选择。从版本 2.2.4 开始,任何ListenerExecutionFailedException(例如,当在@KafkaListener方法中检测到异常时抛出)都将使用groupId属性进行增强。这允许目标解析器使用此属性以及ConsumerRecord中的信息来选择死信主题。

以下示例展示了如何连接自定义目标解析器

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录将使用以下头信息进行增强

  • KafkaHeaders.DLT_EXCEPTION_FQCN:异常类名(通常为ListenerExecutionFailedException,但可能是其他异常)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN:异常原因类名(如果存在)(从版本 2.8 开始)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE:异常堆栈跟踪。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE:异常消息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN:异常类名(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅限键反序列化错误)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 无法处理记录的原始消费者组(从版本 2.8 开始)。

键异常仅由 DeserializationException 引起,因此没有 DLT_KEY_EXCEPTION_CAUSE_FQCN

有两种机制可以添加更多头信息。

  1. 子类化恢复器并覆盖 createProducerRecord() - 调用 super.createProducerRecord() 并添加更多头信息。

  2. 提供一个 BiFunction 来接收消费者记录和异常,返回一个 Headers 对象;来自那里的头信息将被复制到最终的生产者记录;另请参阅 管理死信记录头信息。使用 setHeadersFunction() 设置 BiFunction

第二种方法实现起来更简单,但第一种方法提供了更多可用信息,包括已经组装的标准头信息。

从版本 2.3 开始,当与 ErrorHandlingDeserializer 结合使用时,发布者将在死信生产者记录中恢复记录的 value(),恢复为无法反序列化的原始值。以前,value() 为 null,用户代码必须从消息头信息中解码 DeserializationException。此外,您可以向发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationExceptionbyte[],以及使用与成功反序列化的记录不同的序列化器进行反序列化的值,则可能需要这样做。以下是如何使用使用 Stringbyte[] 序列化器的 KafkaTemplate 配置发布者的示例

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用映射键来定位适合要发布的 value() 的模板。建议使用 LinkedHashMap,以便按顺序检查键。

当发布null值时,如果有多个模板,恢复器将查找Void类的模板;如果不存在,将使用values().iterator()中的第一个模板。

从 2.7 版本开始,您可以使用setFailIfSendResultIsError方法,以便在消息发布失败时抛出异常。您还可以使用setWaitForSendResultTimeout设置发送成功验证的超时时间。

如果恢复器失败(抛出异常),失败的记录将包含在搜索中。从 2.5.5 版本开始,如果恢复器失败,BackOff将默认重置,重新传递将再次经过回退,然后才会再次尝试恢复。在早期版本中,BackOff不会重置,并且在下次失败时会重新尝试恢复。要恢复到之前的行为,请将错误处理程序的resetStateOnRecoveryFailure属性设置为false

从 2.6.3 版本开始,将 resetStateOnExceptionChange 设置为 true,如果异常类型在失败之间发生变化,则重试序列将重新开始(包括选择新的 BackOff,如果已配置)。默认情况下,不会考虑异常类型。

从 2.3 版本开始,恢复器也可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复

ErrorHandlingDeserializer在标头ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER中添加反序列化异常(使用 Java 序列化)。默认情况下,这些标头不会保留在发布到死信主题的消息中。从 2.7 版本开始,如果键和值都反序列化失败,则将原始键和值填充到发送到 DLT 的记录中。

如果传入的记录相互依赖,但可能以乱序到达,则将失败的记录重新发布到原始主题的尾部(多次)可能很有用,而不是直接将其发送到死信主题。有关示例,请参阅此 Stack Overflow 问题

以下错误处理程序配置将完全执行此操作

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从 2.7 版本开始,恢复器检查目标解析器选择的分区是否实际存在。如果分区不存在,则ProducerRecord中的分区将设置为null,允许KafkaProducer选择分区。您可以通过将verifyPartition属性设置为false来禁用此检查。

从 3.1 版本开始,将logRecoveryRecord属性设置为true将记录恢复记录和异常。

管理死信记录标头

参考上面发布死信记录DeadLetterPublishingRecoverer有两个属性用于管理标头,当这些标头已经存在时(例如,当重新处理失败的死信记录时,包括使用非阻塞重试时)。

  • appendOriginalHeaders(默认值为true

  • stripPreviousExceptionHeaders(从 2.8 版本开始,默认值为true

Apache Kafka 支持具有相同名称的多个标头;要获取“最新”值,您可以使用headers.lastHeader(headerName);要获取多个标头的迭代器,请使用headers.headers(headerName).iterator()

当重复发布失败的记录时,这些标头可能会增长(并最终导致发布失败,原因是RecordTooLargeException);这对异常标头尤其如此,特别是对于堆栈跟踪标头。

这两个属性的原因是,虽然你可能只想保留最后一个异常信息,但你可能还想保留记录在每次失败时经过的主题历史记录。

appendOriginalHeaders 应用于所有名为 ORIGINAL 的标头,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的标头。

从 2.8.4 版本开始,您现在可以控制哪些标准标头将被添加到输出记录中。请参阅 enum HeadersToAdd 以获取(目前)默认添加的 10 个标准标头的通用名称(这些不是实际的标头名称,只是一个抽象;实际的标头名称由 getHeaderNames() 方法设置,子类可以覆盖该方法)。

要排除标头,请使用 excludeHeaders() 方法;例如,要抑制在标头中添加异常堆栈跟踪,请使用

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加 ExceptionHeadersCreator 来完全自定义异常标头的添加;这也会禁用所有标准异常标头。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

从 2.8.4 版本开始,您现在还可以通过 addHeadersFunction 方法提供多个标头函数。这允许应用其他函数,即使另一个函数已经注册,例如,当使用 非阻塞重试 时。

另请参阅 失败标头管理非阻塞重试

ExponentialBackOffWithMaxRetries 实现

Spring 框架提供了一些 BackOff 实现。默认情况下,ExponentialBackOff 将无限期地重试;要在一定次数的重试尝试后放弃,需要计算 maxElapsedTime。从 2.7.3 版本开始,Spring for Apache Kafka 提供了 ExponentialBackOffWithMaxRetries,它是一个接收 maxRetries 属性并自动计算 maxElapsedTime 的子类,这更方便一些。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将在调用恢复程序之前,在 1, 2, 4, 8, 10, 10 秒后重试。