异常处理

本节描述如何处理使用 Spring for Apache Kafka 时可能出现的各种异常。

监听器错误处理器

从 2.0 版本开始,@KafkaListener 注解新增了一个属性:errorHandler

您可以使用errorHandler 提供一个KafkaListenerErrorHandler 实现的 bean 名称。此函数接口有一个方法,如下所示

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问由消息转换器生成的 spring-messaging Message<?> 对象以及监听器抛出的异常(封装在ListenerExecutionFailedException 中)。错误处理器可以抛出原始异常或新的异常,该异常将抛给容器。错误处理器返回的任何内容都将被忽略。

从 2.7 版本开始,您可以设置MessagingMessageConverterBatchMessagingMessageConverter 上的rawRecordHeader 属性,这会导致原始ConsumerRecord 添加到KafkaHeaders.RAW_DATA 头中的已转换Message<?> 中。例如,如果您希望在监听器错误处理器中使用DeadLetterPublishingRecoverer,这将非常有用。它可能用于请求/回复场景中,在捕获死信主题中的失败记录后,在尝试若干次后,您希望将失败结果发送给发送者。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口 (ConsumerAwareListenerErrorHandler),可以通过以下方法访问消费者对象:

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口 (ManualAckListenerErrorHandler) 在使用手动AckMode 时提供对Acknowledgment 对象的访问。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在这两种情况下,您都不应在消费者上执行任何查找操作,因为容器将不知道这些操作。

容器错误处理器

从 2.8 版本开始,旧的ErrorHandlerBatchErrorHandler 接口已被新的CommonErrorHandler 取代。这些错误处理器可以处理记录和批量监听器的错误,允许单个监听器容器工厂为这两种类型的监听器创建容器。提供了CommonErrorHandler 实现来替换大多数旧的框架错误处理器实现。

有关将自定义错误处理器迁移到CommonErrorHandler 的信息,请参见将自定义旧错误处理器实现迁移到CommonErrorHandler

当使用事务时,默认情况下不会配置任何错误处理器,以便异常会回滚事务。事务性容器的错误处理由AfterRollbackProcessor 处理。如果在使用事务时提供自定义错误处理器,则如果要回滚事务,则必须抛出异常。

此接口有一个默认方法isAckAfterHandle(),容器会调用此方法来确定如果错误处理器返回而不抛出异常,是否应该提交偏移量;默认情况下,它返回 true。

通常,框架提供的错误处理器在未“处理”错误(例如,执行查找操作后)时会抛出异常。默认情况下,容器会以ERROR 级别记录此类异常。所有框架错误处理器都扩展了KafkaExceptionLogLevelAware,这允许您控制记录这些异常的级别。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理器,用于容器工厂中的所有监听器。以下示例显示了如何执行此操作

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带注解的监听器方法抛出异常,则会将其抛给容器,并且根据容器配置处理消息。

容器在调用错误处理程序之前会提交所有挂起的偏移量提交。

如果您使用的是 Spring Boot,则只需将错误处理程序添加为@Bean,Boot 将将其添加到自动配置的工厂中。

回退处理程序

诸如DefaultErrorHandler之类的错误处理程序使用BackOff来确定在重试投递之前等待多长时间。从 2.9 版开始,您可以配置自定义的BackOffHandler。默认处理程序只是挂起线程,直到回退时间过去(或容器停止)。框架还提供ContainerPausingBackOffHandler,它会暂停监听器容器,直到回退时间过去,然后恢复容器。当延迟时间长于max.poll.interval.ms消费者属性时,这非常有用。请注意,实际回退时间的精度会受到pollTimeout容器属性的影响。

DefaultErrorHandler

这个新的错误处理程序取代了SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们是几个版本以来的默认错误处理程序。一个区别是,批处理监听器(当抛出除BatchListenerFailedException以外的异常时)的回退行为等效于重试完整批次

