有什么新内容?
3.2 相比 3.1 的新功能
本节介绍了从 3.1 版本到 3.2 版本的更改。有关早期版本的更改,请参阅 变更历史。
Kafka 客户端版本
此版本需要 3.7.0 kafka-clients
。Kafka 客户端的 3.7.0 版本引入了新的消费者组协议。有关更多详细信息及其限制,请参阅 KIP-848。新的消费者组协议是早期访问版本,不适合在生产环境中使用。仅建议在此版本中用于测试目的。因此,Spring for Apache Kafka 仅在 kafka-client
本身提供的此类测试级别支持的范围内支持此新的消费者组协议。默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过消费者上的 group.protocol
属性选择加入。
测试支持变更
默认情况下,EmbeddedKafka
中的 kraft
模式处于禁用状态,想要使用 kraft
模式的用户必须启用它。这是因为在 kraft
模式下使用 EmbeddedKafka
时观察到某些不稳定性,尤其是在测试新的消费者组协议时。新的消费者组协议仅在 kraft
模式下受支持,因此,在测试新协议时,需要针对真实的 Kafka 集群进行测试,而不是基于 KafkaClusterTestKit
的集群,EmbeddedKafka
就是基于 KafkaClusterTestKit
的。此外,在 kraft
模式下使用 EmbeddedKafka
运行多个 KafkaListener
方法时,还观察到一些其他竞争条件。在解决这些问题之前,EmbeddedKafka
上的 kraft
默认值将保持为 false
。
Kafka Streams 交互式查询支持
一个新的 API KafkaStreamsInteractiveQuerySupport
用于访问 Kafka Streams 交互式查询中使用的可查询存储。有关更多详细信息,请参阅 Kafka Streams 交互式支持。
TransactionIdSuffixStrategy
引入了一个新的 TransactionIdSuffixStrategy
接口来管理 transactional.id
后缀。默认实现是 DefaultTransactionIdSuffixStrategy
,当设置 maxCache
大于零时,可以在特定范围内重用 transactional.id
,否则将通过递增计数器动态生成后缀。有关更多信息,请参阅 固定 TransactionIdSuffix。
异步 @KafkaListener 返回
@KafkaListener
(和 @KafkaHandler
)方法现在可以返回异步返回类型,包括 CompletableFuture<?>
、Mono<?>
和 Kotlin suspend
函数。有关更多信息,请参阅 异步返回。
基于抛出异常的自定义 DLT 路由
现在可以根据消息处理过程中抛出的异常类型将消息重定向到自定义 DLT。重定向规则通过 RetryableTopic.exceptionBasedDltRouting
或 RetryTopicConfigurationBuilder.dltRoutingRules
设置。自定义 DLT 以及其他重试和死信主题会自动创建。有关更多信息,请参阅 基于抛出异常的自定义 DLT 路由。
弃用 ContainerProperties transactionManager 属性
弃用 ContainerProperties
中的 transactionManager
属性,取而代之的是 KafkaAwareTransactionManager
,它比一般的 PlatformTransactionManager
类型更窄。请参阅 ContainerProperties 和 事务同步。
回滚处理后
提供了一个新的 AfterRollbackProcessor
API processBatch
。有关更多信息,请参阅 回滚后处理器。
更改 @RetryableTopic SameIntervalTopicReuseStrategy 默认值
将 @RetryableTopic
属性 SameIntervalTopicReuseStrategy
的默认值更改为 SINGLE_TOPIC
。请参阅 单个主题用于最大间隔指数延迟。
非阻塞重试支持类级别 @KafkaListener
非阻塞重试支持 类上的 @KafkaListener。请参阅 非阻塞重试。
支持在 RetryTopicConfigurationProvider 中处理类上的 @RetryableTopic。
提供了一个新的公共 API 来查找 RetryTopicConfiguration
。请参阅 查找 RetryTopicConfiguration
RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。
RetryTopicConfigurer
支持处理和注册 MultiMethodKafkaListenerEndpoint
。MultiMethodKafkaListenerEndpoint
为属性 defaultMethod
和 methods
提供 getter/setter
。修改严格针对 MethodKafkaListenerEndpoint
类型的 EndpointCustomizer
。EndpointHandlerMethod
添加了新的构造函数,用于为提供的 bean 构造实例。提供新的类 EndpointHandlerMultiMethod
来处理重试端点的多方法。
新的 API 方法,用于根据用户提供的函数查找偏移量
ConsumerCallback
提供了一个新的 API,用于根据用户定义的函数(以消费者中的当前偏移量作为参数)来查找偏移量。有关更多详细信息,请参阅 查找 API 文档。
@PartitionOffset 对 SeekPosition 的支持
在 @PartitionOffset
中添加 seekPosition
属性以支持 TopicPartitionOffset.SeekPosition
。有关更多详细信息,请参阅 手动分配。
TopicPartitionOffset 中接受计算要查找的偏移量的函数的新构造函数
TopicPartitionOffset
有一个新的构造函数,它接受用户提供的函数来计算要查找的偏移量。使用此构造函数时,框架将使用当前消费者偏移量位置的输入参数调用该函数。有关更多详细信息,请参阅 查找 API 文档。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义了应用程序名称的 Spring Boot 应用程序,此名称现在用作某些客户端类型的自动生成的客户端 ID 的默认前缀。有关更多详细信息,请参阅 默认客户端 ID 前缀。
增强了 MessageListenerContainer 的检索
ListenerContainerRegistry
提供了两个新的 API 来动态查找和过滤 MessageListenerContainer
实例。getListenerContainersMatching(Predicate<String> idMatcher)
用于按 ID 过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
用于按 ID 和容器属性过滤。
有关更多信息,请参阅 @KafkaListener
生命周期管理的 API 文档。
通过提供更多跟踪标签来增强观察
KafkaTemplateObservation
提供了更多跟踪标签(低基数)。KafkaListenerObservation
提供了一个新的 API 来查找高基数键名和更多跟踪标签(高或低基数)。请参阅 Micrometer 观察