新增功能?
3.2 相比 3.1 的新增功能
本节介绍从 3.1 版本到 3.2 版本所做的更改。有关早期版本的更改,请参阅 更改历史记录。
Kafka 客户端版本
此版本需要 3.7.0 kafka-clients
。Kafka 客户端的 3.7.0 版本引入了新的消费者组协议。有关更多详细信息及其限制,请参阅 KIP-848。新的消费者组协议是早期访问版本,不适合在生产环境中使用。仅建议在此版本中用于测试目的。因此,适用于 Apache Kafka 的 Spring 仅在 kafka-client
本身提供的此类测试级别支持的范围内支持此新的消费者组协议。默认情况下,适用于 Apache Kafka 的 Spring 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过消费者的 group.protocol
属性选择加入。
测试支持更改
kraft
模式在 EmbeddedKafka
中默认情况下处于禁用状态,并且希望使用 kraft
模式的用户必须启用它。这是由于在 kraft
模式下使用 EmbeddedKafka
时观察到某些不稳定性,尤其是在测试新的消费者组协议时。新的消费者组协议仅在 kraft
模式下受支持,因此,在测试新协议时,需要针对真实的 Kafka 集群进行测试,而不是基于 KafkaClusterTestKit
的集群,而 EmbeddedKafka
基于 KafkaClusterTestKit
。此外,在 kraft
模式下使用 EmbeddedKafka
运行多个 KafkaListener
方法时,观察到一些其他竞争条件。在解决这些问题之前,EmbeddedKafka
上的 kraft
默认值将保持为 false
。
Kafka Streams 交互式查询支持
一个新的 API KafkaStreamsInteractiveQuerySupport
用于访问 Kafka Streams 交互式查询中使用的可查询存储。有关更多详细信息,请参阅 Kafka Streams 交互式支持。
TransactionIdSuffixStrategy
引入了一个新的 TransactionIdSuffixStrategy
接口来管理 transactional.id
后缀。当设置 maxCache
大于零时,默认实现是 DefaultTransactionIdSuffixStrategy
可以重用特定范围内的 transactional.id
,否则将通过递增计数器动态生成后缀。有关更多信息,请参阅 固定 TransactionIdSuffix。
异步 @KafkaListener 返回
@KafkaListener
(和 @KafkaHandler
)方法现在可以返回异步返回类型,包括 CompletableFuture<?>
、Mono<?>
和 Kotlin suspend
函数。有关更多信息,请参阅 异步返回。
基于抛出的异常将消息路由到自定义 DLT
现在可以根据在消息处理期间抛出的异常类型将消息重定向到自定义 DLT。重定向规则通过 RetryableTopic.exceptionBasedDltRouting
或 RetryTopicConfigurationBuilder.dltRoutingRules
设置。自定义 DLT 以及其他重试和死信主题将自动创建。有关更多信息,请参阅 基于抛出的异常将消息路由到自定义 DLT。
弃用 ContainerProperties 的 transactionManager 属性
弃用 ContainerProperties
中的 transactionManager
属性,转而使用 KafkaAwareTransactionManager
,与通用的 PlatformTransactionManager
相比,这是一个更具体的类型。请参阅 ContainerProperties 和 事务同步。
回滚后处理
提供了一个新的 AfterRollbackProcessor
API processBatch
。有关更多信息,请参阅 回滚后处理器。
更改 @RetryableTopic SameIntervalTopicReuseStrategy 的默认值
将 @RetryableTopic
属性 SameIntervalTopicReuseStrategy
的默认值更改为 SINGLE_TOPIC
。请参阅 单个主题用于最大间隔指数延迟。
非阻塞重试支持类级别 @KafkaListener
非阻塞重试支持 类级别的 @KafkaListener。请参阅 非阻塞重试。
在 RetryTopicConfigurationProvider 中支持处理类级别的 @RetryableTopic。
提供了一个新的公共 API 来查找 RetryTopicConfiguration
。请参阅 查找 RetryTopicConfiguration
RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。
RetryTopicConfigurer
支持处理和注册 MultiMethodKafkaListenerEndpoint
。MultiMethodKafkaListenerEndpoint
提供了 getter/setter
用于属性 defaultMethod
和 methods
。修改了专门用于 MethodKafkaListenerEndpoint
类型的 EndpointCustomizer
。EndpointHandlerMethod
添加了新的构造函数来构建提供的 bean 的实例。提供了一个新的类 EndpointHandlerMultiMethod
来处理重试端点的多方法。
新的 API 方法,用于根据用户提供的函数跳转到某个偏移量
ConsumerCallback
提供了一个新的 API,用于根据用户定义的函数跳转到某个偏移量,该函数以消费者中的当前偏移量作为参数。有关更多详细信息,请参阅 Seek API 文档。
@PartitionOffset 支持 SeekPosition
向 @PartitionOffset
添加 seekPosition
属性,以支持 TopicPartitionOffset.SeekPosition
。有关更多详细信息,请参阅 手动分配。
TopicPartitionOffset 中接受用于计算要跳转到的偏移量的函数的新构造函数
TopicPartitionOffset
有一个新的构造函数,它接受用户提供的函数来计算要跳转到的偏移量。使用此构造函数时,框架将使用当前消费者偏移量位置作为输入参数来调用该函数。有关更多详细信息,请参阅 Seek API 文档。
将 Spring Boot 应用程序名称用作默认客户端 ID 前缀
对于定义了应用程序名称的 Spring Boot 应用程序,此名称现在用作某些客户端类型的自动生成客户端 ID 的默认前缀。有关更多详细信息,请参阅 默认客户端 ID 前缀。
增强的 MessageListenerContainer 获取
ListenerContainerRegistry
提供了两个新的 API 来动态查找和过滤 MessageListenerContainer
实例。getListenerContainersMatching(Predicate<String> idMatcher)
用于按 ID 过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
用于按 ID 和容器属性过滤。
有关更多信息,请参阅 @KafkaListener
生命周期管理的 API 文档。
通过提供更多跟踪标签来增强观测
KafkaTemplateObservation
提供了更多跟踪标签(低基数)。KafkaListenerObservation
提供了一个新的 API 来查找高基数键名和更多跟踪标签(高基数或低基数)。请参阅 Micrometer 观测