从 2.9 版开始,可以配置DefaultErrorHandler以提供与查找未处理记录偏移量相同的语义(如下所述),但无需实际查找。相反,记录由监听器容器保留,并在错误处理程序退出后(以及执行单个暂停的poll()之后,以保持消费者处于活动状态;如果使用非阻塞重试ContainerPausingBackOffHandler,则暂停可能会持续多个轮询)重新提交给监听器。错误处理程序向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者它是否已恢复,然后它将不会再次发送到监听器。要启用此模式,请将属性seekAfterError设置为false

错误处理程序可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录将被记录(在ERROR级别)。您可以使用自定义恢复程序(BiConsumer)和BackOff来配置处理程序,该BackOff控制投递尝试和每次尝试之间的延迟。使用具有FixedBackOff.UNLIMITED_ATTEMPTSFixedBackOff会导致(实际上)无限次重试。以下示例配置了三次尝试后的恢复

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要使用此处理程序的自定义实例配置监听器容器,请将其添加到容器工厂。

例如,对于@KafkaListener容器工厂,您可以按如下方式添加DefaultErrorHandler

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录监听器,这将重试投递最多 2 次(3 次投递尝试),回退时间为 1 秒,而不是默认配置(FixedBackOff(0L, 9))。重试耗尽后,只会记录失败。

例如,如果poll返回六条记录(每个分区 0、1、2 各两条),并且监听器在第四条记录上抛出异常,则容器会通过提交其偏移量来确认前三条消息。DefaultErrorHandler将分区 1 的偏移量设置为 1,将分区 2 的偏移量设置为 0。下一个poll()返回三条未处理的记录。

如果AckModeBATCH,则容器在调用错误处理程序之前会提交前两个分区的偏移量。

对于批处理监听器,监听器必须抛出BatchListenerFailedException,指示批处理中哪些记录失败。

事件顺序如下:

  • 提交索引之前的记录的偏移量。

  • 如果重试未耗尽,则执行查找,以便所有剩余记录(包括失败记录)都将被重新投递。

  • 如果重试已耗尽,则尝试恢复失败的记录(默认情况下仅记录)并执行查找,以便剩余记录(不包括失败记录)将被重新投递。已恢复记录的偏移量将被提交。

  • 如果重试已耗尽且恢复失败,则执行查找,就像重试未耗尽一样。

从 2.9 版开始,可以配置DefaultErrorHandler以提供与上面讨论的查找未处理记录偏移量相同的语义,但无需实际查找。相反,错误处理程序会创建一个新的ConsumerRecords<?, ?>,其中只包含未处理的记录,然后将其提交给监听器(在执行单个暂停的poll()之后,以保持消费者处于活动状态)。要启用此模式,请将属性seekAfterError设置为false

重试耗尽后,默认恢复程序会记录失败的记录。您可以使用自定义恢复程序,或框架提供的恢复程序,例如DeadLetterPublishingRecoverer

使用 POJO 批处理监听器(例如List<Thing>)时,如果您没有完整的消费者记录添加到异常中,则只需添加失败记录的索引即可

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为AckMode.MANUAL_IMMEDIATE时,可以配置错误处理程序以提交已恢复记录的偏移量;将commitRecovered属性设置为true

另请参见发布死信记录

使用事务时,DefaultAfterRollbackProcessor提供了类似的功能。请参见回滚后处理器

DefaultErrorHandler认为某些异常是致命的,并且会跳过此类异常的重试;在第一次失败时调用恢复程序。默认情况下,被认为是致命的异常有:

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常不太可能在重试投递时解决。

您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。有关更多信息,请参见DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications()的Javadoc,以及spring-retry BinaryExceptionClassifier的Javadoc。

这是一个将IllegalArgumentException添加到不可重试异常的示例

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

可以使用一个或多个RetryListener配置错误处理程序,以接收重试和恢复进度的通知。从 2.8.10 版开始,添加了批处理监听器的方法。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参见 JavaDocs。

