变更历史
3.3 版本相比 3.2 版本的新功能
本节涵盖从 3.2 版到 3.3 版的更改。有关早期版本的更改,请参阅变更历史。
DLT 主题命名约定
DLT 主题的命名约定已标准化,统一使用“-dlt”后缀。此更改可确保兼容性,并避免在不同重试解决方案之间切换时发生冲突。希望保留“.DLT”后缀行为的用户需要通过设置相应的 DLT 名称属性明确选择加入。
增强的消费者组查找操作
ConsumerSeekCallback 接口中添加了一个新方法 getGroupId()。此方法允许通过仅定位所需的消费者组来进行更具选择性的查找操作。AbstractConsumerSeekAware 现在还可以在多组监听器场景中注册、检索和删除每个主题分区的回调,而不会遗漏任何回调。有关更多详细信息,请参阅新的 API (getSeekCallbacksFor(TopicPartition topicPartition)、getTopicsAndCallbacks())。有关更多详细信息,请参阅查找 API 文档。
使用 RecordFilterStrategy 配置 Kafka 监听器中空批次的处理
RecordFilterStrategy 现在支持忽略过滤产生的空批次。这可以通过重写默认方法 ignoreEmptyBatch() 进行配置,该方法默认为 false,确保即使所有 ConsumerRecords 都被过滤掉,KafkaListener 也会被调用。有关更多详细信息,请参阅消息接收过滤文档。
ConcurrentContainerStoppedEvent
当所有子容器都停止时,ConcurentContainerMessageListenerContainer 现在会发出 ConcurrentContainerStoppedEvent。有关更多详细信息,请参阅应用程序事件和 ConcurrentContainerStoppedEvent Javadoc。
回复中的原始记录键
使用 ReplyingKafkaTemplate 时,如果请求中的原始记录包含键,则该键也将成为回复的一部分。有关更多详细信息,请参阅参考文档的发送消息部分。
自定义 DeadLetterPublishingRecovererFactory 中的日志记录
使用 DeadLetterPublishingRecovererFactory 时,用户应用程序可以重写 maybeLogListenerException 方法来自定义日志记录行为。
自定义 Kafka Streams 的实现
使用 KafkaStreamsCustomizer 时,现在可以通过重写 initKafkaStreams 方法返回 KafkaStreams 对象的自定义实现。
批处理监听器的 KafkaHeaders.DELIVERY_ATTEMPT
使用 BatchListener 时,ConsumerRecord 的头字段中可以包含 KafkaHeaders.DELIVERY_ATTMPT 头。如果 DeliveryAttemptAwareRetryListener 设置为错误处理程序作为重试监听器,则每个 ConsumerRecord 都具有传递尝试头。有关更多详细信息,请参阅批处理监听器的 Kafka 头。
Kafka Metrics Listeners 和 TaskScheduler
MicrometerProducerListener、MicrometerConsumerListener 和 KafkaStreamsMicrometerListener 现在可以使用 TaskScheduler 进行配置。有关更多信息,请参阅 KafkaMetricsSupport Javadoc 和Micrometer 支持。
3.2 版本相比 3.1 版本的新功能
本节涵盖从 3.1 版到 3.2 版的更改。有关早期版本的更改,请参阅变更历史。
Kafka 客户端版本
此版本需要 3.7.0 的 kafka-clients。Kafka 客户端的 3.7.0 版本引入了新的消费者组协议。有关更多详细信息及其限制,请参阅KIP-848。新的消费者组协议是一个早期访问版本,不适用于生产环境。在此版本中,建议仅用于测试目的。因此,Spring for Apache Kafka 仅在 kafka-client 本身提供的测试级别支持范围内支持此新的消费者组协议。默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过消费者上的 group.protocol 属性选择加入。
测试支持更改
EmbeddedKafka 中的 kraft 模式默认禁用,希望使用 kraft 模式的用户必须启用它。这是由于在使用 EmbeddedKafka 的 kraft 模式时观察到某些不稳定情况,尤其是在测试新的消费者组协议时。新的消费者组协议仅在 kraft 模式下受支持,因此,在测试新协议时,需要针对真实的 Kafka 集群而不是基于 KafkaClusterTestKit 的集群进行测试,EmbeddedKafka 就是基于此的。此外,在使用 EmbeddedKafka 的 kraft 模式运行多个 KafkaListener 方法时,还观察到一些其他竞争条件。在这些问题解决之前,EmbeddedKafka 上的 kraft 默认值将保持为 false。
Kafka Streams 交互式查询支持
一个新的 API KafkaStreamsInteractiveQuerySupport 用于访问 Kafka Streams 交互式查询中使用的可查询存储。有关更多详细信息,请参阅Kafka Streams 交互式支持。
TransactionIdSuffixStrategy
引入了一个新的 TransactionIdSuffixStrategy 接口来管理 transactional.id 后缀。默认实现是 DefaultTransactionIdSuffixStrategy,当设置 maxCache 大于零时,可以在特定范围内重用 transactional.id,否则后缀将通过递增计数器动态生成。有关更多信息,请参阅固定 TransactionIdSuffix。
异步 @KafkaListener 返回
@KafkaListener(和 @KafkaHandler)方法现在可以返回异步返回类型,包括 CompletableFuture<?>、Mono<?> 和 Kotlin suspend 函数。有关更多信息,请参阅异步返回。
根据抛出的异常将消息路由到自定义 DLT
现在可以根据消息处理过程中抛出的异常类型将消息重定向到自定义 DLT。重定向规则可以通过 RetryableTopic.exceptionBasedDltRouting 或 RetryTopicConfigurationBuilder.dltRoutingRules 设置。自定义 DLT 以及其他重试和死信主题会自动创建。有关更多信息,请参阅根据抛出的异常将消息路由到自定义 DLT。
弃用 ContainerProperties transactionManager 属性
弃用 ContainerProperties 中的 transactionManager 属性,转而使用 KafkaAwareTransactionManager,后者是相对于通用 PlatformTransactionManager 的更窄类型。请参阅ContainerProperties 和事务同步。
回滚后处理
提供了新的 AfterRollbackProcessor API processBatch。有关更多信息,请参阅回滚后处理器。
更改 @RetryableTopic SameIntervalTopicReuseStrategy 默认值
将 @RetryableTopic 属性 SameIntervalTopicReuseStrategy 默认值更改为 SINGLE_TOPIC。请参阅最大间隔指数延迟的单个主题。
非阻塞重试支持类级别 @KafkaListener
非阻塞重试支持类上的 @KafkaListener。请参阅非阻塞重试。
在 RetryTopicConfigurationProvider 中支持处理类上的 @RetryableTopic。
提供一个新的公共 API 来查找 RetryTopicConfiguration。请参阅查找 RetryTopicConfiguration
RetryTopicConfigurer 支持处理 MultiMethodKafkaListenerEndpoint。
RetryTopicConfigurer 支持处理和注册 MultiMethodKafkaListenerEndpoint。MultiMethodKafkaListenerEndpoint 为属性 defaultMethod 和 methods 提供 getter/setter。修改严格用于 MethodKafkaListenerEndpoint 类型的 EndpointCustomizer。EndpointHandlerMethod 添加了新的构造函数,用于为提供的 bean 构造实例。提供新的类 EndpointHandlerMultiMethod 来处理重试端点的多方法。
根据用户提供的函数查找偏移量的新 API 方法
ConsumerCallback 提供了一个新的 API,可以根据用户定义的函数查找偏移量,该函数将消费者中的当前偏移量作为参数。有关更多详细信息,请参阅查找 API 文档。
@PartitionOffset 支持 SeekPosition
向 @PartitionOffset 添加 seekPosition 属性以支持 TopicPartitionOffset.SeekPosition。有关更多详细信息,请参阅手动分配。
TopicPartitionOffset 中接受一个用于计算要查找的偏移量的函数的新构造函数
TopicPartitionOffset 有一个新的构造函数,它接受一个用户提供的函数来计算要查找的偏移量。当使用此构造函数时,框架会调用该函数,并以当前消费者偏移量位置作为输入参数。有关更多详细信息,请参阅查找 API 文档。
Spring Boot 应用程序名称作为默认客户端 ID 前缀
对于定义了应用程序名称的 Spring Boot 应用程序,此名称现在用作某些客户端类型自动生成的客户端 ID 的默认前缀。有关更多详细信息,请参阅默认客户端 ID 前缀。
增强的 MessageListenerContainers 检索
ListenerContainerRegistry 提供了两个新的 API,用于动态查找和过滤 MessageListenerContainer 实例。getListenerContainersMatching(Predicate<String> idMatcher) 按 ID 过滤,另一个是 getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) 按 ID 和容器属性过滤。
有关更多信息,请参阅@KafkaListener 生命周期管理的 API 文档。
通过提供更多跟踪标签增强观察
KafkaTemplateObservation 提供了更多跟踪标签(低基数)。KafkaListenerObservation 提供了新的 API 来查找高基数键名和更多跟踪标签(高或低基数)。请参阅Micrometer 观察
3.1 版本相比 3.0 版本的新功能
本节涵盖从 3.0 版到 3.1 版的更改。有关早期版本的更改,请参阅变更历史。
EmbeddedKafkaBroker
现在提供了一个额外的实现,以使用 Kraft 而不是 Zookeeper。有关更多信息,请参阅嵌入式 Kafka 代理。
JsonDeserializer
当发生反序列化异常时,SerializationException 消息不再包含格式为 Can’t deserialize data [[123, 34, 98, 97, 122, … 的数据;每个数据字节的数值数组没有用,对于大量数据可能会很冗长。当与 ErrorHandlingDeserializer 一起使用时,发送给错误处理程序的 DeserializationException 包含 data 属性,该属性包含无法反序列化的原始数据。当不与 ErrorHandlingDeserializer 一起使用时,KafkaConsumer 将持续为同一记录发出异常,显示主题/分区/偏移量以及 Jackson 抛出的原因。
ContainerPostProcessor
可以通过在 @KafkaListener 注解上指定 ContainerPostProcessor 的 bean 名称来对监听器容器应用后处理。这发生在容器创建之后以及在容器工厂上配置的任何 ContainerCustomizer 配置之后。有关更多信息,请参阅容器工厂。
ErrorHandlingDeserializer
现在可以将 Validator 添加到此反序列化器中;如果委托 Deserializer 成功反序列化对象,但该对象未能通过验证,则会抛出类似于发生反序列化异常的异常。这允许将原始原始数据传递给错误处理程序。有关更多信息,请参阅使用 ErrorHandlingDeserializer。
可重试主题
当 @RetryableTopic(backOff = @BackOff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 时,将后缀 -retry-5000 更改为 -retry。如果您想保留后缀 -retry-5000,请使用 @RetryableTopic(backOff = @BackOff(delay = 5000), attempts = "2")。有关更多信息,请参阅主题命名。
监听器容器更改
当手动分配分区,且消费者 group.id 为 null 时,AckMode 现在会自动强制转换为 MANUAL。有关更多信息,请参阅手动分配所有分区。
3.0 版本相比 2.9 版本的新功能
观察
现在支持使用 Micrometer 启用计时器和跟踪的观察。有关更多信息,请参阅观察。
原生镜像
提供了创建原生镜像的支持。有关更多信息,请参阅原生镜像。
全局单个嵌入式 Kafka
嵌入式 Kafka (EmbeddedKafkaBroker) 现在可以作为整个测试计划的单个全局实例启动。有关更多信息,请参阅为多个测试类使用相同的代理。
可重试主题更改
此功能不再被视为实验性功能(就其 API 而言),该功能本身自 2.7 版本以来就已受支持,但出现 API 破坏性更改的可能性高于正常情况。
此版本中非阻塞重试基础设施 bean 的引导方式已更改,以避免在某些应用程序中出现应用程序初始化相关的时序问题。
您现在可以为重试容器设置不同的 concurrency;默认情况下,并发性与主容器相同。
@RetryableTopic 现在可以用作自定义注解上的元注解,包括支持 @AliasFor 属性。
有关更多信息,请参阅配置。
重试主题的默认复制因子现在是 -1(使用代理默认值)。如果您的代理版本早于 2.4,则现在需要明确设置该属性。
您现在可以在同一个应用程序上下文中为同一个主题配置多个 @RetryableTopic 监听器。以前这是不可能的。有关更多信息,请参阅多个监听器,相同主题。
RetryTopicConfigurationSupport 中存在破坏性 API 更改;具体来说,如果您重写 destinationTopicResolver、kafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定义方法;这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory> 参数。
监听器容器更改
现在,容器会发布与消费者身份验证和授权失败相关的事件。有关更多信息,请参阅应用程序事件。
您现在可以自定义消费者线程使用的线程名称。有关更多信息,请参阅容器线程命名。
已添加容器属性 restartAfterAuthException。有关更多信息,请参阅监听器容器属性。
KafkaTemplate 更改
此类返回的 Future 现在是 CompletableFuture,而不是 ListenableFuture。请参阅使用 KafkaTemplate。
ReplyingKafkaTemplate 更改
此类返回的 Future 现在是 CompletableFuture,而不是 ListenableFuture。请参阅使用 ReplyingKafkaTemplate 和使用 Message<?> 进行请求/回复。
@KafkaListener 更改
您现在可以使用自定义关联头,该头将在任何回复消息中回显。有关更多信息,请参阅使用 ReplyingKafkaTemplate 末尾的注释。
您现在可以在处理整个批次之前手动提交批次的部分内容。有关更多信息,请参阅提交偏移量。
KafkaHeaders 更改
KafkaHeaders 中在 2.9.x 中弃用的四个常量现已移除。
-
使用
KEY代替MESSAGE_KEY。 -
使用
PARTITION代替PARTITION_ID
同样,RECEIVED_KEY 替换 RECEIVED_MESSAGE_KEY,RECEIVED_PARTITION 替换 RECEIVED_PARTITION_ID。
测试更改
版本 3.0.7 引入了 MockConsumerFactory 和 MockProducerFactory。有关更多信息,请参阅模拟消费者和生产者。
从 3.0.10 版本开始,嵌入式 Kafka 代理默认将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式代理的地址。
2.9 版本相比 2.8 版本的新功能
错误处理程序更改
DefaultErrorHandler 现在可以配置为暂停容器一次轮询,并使用上一次轮询的剩余结果,而不是查找剩余记录的偏移量。有关更多信息,请参阅DefaultErrorHandler。
DefaultErrorHandler 现在有一个 BackOffHandler 属性。有关更多信息,请参阅Back Off 处理程序。
监听器容器更改
interceptBeforeTx 现在适用于所有事务管理器(以前仅在使用 KafkaAwareTransactionManager 时应用)。请参阅[interceptBeforeTx]。
提供了一个新的容器属性 pauseImmediate,它允许容器在处理当前记录后暂停消费者,而不是在处理完上一次轮询中的所有记录之后。请参阅[pauseImmediate]。
与消费者身份验证和授权相关的事件
Header Mapper 更改
您现在可以配置哪些入站头应该被映射。在 2.8.8 或更高版本中也可用。有关更多信息,请参阅消息头。
KafkaTemplate 更改
在 3.0 版本中,此类返回的 Future 将是 CompletableFuture,而不是 ListenableFuture。有关在使用此版本时进行过渡的帮助,请参阅使用 KafkaTemplate。
ReplyingKafkaTemplate 更改
该模板现在提供了一种在回复容器上等待分配的方法,以避免在回复容器初始化之前发送请求时出现竞争条件。在 2.8.8 或更高版本中也可用。请参阅使用 ReplyingKafkaTemplate。
在 3.0 版本中,此类返回的 Future 将是 CompletableFuture,而不是 ListenableFuture。有关在使用此版本时进行过渡的帮助,请参阅使用 ReplyingKafkaTemplate 和使用 Message<?> 进行请求/回复。
2.8 版本相比 2.7 版本的新功能
本节涵盖从 2.7 版到 2.8 版的更改。有关早期版本的更改,请参阅变更历史。
包更改
与类型映射相关的类和接口已从 ...support.converter 移动到 ...support.mapping。
-
AbstractJavaTypeMapper -
ClassMapper -
DefaultJackson2JavaTypeMapper -
Jackson2JavaTypeMapper
乱序手动提交
现在可以配置监听器容器以接受乱序(通常是异步)的手动偏移量提交。容器将推迟提交,直到确认缺失的偏移量。有关更多信息,请参阅手动提交偏移量。
@KafkaListener 更改
现在可以在方法本身上指定监听器方法是否是批处理监听器。这允许同一个容器工厂用于记录监听器和批处理监听器。
有关更多信息,请参阅[批处理监听器]。
批处理监听器现在可以处理转换异常。
有关更多信息,请参阅使用批处理错误处理程序进行转换错误。
RecordFilterStrategy 在与批处理监听器一起使用时,现在可以在一次调用中过滤整个批处理。有关更多信息,请参阅[批处理监听器]末尾的注释。
@KafkaListener 注解现在具有 filter 属性,用于仅为此监听器覆盖容器工厂的 RecordFilterStrategy。
@KafkaListener 注解现在具有 info 属性;这用于填充新的监听器容器属性 listenerInfo。然后,这用于在每个记录中填充 KafkaHeaders.LISTENER_INFO 头,该头可以在 RecordInterceptor、RecordFilterStrategy 或监听器本身中使用。有关更多信息,请参阅监听器信息头和AbstractMessageListenerContainer 属性。
KafkaTemplate 更改
现在,给定主题、分区和偏移量,您可以接收单个记录。有关更多信息,请参阅使用 KafkaTemplate 接收。
添加 CommonErrorHandler
旧的 GenericErrorHandler 及其用于记录和批处理监听器的子接口层次结构已被新的单个接口 CommonErrorHandler 取代,其实现对应于 GenericErrorHandler 的大多数旧实现。有关更多信息,请参阅容器错误处理程序和将自定义旧版错误处理程序实现迁移到 CommonErrorHandler。
监听器容器更改
interceptBeforeTx 容器属性现在默认为 true。
authorizationExceptionRetryInterval 属性已重命名为 authExceptionRetryInterval,现在除了以前的 AuthorizationException 之外,还适用于 AuthenticationException。这两个异常都被认为是致命的,容器默认会停止,除非设置此属性。
有关更多信息,请参阅使用 KafkaMessageListenerContainer 和监听器容器属性。
序列化器/反序列化器更改
现在提供了 DelegatingByTopicSerializer 和 DelegatingByTopicDeserializer。有关更多信息,请参阅委托序列化器和反序列化器。
DeadLetterPublishingRecover 更改
属性 stripPreviousExceptionHeaders 现在默认为 true。
现在有几种技术可以自定义添加到输出记录中的头。
有关更多信息,请参阅管理死信记录头。
可重试主题更改
现在可以将同一个工厂用于可重试和不可重试的主题。有关更多信息,请参阅指定 ListenerContainerFactory。
现在有一个可管理的致命异常全局列表,这些异常将使失败的记录直接进入 DLT。请参阅异常分类器了解如何管理它。
您现在可以结合使用阻塞和非阻塞重试。有关更多信息,请参阅结合阻塞和非阻塞重试。
现在,在使用可重试主题功能时抛出的 KafkaBackOffException 会以 DEBUG 级别记录。如果需要将日志级别改回 WARN 或设置为任何其他级别,请参阅更改 KafkaBackOffException 日志级别。
2.6 与 2.7 之间的更改
Kafka 客户端版本
此版本需要 2.7.0 的 kafka-clients。它也与 2.7.1 版以来的 2.8.0 客户端兼容;请参阅覆盖 Spring Boot 依赖项。
使用主题的非阻塞延迟重试
此版本中添加了这一重要新功能。当严格排序不重要时,失败的传递可以发送到另一个主题,以便稍后消费。可以配置一系列此类重试主题,并增加延迟。有关更多信息,请参阅非阻塞重试。
监听器容器更改
onlyLogRecordMetadata 容器属性现在默认为 true。
现在提供了一个新的容器属性 stopImmediate。
有关更多信息,请参阅监听器容器属性。
在传递尝试之间使用 BackOff 的错误处理程序(例如 SeekToCurrentErrorHandler 和 DefaultAfterRollbackProcessor)现在会在容器停止后不久退出退避间隔,而不是延迟停止。
扩展 FailedRecordProcessor 的错误处理程序和回滚后处理器现在可以配置一个或多个 RetryListener 以接收有关重试和恢复进度的信息。
RecordInterceptor 现在有在监听器返回(正常或抛出异常)后调用的额外方法。它还有一个子接口 ConsumerAwareRecordInterceptor。此外,现在还有一个用于批处理监听器的 BatchInterceptor。有关更多信息,请参阅消息监听器容器。
@KafkaListener 更改
您现在可以验证 @KafkaHandler 方法(类级别监听器)的有效负载参数。有关更多信息,请参阅@KafkaListener @Payload 验证。
您现在可以在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这将导致原始 ConsumerRecord 添加到转换后的 Message<?> 中。例如,如果您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer,这将很有用。有关更多信息,请参阅监听器错误处理程序。
您现在可以在应用程序初始化期间修改 @KafkaListener 注解。有关更多信息,请参阅@KafkaListener 属性修改。
DeadLetterPublishingRecover 更改
现在,如果键和值都反序列化失败,原始值将发布到 DLT。以前,值被填充,但键 DeserializationException 留在头中。如果您子类化了恢复器并重写了 createProducerRecord 方法,则存在一个破坏性 API 更改。
此外,恢复器在发布到 DLT 之前会验证目标解析器选择的分区是否实际存在。
有关更多信息,请参阅发布死信记录。
ChainedKafkaTransactionManager 已弃用
有关更多信息,请参阅事务。
ReplyingKafkaTemplate 更改
现在有一个机制可以检查回复并在存在某些条件时使 future 异常失败。
已添加对发送和接收 spring-messaging Message<?> 的支持。
有关更多信息,请参阅使用 ReplyingKafkaTemplate。
Kafka Streams 更改
默认情况下,StreamsBuilderFactoryBean 现在配置为不清理本地状态。有关更多信息,请参阅配置。
KafkaAdmin 更改
已添加新方法 createOrModifyTopics 和 describeTopics。已添加 KafkaAdmin.NewTopics 以方便在单个 bean 中配置多个主题。有关更多信息,请参阅[配置主题]。
MessageConverter 更改
现在可以将 spring-messaging SmartMessageConverter 添加到 MessagingMessageConverter 中,从而允许基于 contentType 头进行内容协商。有关更多信息,请参阅Spring Messaging 消息转换。
排序 @KafkaListener
有关更多信息,请参阅按顺序启动 @KafkaListener。
ExponentialBackOffWithMaxRetries
提供了一种新的 BackOff 实现,使配置最大重试次数更加方便。有关更多信息,请参阅ExponentialBackOffWithMaxRetries 实现。
条件委托错误处理程序
这些新的错误处理程序可以配置为委托给不同的错误处理程序,具体取决于异常类型。有关更多信息,请参阅委托错误处理程序。
2.5 与 2.6 之间的更改
侦听器容器的更改
默认的 EOSMode 现在是 BETA。有关更多信息,请参阅精确一次语义。
各种错误处理程序(扩展 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 现在会在恢复失败时重置 BackOff。此外,您现在可以根据失败的记录和/或异常选择要使用的 BackOff。
您现在可以在容器属性中配置 adviceChain。有关更多信息,请参阅监听器容器属性。
当容器配置为发布 ListenerContainerIdleEvent 时,它现在会在发布空闲事件后收到记录时发布 ListenerContainerNoLongerIdleEvent。有关更多信息,请参阅应用程序事件和检测空闲和无响应的消费者。
@KafkaListener 更改
当使用手动分区分配时,您现在可以指定一个通配符来确定哪些分区应该重置为初始偏移量。此外,如果监听器实现了 ConsumerSeekAware,则 onPartitionsAssigned() 在手动分配后被调用。(在 2.5.5 版本中也已添加)。有关更多信息,请参阅显式分区分配。
已向 AbstractConsumerSeekAware 添加了便利方法,以简化查找。有关更多信息,请参阅[查找]。
ErrorHandler 更改
FailedRecordProcessor 的子类(例如 SeekToCurrentErrorHandler、DefaultAfterRollbackProcessor、RecoveringBatchErrorHandler)现在可以配置为在异常类型与先前在此记录中发生的异常类型不同时重置重试状态。
生产者工厂更改
您现在可以设置生产者的最大生命周期,在此之后它们将被关闭并重新创建。有关更多信息,请参阅事务。
您现在可以在创建 DefaultKafkaProducerFactory 后更新配置映射。例如,如果您在凭据更改后必须更新 SSL 密钥/信任存储位置,这可能很有用。有关更多信息,请参阅使用 DefaultKafkaProducerFactory。
2.4 与 2.5 之间的更改
本节涵盖从 2.4 版到 2.5 版的更改。有关早期版本的更改,请参阅变更历史。
消费者/生产者工厂更改
默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。提供了本机 Micrometer 指标的实现。有关更多信息,请参阅工厂监听器。
您现在可以在运行时更改引导服务器属性,从而实现故障转移到另一个 Kafka 集群。有关更多信息,请参阅连接到 Kafka。
StreamsBuilderFactoryBean 更改
工厂 bean 现在可以在创建或销毁 KafkaStreams 时调用回调。提供了本机 Micrometer 指标的实现。有关更多信息,请参阅KafkaStreams Micrometer 支持。
传递尝试头
现在有一个选项,在使用某些错误处理程序和回滚后处理器时,添加一个跟踪传递尝试的头。有关更多信息,请参阅传递尝试头。
@KafkaListener 更改
当 @KafkaListener 返回类型为 Message<?> 时,如果需要,现在会自动填充默认回复头。有关更多信息,请参阅回复类型 Message<?>。
当传入记录的键为 null 时,KafkaHeaders.RECEIVED_MESSAGE_KEY 不再填充 null 值;该头完全被省略。
@KafkaListener 方法现在可以指定 ConsumerRecordMetadata 参数,而不是使用单独的头来获取元数据,例如主题、分区等。有关更多信息,请参阅消费者记录元数据。
监听器容器更改
assignmentCommitOption 容器属性现在默认为 LATEST_ONLY_NO_TX。有关更多信息,请参阅监听器容器属性。
在使用事务时,subBatchPerPartition 容器属性现在默认为 true。有关更多信息,请参阅事务。
现在提供了一个新的 RecoveringBatchErrorHandler。
现在支持静态组员资格。有关更多信息,请参阅消息监听器容器。
当配置增量/协作重新平衡时,如果偏移量提交因非致命的 RebalanceInProgressException 而失败,容器将尝试重新提交在重新平衡完成后仍分配给此实例的分区的偏移量。
现在,记录监听器的默认错误处理程序是 SeekToCurrentErrorHandler,批处理监听器的默认错误处理程序是 RecoveringBatchErrorHandler。有关更多信息,请参阅容器错误处理程序。
您现在可以控制标准错误处理程序有意抛出的异常的日志级别。有关更多信息,请参阅容器错误处理程序。
已添加 getAssignmentsByClientId() 方法,使确定并发容器中哪些消费者分配到哪些分区变得更容易。有关更多信息,请参阅监听器容器属性。
您现在可以禁止在错误、调试日志等中记录整个 ConsumerRecord。请参阅监听器容器属性中的 onlyLogRecordMetadata。
KafkaTemplate 更改
KafkaTemplate 现在可以维护 Micrometer 计时器。有关更多信息,请参阅监控。
KafkaTemplate 现在可以配置 ProducerConfig 属性,以覆盖生产者工厂中的属性。有关更多信息,请参阅使用 KafkaTemplate。
现在提供了一个 RoutingKafkaTemplate。有关更多信息,请参阅使用 RoutingKafkaTemplate。
您现在可以使用 KafkaSendCallback 而不是 ListenerFutureCallback 来获取更窄的异常,从而更容易提取失败的 ProducerRecord。有关更多信息,请参阅使用 KafkaTemplate。
Kafka 字符串序列化器/反序列化器
现在提供了新的 ToStringSerializer/StringDeserializer 以及相关的 SerDe。有关更多信息,请参阅字符串序列化。
JsonDeserializer
JsonDeserializer 现在具有更大的灵活性来确定反序列化类型。有关更多信息,请参阅使用方法确定类型。
委托序列化器/反序列化器
当出站记录没有头时,DelegatingSerializer 现在可以处理“标准”类型。有关更多信息,请参阅委托序列化器和反序列化器。
测试更改
KafkaTestUtils.consumerProps() 辅助记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。有关更多信息,请参阅JUnit。
2.3 与 2.4 之间的更改
ConsumerAwareRebalanceListener
与 ConsumerRebalanceListener 类似,此接口现在具有一个额外的方法 onPartitionsLost。有关更多信息,请参阅 Apache Kafka 文档。
与 ConsumerRebalanceListener 不同,默认实现不调用 onPartitionsRevoked。相反,监听器容器将在调用 onPartitionsLost 后调用该方法;因此,在实现 ConsumerAwareRebalanceListener 时,您不应该做同样的事情。
有关更多信息,请参阅再平衡监听器末尾的 IMPORTANT 注释。
KafkaTemplate
KafkaTemplate 现在支持非事务性发布与事务性发布。有关更多信息,请参阅KafkaTemplate 事务性和非事务性发布。
AggregatingReplyingKafkaTemplate
releaseStrategy 现在是一个 BiConsumer。它现在在超时后(以及记录到达时)调用;第二个参数在超时后调用时为 true。
有关更多信息,请参阅聚合多个回复。
监听器容器
ContainerProperties 提供了一个 authorizationExceptionRetryInterval 选项,允许监听器容器在 KafkaConsumer 抛出任何 AuthorizationException 后重试。有关更多信息,请参阅其 Javadoc 和使用 KafkaMessageListenerContainer。
@KafkaListener
@KafkaListener 注解有一个新属性 splitIterables;默认为 true。当回复监听器返回 Iterable 时,此属性控制返回结果是作为单个记录发送还是为每个元素发送一个记录。有关更多信息,请参阅使用 @SendTo 转发监听器结果
批处理监听器现在可以配置 BatchToRecordAdapter;例如,这允许在事务中处理批处理,而监听器一次获取一条记录。使用默认实现,ConsumerRecordRecoverer 可用于处理批处理中的错误,而无需停止整个批处理的进程 - 这在使用事务时可能很有用。有关更多信息,请参阅使用批处理监听器的事务。
Kafka Streams
StreamsBuilderFactoryBean 接受一个新的属性 KafkaStreamsInfrastructureCustomizer。这允许在创建流之前配置构建器和/或拓扑。有关更多信息,请参阅Spring 管理。
2.2 与 2.3 之间的更改
本节涵盖从 2.2 版到 2.3 版的更改。
提示、技巧和示例
已添加新章节提示、技巧和示例。请提交 GitHub 问题和/或拉取请求以添加更多条目到该章节。
配置更改
从 2.3.4 版本开始,missingTopicsFatal 容器属性默认值为 false。当此属性为 true 时,如果代理关闭,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用平台,我们没有预料到在没有活动代理的情况下启动应用程序会是一个常见用例。
生产者和消费者工厂更改
DefaultKafkaProducerFactory 现在可以配置为为每个线程创建一个生产者。您还可以在构造函数中提供 Supplier<Serializer> 实例,作为配置类(需要无参构造函数)或使用 Serializer 实例(然后由所有生产者共享)的替代方案。有关更多信息,请参阅使用 DefaultKafkaProducerFactory。
DefaultKafkaConsumerFactory 中提供了相同的选项,使用 Supplier<Deserializer> 实例。有关更多信息,请参阅使用 KafkaMessageListenerContainer。
监听器容器更改
以前,当使用监听器适配器(例如 @KafkaListener)调用监听器时,错误处理程序会收到 ListenerExecutionFailedException(实际监听器异常作为 cause)。由原生 GenericMessageListener 抛出的异常会原封不动地传递给错误处理程序。现在 ListenerExecutionFailedException 始终是参数(实际监听器异常作为 cause),它提供了对容器 group.id 属性的访问。
因为监听器容器有自己的提交偏移量机制,所以它倾向于 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 为 false。现在它会自动将其设置为 false,除非在消费者工厂或容器的消费者属性覆盖中明确设置。
ackOnError 属性现在默认为 false。
现在可以在监听器方法中获取消费者的 group.id 属性。有关更多信息,请参阅获取消费者 group.id。
容器有一个新属性 recordInterceptor,允许在调用监听器之前检查或修改记录。还提供了一个 CompositeRecordInterceptor,以防您需要调用多个拦截器。有关更多信息,请参阅消息监听器容器。
ConsumerSeekAware 具有新的方法,允许您相对于开始、结束或当前位置执行查找,并查找大于或等于时间戳的第一个偏移量。有关更多信息,请参阅[查找]。
现在提供了一个方便的类 AbstractConsumerSeekAware 来简化查找。有关更多信息,请参阅[查找]。
ContainerProperties 提供了一个 idleBetweenPolls 选项,允许监听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。有关更多信息,请参阅其 Javadoc 和使用 KafkaMessageListenerContainer。
当使用 AckMode.MANUAL(或 MANUAL_IMMEDIATE)时,您现在可以通过在 Acknowledgment 上调用 nack 来导致重新传递。有关更多信息,请参阅提交偏移量。
现在可以使用 Micrometer Timer 监控监听器性能。有关更多信息,请参阅监控。
容器现在发布与启动相关的额外消费者生命周期事件。有关更多信息,请参阅应用程序事件。
事务批处理监听器现在可以支持僵尸围栏。有关更多信息,请参阅事务。
监听器容器工厂现在可以配置 ContainerCustomizer,以便在创建和配置每个容器后进一步配置它们。有关更多信息,请参阅容器工厂。
ErrorHandler 更改
SeekToCurrentErrorHandler 现在将某些异常视为致命异常,并禁用它们的重试,在第一次失败时调用恢复器。
SeekToCurrentErrorHandler 和 SeekToCurrentBatchErrorHandler 现在可以配置为在传递尝试之间应用 BackOff(线程睡眠)。
从 2.3.2 版本开始,当错误处理程序在恢复失败记录后返回时,恢复的记录的偏移量将被提交。
DeadLetterPublishingRecoverer 在与 ErrorHandlingDeserializer 结合使用时,现在将发送到死信主题的消息有效负载设置为无法反序列化的原始值。以前,它是 null,用户代码需要从消息头中提取 DeserializationException。有关更多信息,请参阅发布死信记录。
TopicBuilder
提供了一个新类 TopicBuilder,用于更方便地创建 NewTopic @Bean,用于自动主题配置。有关更多信息,请参阅[配置主题]。
Kafka Streams 更改
您现在可以对 @EnableKafkaStreams 创建的 StreamsBuilderFactoryBean 进行额外配置。有关更多信息,请参阅流配置。
现在提供了 RecoveringDeserializationExceptionHandler,它允许恢复具有反序列化错误的记录。它可以与 DeadLetterPublishingRecoverer 结合使用,将这些记录发送到死信主题。有关更多信息,请参阅从反序列化异常中恢复。
已提供 HeaderEnricher 转换器,使用 SpEL 生成头值。有关更多信息,请参阅Header Enricher。
已提供 MessagingTransformer。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。有关更多信息,请参阅MessagingProcessor 和从 KStream 调用 Spring Integration 流。
JSON 组件更改
现在所有 JSON 感知组件都默认配置了一个由 JacksonUtils.enhancedObjectMapper() 生成的 Jackson ObjectMapper。JsonDeserializer 现在提供了基于 TypeReference 的构造函数,以便更好地处理目标泛型容器类型。此外,还引入了一个 JacksonMimeTypeModule,用于将 org.springframework.util.MimeType 序列化为纯字符串。有关更多信息,请参阅其 JavaDocs 和序列化、反序列化和消息转换。
已提供 ByteArrayJsonMessageConverter 以及所有 Json 转换器的新超类 JsonMessageConverter。此外,现在还提供了 StringOrBytesSerializer;它可以序列化 ProducerRecord 中的 byte[]、Bytes 和 String 值。有关更多信息,请参阅Spring Messaging 消息转换。
JsonSerializer、JsonDeserializer 和 JsonSerde 现在具有流式 API,可简化编程配置。有关更多信息,请参阅 javadocs、序列化、反序列化和消息转换以及流 JSON 序列化和反序列化。
ReplyingKafkaTemplate
当回复超时时,future 会以 KafkaReplyTimeoutException 而不是 KafkaException 异常完成。
此外,现在提供了一个重载的 sendAndReceive 方法,允许根据每个消息指定回复超时。
AggregatingReplyingKafkaTemplate
通过聚合来自多个接收器的回复来扩展 ReplyingKafkaTemplate。有关更多信息,请参阅聚合多个回复。
事务更改
您现在可以在 KafkaTemplate 和 KafkaTransactionManager 上覆盖生产者工厂的 transactionIdPrefix。有关更多信息,请参阅transactionIdPrefix。
新委托序列化器/反序列化器
该框架现在提供了一个委托 Serializer 和 Deserializer,利用头文件来启用生产和消费具有多种键/值类型的记录。有关更多信息,请参阅委托序列化器和反序列化器。
新重试反序列化器
该框架现在提供了一个委托 RetryingDeserializer,用于在可能发生网络问题等瞬时错误时重试序列化。有关更多信息,请参阅重试反序列化器。
2.1 与 2.2 之间的更改
类和包更改
ContainerProperties 类已从 org.springframework.kafka.listener.config 移动到 org.springframework.kafka.listener。
AckMode 枚举已从 AbstractMessageListenerContainer 移动到 ContainerProperties。
setBatchErrorHandler() 和 setErrorHandler() 方法已从 ContainerProperties 移动到 AbstractMessageListenerContainer 和 AbstractKafkaListenerContainerFactory。
回滚后处理
提供了一个新的 AfterRollbackProcessor 策略。有关更多信息,请参阅回滚后处理器。
ConcurrentKafkaListenerContainerFactory 更改
您现在可以使用 ConcurrentKafkaListenerContainerFactory 创建和配置任何 ConcurrentMessageListenerContainer,而不仅仅是用于 @KafkaListener 注解的容器。有关更多信息,请参阅容器工厂。
监听器容器更改
已添加一个新的容器属性 (missingTopicsFatal)。有关更多信息,请参阅使用 KafkaMessageListenerContainer。
当消费者停止时,现在会发出 ConsumerStoppedEvent。有关更多信息,请参阅线程安全。
批处理监听器可以选择接收完整的 ConsumerRecords<?, ?> 对象,而不是 List<ConsumerRecord<?, ?>。有关更多信息,请参阅[批处理监听器]。
DefaultAfterRollbackProcessor 和 SeekToCurrentErrorHandler 现在可以恢复(跳过)持续失败的记录,并且默认在 10 次失败后执行此操作。它们可以配置为将失败的记录发布到死信主题。
从 2.2.4 版本开始,消费者组 ID 可以在选择死信主题名称时使用。
已添加 ConsumerStoppingEvent。有关更多信息,请参阅应用程序事件。
当容器配置为 AckMode.MANUAL_IMMEDIATE 时,SeekToCurrentErrorHandler 现在可以配置为提交恢复的记录的偏移量(从 2.2.4 开始)。
@KafkaListener 更改
您现在可以通过在注解上设置属性来覆盖监听器容器工厂的 concurrency 和 autoStartup 属性。您现在可以添加配置以确定哪些头(如果有)被复制到回复消息中。有关更多信息,请参阅@KafkaListener 注解。
您现在可以在自己的注解上将 @KafkaListener 用作元注解。有关更多信息,请参阅@KafkaListener 作为元注解。
现在更容易配置用于 @Payload 验证的 Validator。有关更多信息,请参阅@KafkaListener @Payload 验证。
您现在可以直接在注解上指定 kafka 消费者属性;这些属性将覆盖消费者工厂中定义的同名属性(从 2.2.4 版本开始)。有关更多信息,请参阅注解属性。
头映射更改
类型为 MimeType 和 MediaType 的头现在在 RecordHeader 值中映射为简单字符串。以前,它们被映射为 JSON,并且只有 MimeType 被解码。MediaType 无法解码。它们现在是简单的字符串,以实现互操作性。
此外,JsonKafkaHeaderMapper 有一个新的 addToStringClasses 方法,允许指定应使用 toString() 而不是 JSON 映射的类型。有关更多信息,请参阅消息头。
嵌入式 Kafka 更改
KafkaEmbedded 类及其 KafkaRule 接口已弃用,取而代之的是 EmbeddedKafkaBroker 及其 JUnit 4 EmbeddedKafkaRule 包装器。@EmbeddedKafka 注解现在填充一个 EmbeddedKafkaBroker bean,而不是已弃用的 KafkaEmbedded。此更改允许在 JUnit 5 测试中使用 @EmbeddedKafka。@EmbeddedKafka 注解现在具有 ports 属性,用于指定填充 EmbeddedKafkaBroker 的端口。有关更多信息,请参阅测试应用程序。
JsonSerializer/Deserializer 增强功能
您现在可以通过使用生产者和消费者属性来提供类型映射信息。
反序列化器上提供了新的构造函数,允许使用提供的目标类型覆盖类型头信息。
JsonDeserializer 现在默认删除任何类型信息头。
您现在可以配置 JsonDeserializer 以通过使用 Kafka 属性忽略类型信息头(从 2.2.3 开始)。
有关更多信息,请参阅序列化、反序列化和消息转换。
Kafka Streams 更改
流配置 bean 现在必须是 KafkaStreamsConfiguration 对象,而不是 StreamsConfig 对象。
StreamsBuilderFactoryBean 已从包 ...core 移动到 ...config。
引入了 KafkaStreamBrancher,以改善在 KStream 实例之上构建条件分支时的用户体验。
有关更多信息,请参阅Apache Kafka Streams 支持和配置。
事务 ID
当监听器容器启动事务时,transactional.id 现在是 transactionIdPrefix 附加 <group.id>.<topic>.<partition>。此更改允许正确隔离僵尸进程,如此处所述。
2.0 与 2.1 之间的更改
JSON 改进
StringJsonMessageConverter 和 JsonSerializer 现在在 Headers 中添加类型信息,允许转换器和 JsonDeserializer 在接收时创建特定类型,基于消息本身而不是固定的配置类型。有关更多信息,请参阅序列化、反序列化和消息转换。
容器停止错误处理程序
现在为记录和批处理监听器提供了容器错误处理程序,它们将监听器抛出的任何异常视为致命错误,并停止容器。有关更多信息,请参阅处理异常。
暂停和恢复容器
监听器容器现在具有 pause() 和 resume() 方法(从 2.1.3 版本开始)。有关更多信息,请参阅暂停和恢复监听器容器。
有状态重试
从 2.1.3 版本开始,您可以配置有状态重试。有关更多信息,请参阅有状态重试。
客户端 ID
从版本 2.1.1 开始,您现在可以在 @KafkaListener 上设置 client.id 前缀。以前,为了自定义客户端 ID,每个监听器都需要单独的消费者工厂(和容器工厂)。当您使用并发时,前缀会以 -n 作为后缀,以提供唯一的客户端 ID。
日志记录偏移量提交
默认情况下,主题偏移量提交的日志记录以 DEBUG 日志级别执行。从版本 2.1.2 开始,ContainerProperties 中的一个新属性 commitLogLevel 允许您指定这些消息的日志级别。有关更多信息,请参阅使用 KafkaMessageListenerContainer。
默认 @KafkaHandler
从版本 2.1.3 开始,您可以将类级别 @KafkaListener 上的一个 @KafkaHandler 注解指定为默认注解。有关更多信息,请参阅类上的 @KafkaListener。
ReplyingKafkaTemplate
从版本 2.1.3 开始,提供了一个 KafkaTemplate 的子类来支持请求/回复语义。有关更多信息,请参阅使用 ReplyingKafkaTemplate。
从 2.0 迁移指南
请参阅2.0 到 2.1 迁移指南。
1.3 和 2.0 之间的变化
@KafkaListener 变更
您现在可以使用 @SendTo 注解 @KafkaListener 方法(以及类和 @KafkaHandler 方法)。如果方法返回结果,它将被转发到指定的主题。有关更多信息,请参阅使用 @SendTo 转发监听器结果。
消息监听器
消息监听器现在可以感知 Consumer 对象。有关更多信息,请参阅[消息监听器]。
使用 ConsumerAwareRebalanceListener
重新平衡监听器现在可以在重新平衡通知期间访问 Consumer 对象。有关更多信息,请参阅重新平衡监听器。
1.2 和 1.3 之间的变化
事务支持
0.11.0.0 客户端库增加了对事务的支持。KafkaTransactionManager 和其他事务支持已添加。有关更多信息,请参阅事务。
Header 支持
0.11.0.0 客户端库增加了对消息 header 的支持。这些现在可以映射到 spring-messaging MessageHeaders 和从 spring-messaging MessageHeaders 映射。有关更多信息,请参阅消息 Header。
Kafka 时间戳支持
KafkaTemplate 现在支持一个 API 来添加带有时间戳的记录。已引入与 timestamp 支持相关的新 KafkaHeaders。此外,还添加了新的 KafkaConditions.timestamp() 和 KafkaMatchers.hasTimestamp() 测试实用程序。有关更多详细信息,请参阅使用 KafkaTemplate、@KafkaListener 注解和测试应用程序。
@KafkaListener 变更
您现在可以配置 KafkaListenerErrorHandler 来处理异常。有关更多信息,请参阅处理异常。
默认情况下,@KafkaListener 的 id 属性现在用作 group.id 属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置 groupId。以前,您需要一个单独的容器工厂(和消费者工厂)才能为监听器使用不同的 group.id 值。要恢复使用工厂配置的 group.id 的先前行为,请将注解上的 idIsGroup 属性设置为 false。
@EmbeddedKafka 注解
为方便起见,提供了一个测试类级别 @EmbeddedKafka 注解,用于将 KafkaEmbedded 注册为 bean。有关更多信息,请参阅测试应用程序。
Kerberos 配置
现在提供 Kerberos 配置支持。有关更多信息,请参阅JAAS 和 Kerberos。
1.0 和 1.1 之间的变化
查找
您现在可以查找每个主题或分区的位置。当使用组管理并且 Kafka 分配分区时,您可以使用它在初始化期间设置初始位置。当检测到空闲容器时,或者在应用程序执行的任何任意点,您也可以查找。有关更多信息,请参阅[查找]。