功能

大多数功能都同时适用于 @RetryableTopic 注解和 RetryTopicConfiguration bean。

回退配置

回退配置依赖于 Spring Retry 项目中的 BackOffPolicy 接口。

它包括

  • 固定回退

  • 指数回退

  • 随机指数回退

  • 均匀随机回退

  • 无回退

  • 自定义回退

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

您还可以提供 Spring Retry 的 SleepingBackOffPolicy 接口的自定义实现

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
默认回退策略是 FixedBackOffPolicy,最多尝试 3 次,间隔 1000 毫秒。
ExponentialBackOffPolicy 的默认最大延迟为 30 秒。如果您的回退策略需要大于此值的延迟,请相应调整 maxDelay 属性。
第一次尝试计入 maxAttempts,因此,如果您提供的 maxAttempts 值为 4,则将有原始尝试加上 3 次重试。

全局超时

您可以为重试过程设置全局超时。如果达到该时间,则消费者下次抛出异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则结束处理。

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
默认情况下,没有设置超时,这也可以通过提供 -1 作为超时值来实现。

异常分类器

您可以指定要重试哪些异常以及哪些异常不要重试。您还可以将其设置为遍历原因以查找嵌套异常。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
默认行为是重试所有异常并且不遍历原因。

从 2.8.3 开始,有一个全局致命异常列表,这些异常会导致记录在没有任何重试的情况下发送到 DLT。有关致命异常的默认列表,请参阅 DefaultErrorHandler。您可以通过覆盖扩展 RetryTopicConfigurationSupport@Configuration 类中的 configureNonBlockingRetries 方法,向此列表添加或删除异常。有关更多信息,请参阅 配置全局设置和功能

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表即可。

包含和排除主题

您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法来决定哪些主题将由 RetryTopicConfiguration bean 处理,哪些主题将不会处理。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
默认行为是包含所有主题。

主题自动创建

除非另有说明,否则框架将使用 KafkaAdmin bean 使用的 NewTopic bean 自动创建所需的主题。您可以指定创建主题的分区数和副本因子,并且可以关闭此功能。从 3.0 版开始,默认副本因子为 -1,表示使用代理默认值。如果您的代理版本早于 2.4,则需要设置显式值。

请注意,如果您没有使用 Spring Boot,则需要提供 KafkaAdmin bean 才能使用此功能。
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
默认情况下,主题自动创建为一个分区和 -1 的副本因子(表示使用代理默认值)。如果您的代理版本早于 2.4,则需要设置显式值。

故障头管理

在考虑如何管理故障头(原始头和异常头)时,框架会委托给 DeadLetterPublishingRecoverer 来决定是追加还是替换头。

默认情况下,它将 appendOriginalHeaders 显式设置为 false,并将 stripPreviousExceptionHeaders 保留为 DeadLetterPublishingRecover 使用的默认值。

这意味着使用默认配置仅保留第一个“原始”和最后一个异常头。这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪头)。

有关更多信息,请参阅 管理死信记录头

要将框架重新配置为对这些属性使用不同的设置,请通过覆盖扩展 RetryTopicConfigurationSupport@Configuration 类中的 configureCustomizers 方法来配置 DeadLetterPublishingRecovererer 自定义程序。有关更多详细信息,请参阅 配置全局设置和功能

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

从 2.8.4 版开始,如果您希望添加自定义头(除了工厂添加的重试信息头之外,您可以向工厂添加 headersFunction - factory.setHeadersFunction((rec, ex) -> { ... })

默认情况下,添加的任何头都是累积的 - Kafka 头可以包含多个值。从 2.9.5 版开始,如果函数返回的 Headers 包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader 的头,则该头的任何现有值都将被删除,并且只有新的单个值将保留。

自定义 DeadLetterPublishingRecoverer

失败消息头管理 中所示,可以自定义框架创建的默认 DeadLetterPublishingRecoverer 实例。但是,对于某些用例,需要子类化 DeadLetterPublishingRecoverer,例如覆盖 createProducerRecord() 以修改发送到重试(或死信)主题的内容。从 3.0.9 版本开始,您可以覆盖 RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory() 方法以提供 DeadLetterPublisherCreator 实例,例如

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建议您在构建自定义实例时使用提供的解析器。

基于抛出异常的消息到自定义 DLT 的路由

从 3.2.0 版本开始,可以根据在消息处理过程中抛出的异常类型将消息路由到自定义 DLT。为此,需要指定路由规则。路由自定义包括指定额外目标。目标又包含两个设置:suffixexceptions。当抛出 exceptions 中指定的异常类型时,包含 suffix 的 DLT 将被视为消息的目标主题,然后再考虑通用 DLT。使用注解或 RetryTopicConfiguration bean 的配置示例

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(kafkaOperations)
            .create(template);
}

suffix 在自定义 DLT 名称中位于通用 dltTopicSuffix 之前。考虑到提供的示例,导致 DeserializationException 的消息将路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。自定义 DLT 将遵循 主题自动创建 中所述的相同规则创建。