如果恢复程序失败(抛出异常),则失败的记录将包含在查找中。如果恢复程序失败,则默认情况下将重置BackOff,并且重新投递将再次通过回退,然后再尝试恢复。要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailure设置为false

您可以为错误处理程序提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>,以根据失败的记录和/或异常来确定要使用的BackOff

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回null,则将使用处理程序的默认BackOff

设置resetStateOnExceptionChangetrue,如果失败之间的异常类型发生更改,则重试序列将重新启动(包括选择新的BackOff,如果已配置)。当为false(2.9 版之前的默认值)时,不会考虑异常类型。

从 2.9 版开始,此值现在默认为true

另请参见投递尝试标头

批处理错误处理程序的转换错误

从 2.8 版开始,批处理监听器现在可以使用MessageConverterByteArrayDeserializerBytesDeserializerStringDeserializer以及DefaultErrorHandler正确处理转换错误。当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头中,类似于ErrorHandlingDeserializer。监听器中提供了一个ConversionException列表,因此监听器可以抛出BatchListenerFailedException,指示发生转换异常的第一个索引。

示例

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重试完整批次

现在,这是批处理监听器的DefaultErrorHandler的回退行为,其中监听器抛出除BatchListenerFailedException以外的异常。

无法保证在重新投递批次时,批次具有相同的记录数量和/或重新投递的记录顺序相同。因此,很难轻松维护批次的重试状态。FallbackBatchErrorHandler采用以下方法。如果批处理监听器抛出不是BatchListenerFailedException的异常,则将从内存中的记录批次执行重试。为了避免在扩展的重试序列期间重新平衡,错误处理程序会暂停使用者,在每次重试之前轮询它,然后休眠一段时间,然后再次调用监听器。如果/何时重试耗尽,则将为批次中的每个记录调用ConsumerRecordRecoverer。如果恢复程序抛出异常,或者线程在其休眠期间被中断,则记录批次将在下次轮询时重新投递。无论结果如何,在退出之前,使用者都会恢复。

此机制不能与事务一起使用。

在等待BackOff间隔期间,错误处理程序将循环使用短暂的休眠,直到达到所需的延迟,同时检查容器是否已停止,允许休眠在stop()之后很快退出,而不是导致延迟。

容器停止错误处理程序

如果监听器抛出异常,则CommonContainerStoppingErrorHandler会停止容器。对于记录监听器,当AckModeRECORD时,会提交已处理记录的偏移量。对于记录监听器,当AckMode为任何手动值时,会提交已确认记录的偏移量。对于记录监听器,当AckModeBATCH时,或者对于批处理监听器,在容器重新启动时会重播整个批次。

容器停止后,会抛出一个包装了ListenerExecutionFailedException的异常。这是为了导致事务回滚(如果启用了事务)。

委托错误处理程序

CommonDelegatingErrorHandler可以根据异常类型委托给不同的错误处理程序。例如,您可能希望为大多数异常调用DefaultErrorHandler,或为其他异常调用CommonContainerStoppingErrorHandler

所有委托必须共享相同的兼容属性(ackAfterHandleseekAfterError……)。

日志记录错误处理程序

CommonLoggingErrorHandler只是记录异常;对于记录监听器,先前轮询中剩余的记录将传递给监听器。对于批处理监听器,批次中的所有记录都将被记录。

对记录和批处理监听器使用不同的通用错误处理程序

如果您希望对记录和批处理监听器使用不同的错误处理策略,则提供CommonMixedErrorHandler,允许为每种监听器类型配置特定的错误处理程序。

通用错误处理程序摘要

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

旧版错误处理程序及其替代品

旧版错误处理器 替换方案

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

无替换方案,请使用带有无限 `BackOff` 的 `DefaultErrorHandler`。

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

无替换方案,请使用 `DefaultErrorHandler` 并抛出 `BatchListenerFailedException` 以外的异常。

将自定义旧版错误处理器实现迁移到 `CommonErrorHandler`

