DLT 策略

该框架提供了一些用于处理 DLT 的策略。您可以提供 DLT 处理方法、使用默认的日志记录方法,或者根本不使用 DLT。此外,您还可以选择 DLT 处理失败时的行为。

DLT 处理方法

您可以指定用于处理主题 DLT 的方法,以及该处理失败时的行为。

为此,您可以在带有 @RetryableTopic 注解的类的某个方法中使用 @DltHandler 注解。请注意,该方法将用于该类中所有带有 @RetryableTopic 注解的方法。

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT 处理器方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,将应处理 DLT 消息的 bean 名称和方法名称作为参数传入。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果没有提供 DLT 处理器,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod

从版本 2.8 开始,如果您根本不希望此应用程序从 DLT 消费,包括通过默认处理器(或者您希望延迟消费),您可以控制 DLT 容器是否启动,这独立于容器工厂的 autoStartup 属性。

使用 @RetryableTopic 注解时,将 autoStartDltHandler 属性设置为 false;使用配置构建器时,使用 autoStartDltHandler(false)

稍后,您可以通过 KafkaListenerEndpointRegistry 启动 DLT 处理器。

DLT 失败行为

如果 DLT 处理失败,有两种可能的行为:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR

前者将记录转发回 DLT 主题,这样它就不会阻塞其他 DLT 记录的处理。后者在不转发消息的情况下结束执行。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
默认行为是 ALWAYS_RETRY_ON_ERROR
从版本 2.8.3 开始,如果记录导致抛出致命异常,例如 DeserializationExceptionALWAYS_RETRY_ON_ERROR 将不会将记录路由回 DLT,因为通常此类异常总是会被抛出。

被认为是致命的异常有:

  • DeserializationException(反序列化异常)

  • MessageConversionException(消息转换异常)

  • ConversionException(转换异常)

  • MethodArgumentResolutionException(方法参数解析异常)

  • NoSuchMethodException(方法未找到异常)

  • ClassCastException(类转换异常)

您可以使用 DestinationTopicResolver bean 上的方法向此列表添加或从中删除异常。

有关更多信息,请参阅异常分类器

配置不使用 DLT

该框架还提供了不为主题配置 DLT 的可能性。在这种情况下,在重试耗尽后,处理简单地结束。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}
© . This site is unofficial and not affiliated with VMware.