有什么新内容?

3.2 相比 3.1 的新功能

本节介绍了从 3.1 版本到 3.2 版本的更改。有关早期版本的更改,请参阅 变更历史

Kafka 客户端版本

此版本需要 3.7.0 kafka-clients。Kafka 客户端的 3.7.0 版本引入了新的消费者组协议。有关更多详细信息及其限制,请参阅 KIP-848。新的消费者组协议是早期访问版本,不适合在生产环境中使用。仅建议在此版本中用于测试目的。因此,Spring for Apache Kafka 仅在 kafka-client 本身提供的此类测试级别支持的范围内支持此新的消费者组协议。默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过消费者上的 group.protocol 属性选择加入。

测试支持变更

默认情况下,EmbeddedKafka 中的 kraft 模式处于禁用状态,想要使用 kraft 模式的用户必须启用它。这是因为在 kraft 模式下使用 EmbeddedKafka 时观察到某些不稳定性,尤其是在测试新的消费者组协议时。新的消费者组协议仅在 kraft 模式下受支持,因此,在测试新协议时,需要针对真实的 Kafka 集群进行测试,而不是基于 KafkaClusterTestKit 的集群,EmbeddedKafka 就是基于 KafkaClusterTestKit 的。此外,在 kraft 模式下使用 EmbeddedKafka 运行多个 KafkaListener 方法时,还观察到一些其他竞争条件。在解决这些问题之前,EmbeddedKafka 上的 kraft 默认值将保持为 false

Kafka Streams 交互式查询支持

一个新的 API KafkaStreamsInteractiveQuerySupport 用于访问 Kafka Streams 交互式查询中使用的可查询存储。有关更多详细信息,请参阅 Kafka Streams 交互式支持

TransactionIdSuffixStrategy

引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 后缀。默认实现是 DefaultTransactionIdSuffixStrategy,当设置 maxCache 大于零时,可以在特定范围内重用 transactional.id,否则将通过递增计数器动态生成后缀。有关更多信息,请参阅 固定 TransactionIdSuffix

异步 @KafkaListener 返回

@KafkaListener(和 @KafkaHandler)方法现在可以返回异步返回类型,包括 CompletableFuture<?>Mono<?> 和 Kotlin suspend 函数。有关更多信息,请参阅 异步返回

基于抛出异常的自定义 DLT 路由

现在可以根据消息处理过程中抛出的异常类型将消息重定向到自定义 DLT。重定向规则通过 RetryableTopic.exceptionBasedDltRoutingRetryTopicConfigurationBuilder.dltRoutingRules 设置。自定义 DLT 以及其他重试和死信主题会自动创建。有关更多信息,请参阅 基于抛出异常的自定义 DLT 路由

弃用 ContainerProperties transactionManager 属性

弃用 ContainerProperties 中的 transactionManager 属性,取而代之的是 KafkaAwareTransactionManager,它比一般的 PlatformTransactionManager 类型更窄。请参阅 ContainerProperties事务同步

回滚处理后

提供了一个新的 AfterRollbackProcessor API processBatch。有关更多信息,请参阅 回滚后处理器

更改 @RetryableTopic SameIntervalTopicReuseStrategy 默认值

@RetryableTopic 属性 SameIntervalTopicReuseStrategy 的默认值更改为 SINGLE_TOPIC。请参阅 单个主题用于最大间隔指数延迟

非阻塞重试支持类级别 @KafkaListener

非阻塞重试支持 类上的 @KafkaListener。请参阅 非阻塞重试

支持在 RetryTopicConfigurationProvider 中处理类上的 @RetryableTopic。

提供了一个新的公共 API 来查找 RetryTopicConfiguration。请参阅 查找 RetryTopicConfiguration

RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。

RetryTopicConfigurer 支持处理和注册 MultiMethodKafkaListenerEndpointMultiMethodKafkaListenerEndpoint 为属性 defaultMethodmethods 提供 getter/setter。修改严格针对 MethodKafkaListenerEndpoint 类型的 EndpointCustomizerEndpointHandlerMethod 添加了新的构造函数,用于为提供的 bean 构造实例。提供新的类 EndpointHandlerMultiMethod 来处理重试端点的多方法。

新的 API 方法,用于根据用户提供的函数查找偏移量

ConsumerCallback 提供了一个新的 API,用于根据用户定义的函数(以消费者中的当前偏移量作为参数)来查找偏移量。有关更多详细信息,请参阅 查找 API 文档

@PartitionOffset 对 SeekPosition 的支持

@PartitionOffset 中添加 seekPosition 属性以支持 TopicPartitionOffset.SeekPosition。有关更多详细信息,请参阅 手动分配

TopicPartitionOffset 中接受计算要查找的偏移量的函数的新构造函数

TopicPartitionOffset 有一个新的构造函数,它接受用户提供的函数来计算要查找的偏移量。使用此构造函数时,框架将使用当前消费者偏移量位置的输入参数调用该函数。有关更多详细信息,请参阅 查找 API 文档

Spring Boot 应用程序名称作为默认客户端 ID 前缀

对于定义了应用程序名称的 Spring Boot 应用程序,此名称现在用作某些客户端类型的自动生成的客户端 ID 的默认前缀。有关更多详细信息,请参阅 默认客户端 ID 前缀

增强了 MessageListenerContainer 的检索

ListenerContainerRegistry 提供了两个新的 API 来动态查找和过滤 MessageListenerContainer 实例。getListenerContainersMatching(Predicate<String> idMatcher) 用于按 ID 过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 用于按 ID 和容器属性过滤。

有关更多信息,请参阅 @KafkaListener 生命周期管理的 API 文档

通过提供更多跟踪标签来增强观察

KafkaTemplateObservation 提供了更多跟踪标签(低基数)。KafkaListenerObservation 提供了一个新的 API 来查找高基数键名和更多跟踪标签(高或低基数)。请参阅 Micrometer 观察