请参考 `CommonErrorHandler` 中的 JavaDoc。

要替换 `ErrorHandler` 或 `ConsumerAwareErrorHandler` 实现,您应该实现 `handleOne()` 并让 `seeksAfterHandle()` 返回 `false`(默认值)。您还应该实现 `handleOtherException()` 来处理记录处理范围之外发生的异常(例如消费者错误)。

要替换 `RemainingRecordsErrorHandler` 实现,您应该实现 `handleRemaining()` 并覆盖 `seeksAfterHandle()` 以返回 `true`(错误处理器必须执行必要的 seek 操作)。您还应该实现 `handleOtherException()` 来处理记录处理范围之外发生的异常(例如消费者错误)。

要替换任何 `BatchErrorHandler` 实现,您应该实现 `handleBatch()`。您还应该实现 `handleOtherException()` 来处理记录处理范围之外发生的异常(例如消费者错误)。

回滚后处理器

使用事务时,如果监听器抛出异常(以及如果存在,错误处理器也抛出异常),则事务将回滚。默认情况下,任何未处理的记录(包括失败的记录)将在下次轮询时重新获取。这是通过在 `DefaultAfterRollbackProcessor` 中执行 `seek` 操作来实现的。对于批量监听器,整个批次的记录都会重新处理(容器不知道批次中哪个记录失败)。要修改此行为,您可以使用自定义的 `AfterRollbackProcessor` 配置监听器容器。例如,对于基于记录的监听器,您可能希望跟踪失败的记录,并在尝试一定次数后放弃,也许是将其发布到死信主题。

从 2.2 版本开始,`DefaultAfterRollbackProcessor` 现在可以恢复(跳过)持续失败的记录。默认情况下,在十次失败后,失败的记录将被记录(在 `ERROR`级别)。您可以使用自定义恢复器(`BiConsumer`)和最大失败次数来配置处理器。将 `maxFailures` 属性设置为负数会导致无限重试。以下示例配置了三次尝试后的恢复

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

如果您不使用事务,可以通过配置 `DefaultErrorHandler` 来实现类似的功能。请参见 容器错误处理器

从 3.2 版本开始,恢复现在可以恢复(跳过)持续失败的整个记录批次。设置 `ContainerProperties.setBatchRecoverAfterRollback(true)` 以启用此功能。

默认行为,对于批量监听器,恢复是不可能的,因为框架不知道批次中哪个记录持续失败。在这种情况下,应用程序监听器必须处理持续失败的记录。

另请参见发布死信记录

从 2.2.5 版本开始,`DefaultAfterRollbackProcessor` 可以在新的事务中调用(在失败的事务回滚后启动)。然后,如果您使用 `DeadLetterPublishingRecoverer` 来发布失败的记录,则处理器会将恢复的记录在原始主题/分区中的偏移量发送到事务。要启用此功能,请在 `DefaultAfterRollbackProcessor` 上设置 `commitRecovered` 和 `kafkaTemplate` 属性。

如果恢复器失败(抛出异常),则失败的记录将包含在 seek 中。从 2.5.5 版本开始,如果恢复器失败,则默认情况下 `BackOff` 将被重置,并且重新投递将再次经过 back off,然后再尝试恢复。在早期版本中,`BackOff` 没有被重置,并且在下次失败时会重新尝试恢复。要恢复到之前的行为,请将处理器的 `resetStateOnRecoveryFailure` 属性设置为 `false`。

从 2.6 版本开始,您现在可以为处理器提供一个 `BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>` 来确定要使用的 `BackOff`,基于失败的记录和/或异常。

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 `null`,则将使用处理器的默认 `BackOff`。

从 2.6.3 版本开始,如果异常类型在失败之间发生变化,则将 `resetStateOnExceptionChange` 设置为 `true`,重试序列将重新启动(包括选择新的 `BackOff`,如果已配置)。默认情况下,不考虑异常类型。

