变更历史
3.1 相比 3.0 的新功能
本节介绍了从版本 3.0 到版本 3.1 的更改。有关早期版本的更改,请参阅 变更历史。
EmbeddedKafkaBroker
现在提供了一个额外的实现来使用 Kraft
而不是 Zookeeper。有关更多信息,请参阅 嵌入式 Kafka 代理。
JsonDeserializer
当反序列化异常发生时,SerializationException
消息不再包含 Can’t deserialize data [[123, 34, 98, 97, 122, …
形式的数据;每个数据字节的数值数组没有用处,对于大型数据来说可能会很冗长。当与 ErrorHandlingDeserializer
一起使用时,发送到错误处理程序的 DeserializationException
包含 data
属性,该属性包含无法反序列化的原始数据。当不与 ErrorHandlingDeserializer
一起使用时,KafkaConsumer
将持续为同一记录发出异常,显示主题/分区/偏移量以及 Jackson 引发的异常原因。
ContainerPostProcessor
可以通过在 @KafkaListener
注解上指定 ContainerPostProcessor
的 bean 名称,在监听器容器上应用后处理。这发生在容器创建之后,以及在容器工厂上配置的任何 ContainerCustomizer
配置之后。有关更多信息,请参见 容器工厂。
ErrorHandlingDeserializer
您现在可以向此反序列化器添加 Validator
;如果委托 Deserializer
成功反序列化对象,但该对象验证失败,则会抛出异常,类似于反序列化异常发生。这允许将原始原始数据传递给错误处理程序。有关更多信息,请参见 使用 ErrorHandlingDeserializer
。
可重试主题
当 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
时,将后缀 -retry-5000
更改为 -retry
。如果您想保留后缀 -retry-5000
,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
。有关更多信息,请参见 主题命名。
监听器容器更改
当手动分配分区时,使用 null
消费者 group.id
,AckMode
现在会自动强制转换为 MANUAL
。有关更多信息,请参见 手动分配所有分区。
3.0 中的新增功能(自 2.9 起)
观察
现在支持使用 Micrometer 为计时器和跟踪启用观察。有关更多信息,请参阅 观察。
原生镜像
提供对创建原生镜像的支持。有关更多信息,请参阅 原生镜像。
全局单个嵌入式 Kafka
嵌入式 Kafka (EmbeddedKafkaBroker
) 现在可以作为整个测试计划的单个全局实例启动。有关更多信息,请参阅 对多个测试类使用相同的代理。
可重试主题更改
此功能不再被视为实验性(就其 API 而言),该功能本身已从 2.7 版本开始支持,但 API 更改的可能性高于正常水平。
此版本中 非阻塞重试 基础设施 bean 的引导已更改,以避免在某些应用程序中发生的与应用程序初始化相关的某些计时问题。
您现在可以为重试容器设置不同的 concurrency
;默认情况下,并发性与主容器相同。
@RetryableTopic
现在可以用作自定义注释的元注释,包括对 @AliasFor
属性的支持。
有关更多信息,请参阅 配置。
重试主题的默认副本因子现在为 -1
(使用代理默认值)。如果您的代理早于 2.4 版本,您现在需要显式设置该属性。
您现在可以在同一个应用程序上下文中,在同一个主题上配置多个 @RetryableTopic
监听器。以前,这是不可能的。有关更多信息,请参阅 多个监听器,相同主题。
RetryTopicConfigurationSupport
中存在破坏性 API 更改;具体来说,如果您覆盖 destinationTopicResolver
、kafkaConsumerBackoffManager
和/或 retryTopicConfigurer
的 bean 定义方法;这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory>
参数。
监听器容器更改
容器现在发布与消费者身份验证和授权失败相关的事件。有关更多信息,请参见 应用程序事件。
您现在可以自定义消费者线程使用的线程名称。有关更多信息,请参见 容器线程命名。
添加了容器属性 restartAfterAuthException
。有关更多信息,请参见 监听器容器属性。
KafkaTemplate
更改
此类返回的期货现在是 CompletableFuture
而不是 ListenableFuture
。有关更多信息,请参见 使用 KafkaTemplate
。
ReplyingKafkaTemplate
更改
此类返回的期货现在是 CompletableFuture
而不是 ListenableFuture
。有关更多信息,请参见 使用 ReplyingKafkaTemplate
和 使用 Message<?>
进行请求/回复。
@KafkaListener
更改
您现在可以使用自定义关联标头,该标头将在任何回复消息中回显。有关更多信息,请参见 使用 ReplyingKafkaTemplate
结尾的说明。
您现在可以在整个批次处理完之前手动提交批次的一部分。有关更多信息,请参见 提交偏移量。
KafkaHeaders
更改
KafkaHeaders
中的四个在 2.9.x 中已弃用的常量现已删除。
-
请使用
KEY
代替MESSAGE_KEY
。 -
请使用
PARTITION
代替PARTITION_ID
类似地,RECEIVED_MESSAGE_KEY
被 RECEIVED_KEY
替换,RECEIVED_PARTITION_ID
被 RECEIVED_PARTITION
替换。
测试更改
版本 3.0.7 引入了 MockConsumerFactory
和 MockProducerFactory
。有关更多信息,请参见 模拟消费者和生产者。
从版本 3.0.10 开始,嵌入式 Kafka 代理默认情况下将 Spring Boot 属性 spring.kafka.bootstrap-servers
设置为嵌入式代理的地址。
2.9 相比 2.8 的新功能
错误处理程序更改
现在可以将 DefaultErrorHandler
配置为暂停容器一个轮询,并使用来自先前轮询的剩余结果,而不是寻求剩余记录的偏移量。有关更多信息,请参见 DefaultErrorHandler。
DefaultErrorHandler
现在具有 BackOffHandler
属性。有关更多信息,请参见 后退处理程序。
监听器容器更改
interceptBeforeTx
现在适用于所有事务管理器(以前仅在使用 KafkaAwareTransactionManager
时应用)。请参见 [interceptBeforeTx]。
提供了一个新的容器属性 pauseImmediate
,它允许容器在处理完当前记录后暂停消费者,而不是在处理完先前轮询的所有记录后暂停消费者。请参见 [pauseImmediate]。
与消费者身份验证和授权相关的事件
标头映射器更改
您现在可以配置哪些入站标头应被映射。此功能在 2.8.8 或更高版本中也可用。有关更多信息,请参见 消息标头。
KafkaTemplate
更改
在 3.0 中,此类返回的期货将是 CompletableFuture
而不是 ListenableFuture
。有关使用此版本时过渡的帮助,请参见 使用 KafkaTemplate
。
ReplyingKafkaTemplate
更改
该模板现在提供了一种方法来等待回复容器上的分配,以避免在回复容器初始化之前发送请求时出现竞争条件。此功能在 2.8.8 或更高版本中也可用。请参见 使用 ReplyingKafkaTemplate
。
在 3.0 中,此类返回的期货将是 CompletableFuture
而不是 ListenableFuture
。有关使用此版本时过渡的帮助,请参见 使用 ReplyingKafkaTemplate
和 使用 Message<?>
进行请求/回复。
2.8 相比 2.7 的新功能
本节介绍了从 2.7 版本到 2.8 版本的更改。有关早期版本的更改,请参见 更改历史记录。
包变更
与类型映射相关的类和接口已从 …support.converter
移动到 …support.mapping
。
-
AbstractJavaTypeMapper
-
ClassMapper
-
DefaultJackson2JavaTypeMapper
-
Jackson2JavaTypeMapper
乱序手动提交
监听器容器现在可以配置为接受乱序(通常是异步)的手动偏移量提交。容器将推迟提交,直到确认缺少的偏移量。有关更多信息,请参见 手动提交偏移量。
@KafkaListener
变更
现在可以在方法本身指定监听器方法是否为批处理监听器。这允许同一个容器工厂用于记录和批处理监听器。
有关更多信息,请参见 [批处理监听器]。
批处理监听器现在可以处理转换异常。
有关更多信息,请参见 批处理错误处理程序的转换错误。
RecordFilterStrategy
在与批处理监听器一起使用时,现在可以一次性过滤整个批次。有关更多信息,请参见 [批处理监听器] 结尾的说明。
@KafkaListener
注解现在具有 filter
属性,用于覆盖容器工厂的 RecordFilterStrategy
,仅针对此监听器。
KafkaTemplate
变更
现在,您可以接收单个记录,前提是您知道主题、分区和偏移量。有关更多信息,请参见 使用 KafkaTemplate
接收。
添加了 CommonErrorHandler
传统的GenericErrorHandler
及其用于记录和批处理监听器的子接口层次结构已被新的单一接口CommonErrorHandler
取代,该接口的实现对应于大多数GenericErrorHandler
的传统实现。有关更多信息,请参见容器错误处理程序和将自定义传统错误处理程序实现迁移到CommonErrorHandler
。
监听器容器更改
interceptBeforeTx
容器属性现在默认情况下为true
。
authorizationExceptionRetryInterval
属性已重命名为authExceptionRetryInterval
,现在除了以前AuthorizationException
之外,还适用于AuthenticationException
。这两种异常都被视为致命异常,容器默认情况下将停止,除非设置此属性。
有关更多信息,请参见使用KafkaMessageListenerContainer
和监听器容器属性。
序列化器/反序列化器更改
现在提供了DelegatingByTopicSerializer
和DelegatingByTopicDeserializer
。有关更多信息,请参见委托序列化器和反序列化器。
DeadLetterPublishingRecover
更改
属性stripPreviousExceptionHeaders
现在默认情况下为true
。
现在有几种技术可以自定义添加到输出记录中的标头。
有关更多信息,请参见管理死信记录标头。
可重试主题更改
现在,您可以对可重试主题和不可重试主题使用相同的工厂。有关更多信息,请参见指定监听器容器工厂。
现在有一个可管理的全局致命异常列表,这些异常将使失败的记录直接进入DLT。请参阅异常分类器,了解如何管理它。
现在,您可以结合使用阻塞和非阻塞重试。有关更多信息,请参见结合阻塞和非阻塞重试。
使用可重试主题功能时抛出的 KafkaBackOffException 现在在 DEBUG 级别记录。如果您需要将日志级别更改回 WARN 或设置为任何其他级别,请参阅 更改 KafkaBackOffException 日志级别。
2.6 和 2.7 之间的更改
Kafka 客户端版本
此版本需要 2.7.0 kafka-clients
。它也与 2.8.0 客户端兼容,从 2.7.1 版本开始;请参阅 覆盖 Spring Boot 依赖项。
使用主题的非阻塞延迟重试
此重大新功能在此版本中添加。当严格排序不重要时,失败的传递可以发送到另一个主题以供稍后使用。可以配置一系列这样的重试主题,并具有递增的延迟。有关更多信息,请参阅 非阻塞重试。
监听器容器更改
onlyLogRecordMetadata
容器属性现在默认情况下为 true
。
现在可以使用新的容器属性 stopImmediate
。
有关更多信息,请参阅 监听器容器属性。
在传递尝试之间使用 BackOff
的错误处理程序(例如 SeekToCurrentErrorHandler
和 DefaultAfterRollbackProcessor
)现在将在容器停止后不久退出回退间隔,而不是延迟停止。
扩展 FailedRecordProcessor
的错误处理程序和回滚后处理器现在可以使用一个或多个 RetryListener
配置,以接收有关重试和恢复进度的信息。
RecordInterceptor
现在具有在监听器返回后(正常或通过抛出异常)调用的附加方法。它还有一个子接口 ConsumerAwareRecordInterceptor
。此外,现在有一个 BatchInterceptor
用于批处理监听器。有关更多信息,请参阅 消息监听器容器。
@KafkaListener
更改
您现在可以验证 @KafkaHandler
方法(类级监听器)的有效负载参数。有关更多信息,请参阅 @KafkaListener
@Payload
验证。
现在,您可以在 MessagingMessageConverter
和 BatchMessagingMessageConverter
上设置 rawRecordHeader
属性,这会导致原始 ConsumerRecord
被添加到转换后的 Message<?>
中。例如,如果您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer
,这将非常有用。有关更多信息,请参阅 监听器错误处理程序。
您现在可以在应用程序初始化期间修改 @KafkaListener
注解。有关更多信息,请参阅 @KafkaListener
属性修改。
DeadLetterPublishingRecover
更改
现在,如果键和值都无法反序列化,则原始值将发布到 DLT。以前,值已填充,但键 DeserializationException
仍然保留在标头中。如果您是继承了恢复器并覆盖了 createProducerRecord
方法,则存在一个破坏性 API 更改。
此外,恢复器会验证目标解析器选择的分区在发布之前是否实际存在。
有关更多信息,请参阅 发布死信记录。
ChainedKafkaTransactionManager
已弃用
有关更多信息,请参阅 事务。
ReplyingKafkaTemplate
更改
现在有一种机制可以检查回复,如果存在某些条件,则使 future 异常失败。
已添加对发送和接收 spring-messaging
Message<?>
的支持。
有关更多信息,请参阅 使用 ReplyingKafkaTemplate
。
Kafka Streams 更改
默认情况下,StreamsBuilderFactoryBean
现在配置为不清理本地状态。有关更多信息,请参阅 配置。
KafkaAdmin
更改
已添加新方法 createOrModifyTopics
和 describeTopics
。已添加 KafkaAdmin.NewTopics
以便于在单个 bean 中配置多个主题。有关更多信息,请参阅 [configuring-topics]。
MessageConverter
更改
现在可以将 spring-messaging
SmartMessageConverter
添加到 MessagingMessageConverter
中,允许根据 contentType
标头进行内容协商。有关更多信息,请参见 Spring 消息消息转换。
排序 @KafkaListener
有关更多信息,请参见 按顺序启动 @KafkaListener
。
ExponentialBackOffWithMaxRetries
提供了一个新的 BackOff
实现,使配置最大重试次数更加方便。有关更多信息,请参见 ExponentialBackOffWithMaxRetries
实现。
条件委托错误处理程序
这些新的错误处理程序可以配置为根据异常类型委托给不同的错误处理程序。有关更多信息,请参见 委托错误处理程序。
2.5 和 2.6 之间的更改
监听器容器更改
默认的 EOSMode
现在是 BETA
。有关更多信息,请参见 精确一次语义。
各种错误处理程序(扩展 FailedRecordProcessor
)和 DefaultAfterRollbackProcessor
现在在恢复失败时重置 BackOff
。此外,您现在可以根据失败的记录和/或异常选择要使用的 BackOff
。
您现在可以在容器属性中配置 adviceChain
。有关更多信息,请参见 监听器容器属性。
当容器配置为发布 ListenerContainerIdleEvent
时,它现在会在接收记录后发布 ListenerContainerNoLongerIdleEvent
,此前它发布了一个空闲事件。有关更多信息,请参见 应用程序事件 和 检测空闲和无响应的消费者。
@KafkaListener 更改
在使用手动分区分配时,您现在可以指定一个通配符来确定应将哪些分区重置为初始偏移量。此外,如果监听器实现了 ConsumerSeekAware
,则在手动分配后会调用 onPartitionsAssigned()
。(也在 2.5.5 版本中添加)。有关更多信息,请参见 显式分区分配。
已向 AbstractConsumerSeekAware
添加了便利方法,以简化搜索操作。有关更多信息,请参见 [seek]。
错误处理程序更改
FailedRecordProcessor
的子类(例如 SeekToCurrentErrorHandler
、DefaultAfterRollbackProcessor
、RecoveringBatchErrorHandler
)现在可以配置为,如果异常类型与之前针对此记录发生的异常类型不同,则重置重试状态。
生产者工厂更改
您现在可以为生产者设置最大生存时间,超过该时间后,生产者将被关闭并重新创建。有关更多信息,请参阅 事务。
您现在可以在创建 DefaultKafkaProducerFactory
后更新配置映射。例如,如果您需要在凭据更改后更新 SSL 密钥/信任库位置,这将很有用。有关更多信息,请参阅 使用 DefaultKafkaProducerFactory
。
2.4 和 2.5 之间的更改
本节介绍从版本 2.4 到版本 2.5 的更改。有关早期版本的更改,请参阅 更改历史记录。
消费者/生产者工厂更改
默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。提供了用于本机 Micrometer 指标的实现。有关更多信息,请参阅 工厂侦听器。
您现在可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。有关更多信息,请参阅 连接到 Kafka。
StreamsBuilderFactoryBean
更改
工厂 Bean 现在可以在创建或销毁 KafkaStreams
时调用回调。提供了用于本机 Micrometer 指标的实现。有关更多信息,请参阅 KafkaStreams Micrometer 支持。
传递尝试标头
现在可以使用某些错误处理程序和回滚处理器后添加一个跟踪传递尝试的标头。有关更多信息,请参见 传递尝试标头。
@KafkaListener 更改
如果 @KafkaListener
返回类型为 Message<?>
,则现在将自动填充默认回复标头(如果需要)。有关更多信息,请参见 回复类型 Message<?>。
当传入记录的键为 null
时,KafkaHeaders.RECEIVED_MESSAGE_KEY
将不再填充 null
值;该标头将完全省略。
@KafkaListener
方法现在可以指定 ConsumerRecordMetadata
参数,而不是使用用于元数据(如主题、分区等)的离散标头。有关更多信息,请参见 消费者记录元数据。
监听器容器更改
assignmentCommitOption
容器属性现在默认情况下为 LATEST_ONLY_NO_TX
。有关更多信息,请参见 监听器容器属性。
使用事务时,subBatchPerPartition
容器属性现在默认情况下为 true
。有关更多信息,请参见 事务。
现在提供了一个新的 RecoveringBatchErrorHandler
。
现在支持静态组成员资格。有关更多信息,请参见 消息监听器容器。
如果配置了增量/协作重新平衡,如果偏移量未能使用非致命 RebalanceInProgressException
提交,则容器将尝试重新提交重新平衡完成后仍分配给此实例的分区的偏移量。
默认错误处理程序现在是记录监听器的 SeekToCurrentErrorHandler
和批处理监听器的 RecoveringBatchErrorHandler
。有关更多信息,请参见 容器错误处理程序。
现在可以控制标准错误处理程序故意抛出的异常的日志记录级别。有关更多信息,请参见 容器错误处理程序。
已添加 getAssignmentsByClientId()
方法,使确定并发容器中的哪些消费者分配了哪些分区变得更加容易。有关更多信息,请参见 监听器容器属性。
现在可以抑制在错误、调试日志等中记录整个 ConsumerRecord
。请参见 监听器容器属性 中的 onlyLogRecordMetadata
。
KafkaTemplate 更改
KafkaTemplate
现在可以维护 micrometer 计时器。有关更多信息,请参见 监控。
现在可以通过 ProducerConfig
属性配置 KafkaTemplate
来覆盖生产者工厂中的属性。有关更多信息,请参见 使用 KafkaTemplate
。
现在提供了一个 RoutingKafkaTemplate
。有关更多信息,请参见 使用 RoutingKafkaTemplate
。
现在可以使用 KafkaSendCallback
代替 ListenerFutureCallback
来获取更窄的异常,从而更容易提取失败的 ProducerRecord
。有关更多信息,请参见 使用 KafkaTemplate
。
Kafka 字符串序列化器/反序列化器
现在提供了新的 ToStringSerializer
/StringDeserializer
以及相关的 SerDe
。有关更多信息,请参见 字符串序列化。
JsonDeserializer
JsonDeserializer
现在具有更大的灵活性来确定反序列化类型。有关更多信息,请参见 使用方法确定类型。
委托序列化器/反序列化器
DelegatingSerializer
现在可以处理“标准”类型,当出站记录没有头时。有关更多信息,请参见 委托序列化器和反序列化器。
测试更改
KafkaTestUtils.consumerProps()
帮助程序记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
设置为 earliest
。有关更多信息,请参见 JUnit。
2.3 和 2.4 之间的更改
ConsumerAwareRebalanceListener
与 ConsumerRebalanceListener
一样,此接口现在还有一个额外的 onPartitionsLost
方法。有关更多信息,请参阅 Apache Kafka 文档。
与 ConsumerRebalanceListener
不同,默认实现不会调用 onPartitionsRevoked
。相反,监听器容器将在调用 onPartitionsLost
后调用该方法;因此,在实现 ConsumerAwareRebalanceListener
时,您不应该这样做。
有关更多信息,请参见 重新平衡监听器 末尾的“重要”说明。
KafkaTemplate
KafkaTemplate
现在支持非事务性发布以及事务性发布。有关更多信息,请参阅 KafkaTemplate
事务性和非事务性发布。
AggregatingReplyingKafkaTemplate
releaseStrategy
现在是一个 BiConsumer
。它现在在超时后(以及记录到达时)被调用;在超时后调用时,第二个参数为 true
。
有关更多信息,请参阅 聚合多个回复。
监听器容器
ContainerProperties
提供了一个 authorizationExceptionRetryInterval
选项,允许监听器容器在 KafkaConsumer
抛出任何 AuthorizationException
后重试。有关更多信息,请参阅其 JavaDocs 和 使用 KafkaMessageListenerContainer
。
@KafkaListener
@KafkaListener
注解有一个新的属性 splitIterables
;默认值为 true。当回复监听器返回一个 Iterable
时,此属性控制是否将返回结果作为单个记录发送,或者为每个元素发送一个记录。有关更多信息,请参阅 使用 @SendTo
转发监听器结果。
批处理监听器现在可以使用 BatchToRecordAdapter
进行配置;例如,这允许在事务中处理批处理,而监听器一次获取一条记录。使用默认实现,ConsumerRecordRecoverer
可用于处理批处理中的错误,而不会停止整个批处理的处理 - 这在使用事务时可能很有用。有关更多信息,请参阅 批处理监听器的事务。
Kafka Streams
StreamsBuilderFactoryBean
接受一个新的属性 KafkaStreamsInfrastructureCustomizer
。这允许在创建流之前配置构建器和/或拓扑。有关更多信息,请参阅 Spring 管理。
2.2 和 2.3 之间的更改
本节介绍从版本 2.2 到版本 2.3 的更改。
提示、技巧和示例
添加了一个新章节 提示、技巧和示例。请提交 GitHub 问题和/或拉取请求以在该章节中添加更多条目。
配置变更
从 2.3.4 版本开始,missingTopicsFatal
容器属性默认值为 false。当该属性为 true 时,如果 Broker 宕机,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到启动没有活动 Broker 的应用程序会是一个常见用例。
生产者和消费者工厂变更
DefaultKafkaProducerFactory
现在可以配置为每个线程创建一个生产者。您也可以在构造函数中提供 Supplier<Serializer>
实例,作为配置类(需要无参数构造函数)或使用 Serializer
实例构造(然后在所有生产者之间共享)的替代方案。有关更多信息,请参阅 使用 DefaultKafkaProducerFactory
。
DefaultKafkaConsumerFactory
中也提供了 Supplier<Deserializer>
实例的相同选项。有关更多信息,请参阅 使用 KafkaMessageListenerContainer
。
监听器容器变更
以前,当使用监听器适配器(例如 @KafkaListener
)调用监听器时,错误处理程序会收到 ListenerExecutionFailedException
(实际监听器异常作为 cause
)。由原生 GenericMessageListener
抛出的异常会原封不动地传递给错误处理程序。现在,ListenerExecutionFailedException
始终是参数(实际监听器异常作为 cause
),它提供了对容器的 group.id
属性的访问。
由于监听器容器有自己的提交偏移量机制,它更喜欢 Kafka 的 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
为 false
。现在,它会自动将其设置为 false,除非在消费者工厂或容器的消费者属性覆盖中明确设置。
ackOnError
属性现在默认值为 false
。
现在可以在监听器方法中获取消费者的 group.id
属性。有关更多信息,请参阅 获取消费者 group.id
。
容器添加了一个新的属性recordInterceptor
,允许在调用监听器之前检查或修改记录。如果需要调用多个拦截器,还可以使用CompositeRecordInterceptor
。有关更多信息,请参阅消息监听器容器。
ConsumerSeekAware
添加了新的方法,允许您相对于开头、结尾或当前位置执行查找,以及查找大于或等于时间戳的第一个偏移量。有关更多信息,请参阅[seek]。
现在提供了一个便利类AbstractConsumerSeekAware
来简化查找。有关更多信息,请参阅[seek]。
ContainerProperties
提供了一个idleBetweenPolls
选项,允许监听器容器中的主循环在KafkaConsumer.poll()
调用之间休眠。有关更多信息,请参阅其 JavaDocs 和使用KafkaMessageListenerContainer
。
当使用AckMode.MANUAL
(或MANUAL_IMMEDIATE
)时,您现在可以通过在Acknowledgment
上调用nack
来导致重新传递。有关更多信息,请参阅提交偏移量。
现在可以使用 Micrometer Timer
来监控监听器性能。有关更多信息,请参阅监控。
容器现在发布了与启动相关的其他消费者生命周期事件。有关更多信息,请参阅应用程序事件。
事务批处理监听器现在可以支持僵尸围栏。有关更多信息,请参阅事务。
监听器容器工厂现在可以使用ContainerCustomizer
进行配置,以便在创建和配置每个容器后进一步配置它。有关更多信息,请参阅容器工厂。
错误处理程序更改
SeekToCurrentErrorHandler
现在将某些异常视为致命异常,并为这些异常禁用重试,在第一次失败时调用恢复器。
SeekToCurrentErrorHandler
和SeekToCurrentBatchErrorHandler
现在可以配置为在传递尝试之间应用BackOff
(线程休眠)。
从版本 2.3.2 开始,恢复的记录的偏移量将在错误处理程序在恢复失败的记录后返回时提交。
当与 ErrorHandlingDeserializer
结合使用时,DeadLetterPublishingRecoverer
现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。以前,它是 null
,用户代码需要从消息头中提取 DeserializationException
。有关更多信息,请参阅 发布死信记录。
TopicBuilder
提供了一个新的类 TopicBuilder
,用于更方便地创建用于自动主题配置的 NewTopic
@Bean
。有关更多信息,请参阅 [configuring-topics]。
Kafka Streams 更改
您现在可以对 @EnableKafkaStreams
创建的 StreamsBuilderFactoryBean
进行额外配置。有关更多信息,请参阅 Streams 配置。
现在提供了一个 RecoveringDeserializationExceptionHandler
,它允许恢复具有反序列化错误的记录。它可以与 DeadLetterPublishingRecoverer
结合使用,将这些记录发送到死信主题。有关更多信息,请参阅 从反序列化异常中恢复。
提供了 HeaderEnricher
变换器,使用 SpEL 生成头值。有关更多信息,请参阅 头增强器。
提供了 MessagingTransformer
。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。有关更多信息,请参阅 MessagingProcessor
以及 [从 KStream
调用 Spring Integration 流]。
JSON 组件更改
现在,所有支持 JSON 的组件默认情况下都使用由 JacksonUtils.enhancedObjectMapper()
生成的 Jackson ObjectMapper
进行配置。JsonDeserializer
现在提供基于 TypeReference
的构造函数,以更好地处理目标泛型容器类型。此外,还引入了 JacksonMimeTypeModule
用于将 org.springframework.util.MimeType
序列化为普通字符串。有关更多信息,请参阅其 JavaDocs 和 序列化、反序列化和消息转换。
我们提供了一个 ByteArrayJsonMessageConverter
,以及一个所有 Json 转换器的新的超类 JsonMessageConverter
。此外,现在可以使用 StringOrBytesSerializer
;它可以在 ProducerRecord
中序列化 byte[]
、Bytes
和 String
值。有关更多信息,请参见 Spring 消息传递消息转换。
JsonSerializer
、JsonDeserializer
和 JsonSerde
现在具有流畅的 API,使程序化配置更简单。有关更多信息,请参见 javadoc、序列化、反序列化和消息转换 以及 流 JSON 序列化和反序列化。
ReplyingKafkaTemplate
当回复超时时,future 将以 KafkaReplyTimeoutException
而不是 KafkaException
异常完成。
此外,现在提供了一个重载的 sendAndReceive
方法,允许在每个消息的基础上指定回复超时。
AggregatingReplyingKafkaTemplate
通过聚合来自多个接收者的回复来扩展 ReplyingKafkaTemplate
。有关更多信息,请参见 聚合多个回复。
事务更改
您现在可以在 KafkaTemplate
和 KafkaTransactionManager
上覆盖生产者工厂的 transactionIdPrefix
。有关更多信息,请参见 transactionIdPrefix
。
新的委托序列化器/反序列化器
框架现在提供了一个委托 Serializer
和 Deserializer
,利用标头来启用使用多个键/值类型生成和使用记录。有关更多信息,请参见 委托序列化器和反序列化器。
新的重试反序列化器
框架现在提供了一个委托 RetryingDeserializer
,用于在发生网络问题等瞬态错误时重试序列化。有关更多信息,请参见 重试反序列化器。
2.1 和 2.2 之间的更改
类和包更改
ContainerProperties
类已从 org.springframework.kafka.listener.config
移动到 org.springframework.kafka.listener
。
AckMode
枚举已从 AbstractMessageListenerContainer
移动到 ContainerProperties
。
setBatchErrorHandler()
和 setErrorHandler()
方法已从 ContainerProperties
移动到 AbstractMessageListenerContainer
和 AbstractKafkaListenerContainerFactory
。
回滚处理后
提供了一种新的 AfterRollbackProcessor
策略。有关更多信息,请参见 回滚后处理器。
ConcurrentKafkaListenerContainerFactory
更改
您现在可以使用 ConcurrentKafkaListenerContainerFactory
创建和配置任何 ConcurrentMessageListenerContainer
,而不仅仅是那些用于 @KafkaListener
注释的容器。有关更多信息,请参见 容器工厂。
监听器容器更改
添加了一个新的容器属性 (missingTopicsFatal
)。有关更多信息,请参见 使用 KafkaMessageListenerContainer
。
当消费者停止时,现在会发出 ConsumerStoppedEvent
。有关更多信息,请参见 线程安全。
批处理监听器可以选择接收完整的 ConsumerRecords<?, ?>
对象,而不是 List<ConsumerRecord<?, ?>
。有关更多信息,请参见 [batch-listeners]。
DefaultAfterRollbackProcessor
和 SeekToCurrentErrorHandler
现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后会这样做。可以将它们配置为将失败的记录发布到死信主题。
从 2.2.4 版本开始,在选择死信主题名称时可以使用消费者的组 ID。
已添加 ConsumerStoppingEvent
。有关更多信息,请参见 应用程序事件。
现在可以将 SeekToCurrentErrorHandler
配置为在容器配置为 AckMode.MANUAL_IMMEDIATE
时提交已恢复记录的偏移量(从 2.2.4 版本开始)。
@KafkaListener 变化
现在可以通过在注解上设置属性来覆盖监听器容器工厂的concurrency
和autoStartup
属性。现在可以添加配置来确定将哪些头(如果有)复制到回复消息中。有关更多信息,请参见@KafkaListener
注解。
现在可以在您自己的注解上使用@KafkaListener
作为元注解。有关更多信息,请参见@KafkaListener
作为元注解。
现在更容易为@Payload
验证配置Validator
。有关更多信息,请参见@KafkaListener
@Payload
验证。
现在可以在注解上直接指定 Kafka 消费者属性;这些属性将覆盖消费者工厂中定义的具有相同名称的任何属性(从版本 2.2.4 开始)。有关更多信息,请参见注解属性。
头映射变化
类型为MimeType
和MediaType
的头现在在RecordHeader
值中映射为简单字符串。以前,它们被映射为 JSON,并且只有MimeType
被解码。MediaType
无法解码。现在它们是简单的字符串,用于互操作性。
此外,DefaultKafkaHeaderMapper
有一个新的addToStringClasses
方法,允许通过使用toString()
而不是 JSON 来指定应该映射的类型。有关更多信息,请参见消息头。
嵌入式 Kafka 变化
KafkaEmbedded
类及其KafkaRule
接口已被弃用,取而代之的是EmbeddedKafkaBroker
及其 JUnit 4 EmbeddedKafkaRule
包装器。@EmbeddedKafka
注解现在填充EmbeddedKafkaBroker
bean 而不是已弃用的KafkaEmbedded
。此更改允许在 JUnit 5 测试中使用@EmbeddedKafka
。@EmbeddedKafka
注解现在具有属性ports
,用于指定填充EmbeddedKafkaBroker
的端口。有关更多信息,请参见测试应用程序。
JsonSerializer/Deserializer 增强功能
现在可以使用生产者和消费者属性提供类型映射信息。
反序列化器上提供了新的构造函数,允许使用提供的目标类型覆盖类型头信息。
JsonDeserializer
现在默认情况下会删除任何类型信息头。
现在可以使用 Kafka 属性配置 JsonDeserializer
以忽略类型信息头(自 2.2.3 版本起)。
有关更多信息,请参阅 序列化、反序列化和消息转换。
Kafka Streams 更改
流配置 Bean 现在必须是 KafkaStreamsConfiguration
对象,而不是 StreamsConfig
对象。
StreamsBuilderFactoryBean
已从 …core
包移动到 …config
包。
引入了 KafkaStreamBrancher
,以便在 KStream
实例之上构建条件分支时提供更好的最终用户体验。
有关更多信息,请参阅 Apache Kafka Streams 支持 和 配置。
事务 ID
当监听器容器启动事务时,transactional.id
现在是 transactionIdPrefix
附加 <group.id>.<topic>.<partition>
。此更改允许对僵尸进行适当的围栏,如本文所述。
2.0 和 2.1 之间的更改
JSON 改进
StringJsonMessageConverter
和 JsonSerializer
现在在 Headers
中添加类型信息,使转换器和 JsonDeserializer
能够根据消息本身而不是固定配置的类型在接收时创建特定类型。有关更多信息,请参阅 序列化、反序列化和消息转换。
容器停止错误处理程序
现在为记录和批处理监听器都提供了容器错误处理程序,这些处理程序将监听器抛出的任何异常视为致命错误/它们会停止容器。有关更多信息,请参阅 处理异常。
暂停和恢复容器
监听器容器现在具有 pause()
和 resume()
方法(自 2.1.3 版本起)。有关更多信息,请参阅 暂停和恢复监听器容器。
有状态重试
从 2.1.3 版本开始,您可以配置有状态重试。有关更多信息,请参阅 有状态重试。
客户端 ID
从 2.1.1 版本开始,您现在可以在 @KafkaListener
上设置 client.id
前缀。以前,要自定义客户端 ID,您需要为每个监听器使用单独的消费者工厂(和容器工厂)。该前缀后缀为 -n
,以便在您使用并发时提供唯一的客户端 ID。
记录偏移量提交
默认情况下,主题偏移量提交的记录使用 DEBUG
日志级别执行。从 2.1.2 版本开始,ContainerProperties
中的一个新属性 commitLogLevel
允许您指定这些消息的日志级别。有关更多信息,请参阅 使用 KafkaMessageListenerContainer
。
默认 @KafkaHandler
从 2.1.3 版本开始,您可以将类级别 @KafkaListener
上的 @KafkaHandler
注释之一指定为默认注释。有关更多信息,请参阅 类上的 @KafkaListener
。
ReplyingKafkaTemplate
从 2.1.3 版本开始,提供了一个 KafkaTemplate
的子类来支持请求/回复语义。有关更多信息,请参阅 使用 ReplyingKafkaTemplate
。
从 2.0 迁移指南
请参阅 2.0 到 2.1 迁移 指南。
1.3 和 2.0 之间的更改
@KafkaListener
更改
您现在可以使用 @SendTo
注释 @KafkaListener
方法(以及类和 @KafkaHandler
方法)。如果方法返回结果,则会将其转发到指定的主题。有关更多信息,请参阅 使用 @SendTo
转发监听器结果。
消息监听器
消息监听器现在可以感知到 Consumer
对象。有关更多信息,请参阅 [message-listeners]。
使用 ConsumerAwareRebalanceListener
重新平衡监听器现在可以在重新平衡通知期间访问 Consumer
对象。有关更多信息,请参阅 重新平衡监听器。
1.2 和 1.3 之间的变化
对事务的支持
0.11.0.0 客户端库添加了对事务的支持。KafkaTransactionManager
和其他对事务的支持已添加。有关更多信息,请参阅 事务。
对标头的支持
0.11.0.0 客户端库添加了对消息标头的支持。这些现在可以映射到 spring-messaging
MessageHeaders
以及从 spring-messaging
MessageHeaders
映射。有关更多信息,请参阅 消息标头。
对 Kafka 时间戳的支持
KafkaTemplate
现在支持一个 API 来添加带有时间戳的记录。已引入新的 KafkaHeaders
来支持 timestamp
。此外,还添加了新的 KafkaConditions.timestamp()
和 KafkaMatchers.hasTimestamp()
测试实用程序。有关更多详细信息,请参阅 使用 KafkaTemplate
、@KafkaListener
注解 和 测试应用程序。
@KafkaListener
更改
您现在可以配置 KafkaListenerErrorHandler
来处理异常。有关更多信息,请参阅 处理异常。
默认情况下,@KafkaListener
的 id
属性现在用作 group.id
属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置 groupId
。以前,您需要单独的容器工厂(和消费者工厂)才能为监听器使用不同的 group.id
值。要恢复使用工厂配置的 group.id
的先前行为,请将注解上的 idIsGroup
属性设置为 false
。
@EmbeddedKafka
注解
为了方便起见,提供了一个测试类级别的 @EmbeddedKafka
注解,用于将 KafkaEmbedded
注册为 bean。有关更多信息,请参阅 测试应用程序。
Kerberos 配置
现在提供对配置 Kerberos 的支持。有关更多信息,请参阅 JAAS 和 Kerberos。
1.0 和 1.1 之间的更改
查找
您现在可以查找每个主题或分区的当前位置。您可以使用它在初始化期间设置初始位置,此时组管理正在使用中,并且 Kafka 分配了分区。您也可以在检测到空闲容器时或在应用程序执行过程中的任意点进行查找。有关更多信息,请参阅 [查找]。