配置
从版本 2.9 开始,对于默认配置,应在使用 @Configuration
注解的类中使用 @EnableKafkaRetryTopic
注解。这使得该功能能够正确引导并允许访问注入某些功能组件以在运行时查找。
如果添加此注解,则无需也添加 @EnableKafka ,因为 @EnableKafkaRetryTopic 使用 @EnableKafka 进行了元注解。 |
此外,从该版本开始,为了更高级地配置功能组件和全局功能,应在 @Configuration
类中扩展 RetryTopicConfigurationSupport
类,并覆盖相应的方法。有关更多详细信息,请参阅 配置全局设置和功能。
默认情况下,重试主题的容器将与主容器具有相同的并发性。从版本 3.0 开始,您可以为重试容器设置不同的 并发性
(在注解中或在 RetryConfigurationBuilder
中)。
只能使用上述技术中的一种,并且只有一个 @Configuration 类可以扩展 RetryTopicConfigurationSupport 。 |
使用 @RetryableTopic
注解
要为使用 @KafkaListener
注解的方法配置重试主题和 dlt,您只需向其中添加 @RetryableTopic
注解,Spring for Apache Kafka 就会使用默认配置引导所有必要的主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
从 3.2 开始,类上的 @KafkaListener 将支持 @RetryableTopic
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以通过使用 @DltHandler
注解来指定类中处理 dlt 消息的方法。如果未提供 DltHandler 方法,则会创建一个默认消费者,该消费者仅记录消费情况。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定 kafkaTemplate 名称,则将查找名称为 defaultRetryTopicKafkaTemplate 的 Bean。如果未找到 Bean,则会抛出异常。 |
从版本 3.0 开始,@RetryableTopic
注解可用作自定义注解的元注解;例如
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用 RetryTopicConfiguration
Bean
您还可以通过在使用 @Configuration
注解的类中创建 RetryTopicConfiguration
Bean 来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置使用 @KafkaListener
注解的方法中的所有主题创建重试主题和 dlt 以及相应的消费者。KafkaTemplate
实例是消息转发所必需的。
为了更细粒度地控制如何处理每个主题的非阻塞重试,可以提供多个 RetryTopicConfiguration
Bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和 dlt 的消费者将分配到一个消费者组,该组的组 ID 是您在 @KafkaListener 注解的 groupId 参数中提供的组 ID 与主题后缀的组合。如果未提供任何组 ID,则它们都将属于同一组,并且重试主题上的重新平衡会导致主主题上发生不必要的重新平衡。 |
如果消费者配置了 ErrorHandlingDeserializer 来处理反序列化异常,则务必将 KafkaTemplate 及其生产者与一个序列化程序配置在一起,该序列化程序可以处理普通对象以及反序列化异常产生的原始 byte[] 值。模板的泛型值类型应为 Object 。一种技术是使用 DelegatingByTypeSerializer ;以下是一个示例 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
可以对同一主题使用多个 @KafkaListener 注解,无论是否手动分配分区以及是否使用非阻塞重试,但对于给定主题,只会使用一个配置。最好为这些主题的配置使用单个 RetryTopicConfiguration Bean;如果对同一主题使用多个 @RetryableTopic 注解,则所有注解都应具有相同的值,否则其中一个注解将应用于该主题的所有监听器,而其他注解的值将被忽略。 |
配置全局设置和功能
从 2.9 开始,之前用于配置组件的 Bean 覆盖方法已被删除(未弃用,因为 API 的上述实验性质)。这不会更改 RetryTopicConfiguration
Bean 的方法 - 仅更改基础设施组件的配置。现在,应在(单个)@Configuration
类中扩展 RetryTopicConfigurationSupport
类,并覆盖适当的方法。以下是一个示例
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
使用此配置方法时,不应使用 @EnableKafkaRetryTopic 注解,以防止上下文由于重复 Bean 而无法启动。请改用简单的 @EnableKafka 注解。
|
当autoCreateTopics
为true时,主主题和重试主题将使用指定的partitions数量和replication factor创建。从3.0版本开始,默认的replication factor为-1
,表示使用broker的默认值。如果您的broker版本早于2.4,则需要设置一个显式值。要为特定主题(例如主主题或DLT)覆盖这些值,只需添加一个具有所需属性的NewTopic
@Bean
;这将覆盖自动创建属性。
默认情况下,记录使用接收到的记录的原始分区发布到重试主题。如果重试主题的分区数少于主主题,则应适当地配置框架;如下例所示。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函数的参数是消费者记录和下一个主题的名称。您可以返回特定的分区号,或者返回null
以指示KafkaProducer
应该确定分区。
默认情况下,当记录在重试主题中转换时,会保留重试头(尝试次数、时间戳)的所有值。从2.9.6版本开始,如果您只想保留这些头的最后一个值,请使用上面显示的configureDeadLetterPublishingContainerFactory()
方法将工厂的retainAllRetryHeaderValues
属性设置为false
。
查找RetryTopicConfiguration
尝试通过以下两种方式之一提供RetryTopicConfiguration
的实例:从@RetryableTopic
注解创建实例,或者如果不存在注解,则从bean容器中获取实例。
如果在容器中找到bean,则会进行检查以确定提供的主题是否应由任何此类实例处理。
如果提供了@RetryableTopic
注解,则会查找一个DltHandler
注解的方法。
从3.2版本开始,提供新的API,在类上使用@RetryableTopic
注解时创建RetryTopicConfiguration
。
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}