从 2.3.1 版本开始,类似于 `DefaultErrorHandler`,`DefaultAfterRollbackProcessor` 将某些异常视为致命异常,并将跳过此类异常的重试;恢复器在第一次失败时被调用。默认情况下,被视为致命的异常包括

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

因为这些异常不太可能在重试投递时解决。

您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。有关更多信息,请参阅 `DefaultAfterRollbackProcessor.setClassifications()` 的 Javadoc,以及 `spring-retry` `BinaryExceptionClassifier` 的 Javadoc。

这是一个将IllegalArgumentException添加到不可重试异常的示例

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

另请参见投递尝试标头

使用当前的 `kafka-clients`,容器无法检测 `ProducerFencedException` 是由重新平衡引起的,还是由于超时或过期导致生产者的 `transactional.id` 被撤销。因为在大多数情况下,它是由重新平衡引起的,所以容器不会调用 `AfterRollbackProcessor`(因为对分区进行 seek 是不合适的,因为我们不再分配它们)。如果您确保超时足够长以处理每个事务并定期执行“空”事务(例如,通过 `ListenerContainerIdleEvent`),您可以避免由于超时和过期而导致的围栏。或者,您可以将 `stopContainerWhenFenced` 容器属性设置为 `true`,容器将停止,避免记录丢失。您可以使用 `ConsumerStoppedEvent` 并检查 `Reason` 属性是否为 `FENCED` 来检测此条件。由于事件也包含对容器的引用,您可以使用此事件重新启动容器。

从 2.7 版本开始,在等待 `BackOff` 间隔期间,错误处理器将循环并短暂休眠,直到达到所需的延迟,同时检查容器是否已停止,允许休眠在 `stop()` 后不久退出,而不是造成延迟。

从 2.7 版本开始,可以使用一个或多个 `RetryListener` 配置处理器,接收重试和恢复进度的通知。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参见 JavaDocs。

投递尝试标头

以下内容仅适用于记录监听器,不适用于批量监听器。

从 2.5 版本开始,当使用实现 `DeliveryAttemptAware` 的 `ErrorHandler` 或 `AfterRollbackProcessor` 时,可以启用将 `KafkaHeaders.DELIVERY_ATTEMPT` 标头(`kafka_deliveryAttempt`)添加到记录中。此标头的值为从 1 开始递增的整数。当接收原始 `ConsumerRecord<?, ?>` 时,整数位于 `byte[4]` 中。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

当使用 `@KafkaListener` 和 `DefaultKafkaHeaderMapper` 或 `SimpleKafkaHeaderMapper` 时,可以通过将 `@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery` 添加为监听器方法的参数来获取它。

要启用此标头的填充,请将容器属性 `deliveryAttemptHeader` 设置为 `true`。默认情况下它被禁用,以避免查找每个记录的状态和添加标头的(少量)开销。

`DefaultErrorHandler` 和 `DefaultAfterRollbackProcessor` 支持此功能。

监听器信息标头

在某些情况下,能够知道监听器在哪个容器中运行非常有用。

从 2.8.4 版本开始,您现在可以在监听器容器上设置 `listenerInfo` 属性,或在 `@KafkaListener` 注解上设置 `info` 属性。然后,容器会将此信息添加到所有传入消息的 `KafkaListener.LISTENER_INFO` 标头中;然后它可以在记录拦截器、过滤器等或监听器本身中使用。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

在 `RecordInterceptor` 或 `RecordFilterStrategy` 实现中使用时,标头作为字节数组位于消费者记录中,使用 `KafkaListenerAnnotationBeanPostProcessor` 的 `charSet` 属性进行转换。

标头映射器在从消费者记录创建 `MessageHeaders` 时也转换为 `String`,并且永远不会在出站记录上映射此标头。

对于 POJO 批量监听器,从 2.8.6 版本开始,标头被复制到批次的每个成员中,并且在转换后也可以作为单个 `String` 参数使用。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批量监听器有过滤器并且过滤器导致批次为空,则需要向 `@Header` 参数添加 `required = false`,因为信息对于空批次不可用。

