功能
大多数功能都适用于@RetryableTopic
注解和RetryTopicConfiguration
bean。
BackOff 配置
BackOff 配置依赖于Spring Retry
项目中的BackOffPolicy
接口。
它包括
-
固定 Back Off
-
指数 Back Off
-
随机指数 Back Off
-
均匀随机 Back Off
-
无 Back Off
-
自定义 Back Off
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
您也可以提供 Spring Retry 的SleepingBackOffPolicy
接口的自定义实现
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认的 Back Off 策略是FixedBackOffPolicy ,最多尝试 3 次,间隔 1000 毫秒。
|
ExponentialBackOffPolicy 的默认最大延迟为 30 秒。如果您的 Back Off 策略需要比这更大的延迟值,请相应地调整maxDelay 属性。
|
第一次尝试计入maxAttempts ,因此如果您提供maxAttempts 值为 4,则将有原始尝试加上 3 次重试。
|
全局超时
您可以设置重试过程的全局超时。如果达到该时间,消费者下次抛出异常时,消息将直接发送到 DLT,或者如果 DLT 不可用,则直接结束处理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
默认情况下没有设置超时,也可以通过提供 -1 作为超时值来实现。 |
异常分类器
您可以指定要重试哪些异常,以及哪些异常不要重试。您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认行为是对所有异常进行重试,并且不遍历原因。 |
从 2.8.3 版本开始,存在一个全局的致命异常列表,这些异常会导致记录被发送到 DLT,且不会进行任何重试。有关致命异常的默认列表,请参阅 DefaultErrorHandler。您可以通过在扩展 RetryTopicConfigurationSupport
的 @Configuration
类中覆盖 configureNonBlockingRetries
方法,向此列表添加或删除异常。有关更多信息,请参阅 配置全局设置和功能。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表。 |
包含和排除主题
您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法来决定哪些主题将由 RetryTopicConfiguration
bean 处理,哪些主题将不会被处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
主题自动创建
除非另有说明,否则框架将使用 KafkaAdmin
bean 使用的 NewTopic
bean 自动创建所需的主题。您可以指定创建主题的分区数和副本因子,也可以关闭此功能。从 3.0 版本开始,默认副本因子为 -1
,表示使用代理默认值。如果您的代理版本早于 2.4,则需要设置显式值。
请注意,如果您没有使用 Spring Boot,则需要提供一个 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,主题使用一个分区和 -1 的副本因子(表示使用代理默认值)自动创建。如果您的代理版本早于 2.4,则需要设置显式值。 |
失败头管理
在考虑如何管理失败头(原始头和异常头)时,框架会委托给 DeadLetterPublishingRecoverer
来决定是追加还是替换头。
默认情况下,它明确将 appendOriginalHeaders
设置为 false
,并将 stripPreviousExceptionHeaders
留给 DeadLetterPublishingRecover
使用的默认值。
这意味着使用默认配置仅保留第一个“原始”头和最后一个异常头。这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪头)。
有关更多信息,请参阅 管理死信记录头。
要重新配置框架以使用这些属性的不同设置,请通过覆盖RetryTopicConfigurationSupport
扩展的@Configuration
类中的configureCustomizers
方法来配置DeadLetterPublishingRecovererer
自定义程序。有关更多详细信息,请参见配置全局设置和功能。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从版本 2.8.4 开始,如果您希望添加自定义标头(除了工厂添加的重试信息标头之外,您可以向工厂添加一个headersFunction
- factory.setHeadersFunction((rec, ex) -> { ... })
。
默认情况下,添加的任何标头都是累积的 - Kafka 标头可以包含多个值。从版本 2.9.5 开始,如果函数返回的Headers
包含类型为DeadLetterPublishingRecoverer.SingleRecordHeader
的标头,则该标头的任何现有值都将被删除,并且只有新的单个值将保留。
自定义 DeadLetterPublishingRecoverer
如故障标头管理中所示,可以自定义框架创建的默认DeadLetterPublishingRecoverer
实例。但是,对于某些用例,有必要对DeadLetterPublishingRecoverer
进行子类化,例如,要覆盖createProducerRecord()
以修改发送到重试(或死信)主题的内容。从版本 3.0.9 开始,您可以覆盖RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()
方法以提供DeadLetterPublisherCreator
实例,例如
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议您在构建自定义实例时使用提供的解析器。
基于异常将消息路由到自定义 DLT
从版本 3.2.0 开始,可以根据在处理消息期间抛出的异常类型将消息路由到自定义 DLT。为此,需要指定路由。路由自定义包括指定附加目的地。目的地反过来包含两个设置:suffix
和exceptions
。当exceptions
中指定的异常类型被抛出时,包含suffix
的 DLT 将被视为消息的目标主题,然后再考虑通用 DLT。使用注释或RetryTopicConfiguration
bean 的配置示例
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(kafkaOperations)
.create(template);
}
suffix
在自定义 DLT 名称中的通用dltTopicSuffix
之前。考虑到提供的示例,导致DeserializationException
的消息将被路由到my-annotated-topic-deserialization-dlt
,而不是my-annotated-topic-dlt
。自定义 DLT 将按照主题自动创建中说明的相同规则创建。