如果您收到 `List<Message<Thing>>`,则信息位于每个 `Message<?>` 的 `KafkaHeaders.LISTENER_INFO` 标头中。

有关使用批次的更多信息,请参见 批量监听器

发布死信记录

当记录达到最大失败次数时,您可以使用记录恢复器配置 `DefaultErrorHandler` 和 `DefaultAfterRollbackProcessor`。框架提供了 `DeadLetterPublishingRecoverer`,它将失败的消息发布到另一个主题。恢复器需要一个 `KafkaTemplate<Object, Object>`,用于发送记录。您还可以选择使用 `BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>` 配置它,该函数用于解析目标主题和分区。

默认情况下,死信记录将发送到名为 `<originalTopic>.DLT`(原始主题名称后缀为 `.DLT`)的主题,并发送到与原始记录相同的分区。因此,当您使用默认解析器时,死信主题**必须至少与原始主题具有相同数量的分区**。

如果返回的 `TopicPartition` 具有负分区,则不会在 `ProducerRecord` 中设置分区,因此分区由 Kafka 选择。从 2.2.4 版本开始,任何 `ListenerExecutionFailedException`(例如,当在 `@KafkaListener` 方法中检测到异常时抛出)都将使用 `groupId` 属性进行增强。这允许目标解析器除了使用 `ConsumerRecord` 中的信息外,还可以使用此信息来选择死信主题。

以下示例显示了如何连接自定义目标解析器

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录使用以下标头进行增强

  • KafkaHeaders.DLT_EXCEPTION_FQCN:异常类名(通常是 `ListenerExecutionFailedException`,但也可能是其他异常)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN:如果存在,则为异常原因类名(自 2.8 版本起)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE:异常堆栈跟踪。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE:异常消息。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN:异常类名(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE:异常堆栈跟踪(仅限键反序列化错误)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE:异常消息(仅限键反序列化错误)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC:原始主题。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION:原始分区。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET:原始偏移量。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP:原始时间戳。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE:原始时间戳类型。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP:未能处理记录的原始消费者组(自 2.8 版本起)。

键异常仅由DeserializationException引起,因此不存在DLT_KEY_EXCEPTION_CAUSE_FQCN

有两种机制可以添加更多头信息。

  1. 子类化恢复器并覆盖createProducerRecord() - 调用super.createProducerRecord()并添加更多头信息。

  2. 提供一个BiFunction来接收消费者记录和异常,返回一个Headers对象;其中的头信息将被复制到最终的生产者记录;另见管理死信记录头信息。使用setHeadersFunction()设置BiFunction

第二种方法更易于实现,但第一种方法可用的信息更多,包括已组装的标准头信息。

从 2.3 版本开始,当与ErrorHandlingDeserializer结合使用时,发布者将在死信生产者记录中将记录value()恢复为未能反序列化的原始值。以前,value()为 null,用户代码必须从消息头中解码DeserializationException。此外,您可以向发布者提供多个KafkaTemplate;例如,如果您想发布来自DeserializationExceptionbyte[],以及使用与成功反序列化的记录不同的序列化器的值,则可能需要这样做。以下是如何使用使用Stringbyte[]序列化器的KafkaTemplate配置发布者的示例

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用映射键来查找适合要发布的value()的模板。建议使用LinkedHashMap,以便按顺序检查键。

发布null值时,如果有多个模板,恢复器将查找Void类的模板;如果没有,则将使用values().iterator()中的第一个模板。

从 2.7 版本开始,您可以使用setFailIfSendResultIsError方法,以便在消息发布失败时抛出异常。您还可以使用setWaitForSendResultTimeout设置发送者成功验证的超时。

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。从 2.5.5 版本开始,如果恢复器失败,则默认情况下BackOff将被重置,并且重新投递将在尝试恢复之前再次经过回退。在早期版本中,BackOff不会重置,并且会在下一次失败时重新尝试恢复。要恢复到之前的行为,请将错误处理程序的resetStateOnRecoveryFailure属性设置为false

从 2.6.3 版本开始,如果异常类型在失败之间发生变化,则将 `resetStateOnExceptionChange` 设置为 `true`,重试序列将重新启动(包括选择新的 `BackOff`,如果已配置)。默认情况下,不考虑异常类型。

从 2.3 版本开始,恢复器也可以与 Kafka Streams 一起使用 - 有关详细信息,请参见从反序列化异常中恢复

ErrorHandlingDeserializer在头信息ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER中添加反序列化异常(使用 Java 序列化)。默认情况下,这些头信息不会保留在发布到死信主题的消息中。从 2.7 版本开始,如果键和值的反序列化都失败,则发送到 DLT 的记录中将填充两者的原始值。

如果传入的记录相互依赖,但可能无序到达,则将失败的记录重新发布到原始主题的尾部(进行一定次数)可能很有用,而不是直接将其发送到死信主题。有关示例,请参见此 Stack Overflow 问题

以下错误处理程序配置将执行此操作

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从 2.7 版本开始,恢复器检查目标解析器选择的 partition 是否实际存在。如果 partition 不存在,则ProducerRecord中的 partition 将设置为null,允许KafkaProducer选择 partition。您可以通过将verifyPartition属性设置为false来禁用此检查。

从 3.1 版本开始,将logRecoveryRecord属性设置为true将记录恢复记录和异常。

管理死信记录头信息

参考上面发布死信记录DeadLetterPublishingRecoverer有两个属性用于管理头信息,当这些头信息已存在时(例如,重新处理失败的死信记录时,包括使用非阻塞重试时)。

  • appendOriginalHeaders(默认值为true

  • stripPreviousExceptionHeaders(自 2.8 版本起默认为true

Apache Kafka 支持具有相同名称的多个头信息;要获取“最新”值,可以使用headers.lastHeader(headerName);要获取多个头信息的迭代器,可以使用headers.headers(headerName).iterator()

重复重新发布失败的记录时,这些头信息可能会增长(最终由于RecordTooLargeException导致发布失败);对于异常头信息,特别是对于堆栈跟踪头信息尤其如此。

这两个属性的原因是,虽然您可能只想保留最新的异常信息,但您可能希望保留记录在每次失败时经过的主题历史记录。

appendOriginalHeaders应用于所有名为ORIGINAL的头信息,而stripPreviousExceptionHeaders应用于所有名为EXCEPTION的头信息。

从 2.8.4 版本开始,您现在可以控制哪些标准头信息将添加到输出记录中。有关默认情况下添加的(当前)10 个标准头信息的通用名称,请参见enum HeadersToAdd(这些不是实际的头信息名称,只是一个抽象;实际的头信息名称由getHeaderNames()方法设置,子类可以覆盖此方法)。

要排除头信息,请使用excludeHeaders()方法;例如,要禁止在头信息中添加异常堆栈跟踪,请使用

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加ExceptionHeadersCreator完全自定义异常头信息的添加;这也将禁用所有标准异常头信息。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

从 2.8.4 版本开始,您现在还可以通过addHeadersFunction方法提供多个头信息函数。这允许应用其他函数,即使另一个函数已注册,例如,当使用非阻塞重试时。

另请参见使用非阻塞重试失败头信息管理

ExponentialBackOffWithMaxRetries实现

Spring Framework 提供了许多BackOff实现。默认情况下,ExponentialBackOff将无限期重试;要在一些重试尝试后放弃,需要计算maxElapsedTime。从 2.7.3 版本开始,Spring for Apache Kafka 提供了ExponentialBackOffWithMaxRetries,这是一个接收maxRetries属性并自动计算maxElapsedTime的子类,这更方便一些。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将在调用恢复器之前以 1、2、4、8、10、10 秒的时间间隔重试。