变更历史

3.1 相比 3.0 的新功能

本节介绍了从版本 3.0 到版本 3.1 的更改。有关早期版本的更改,请参阅 变更历史

Kafka 客户端版本

此版本需要 3.6.0 kafka-clients

EmbeddedKafkaBroker

现在提供了一个额外的实现来使用 Kraft 而不是 Zookeeper。有关更多信息,请参阅 嵌入式 Kafka 代理

JsonDeserializer

当反序列化异常发生时,SerializationException 消息不再包含 Can’t deserialize data [[123, 34, 98, 97, 122, …​ 形式的数据;每个数据字节的数值数组没有用处,对于大型数据来说可能会很冗长。当与 ErrorHandlingDeserializer 一起使用时,发送到错误处理程序的 DeserializationException 包含 data 属性,该属性包含无法反序列化的原始数据。当不与 ErrorHandlingDeserializer 一起使用时,KafkaConsumer 将持续为同一记录发出异常,显示主题/分区/偏移量以及 Jackson 引发的异常原因。

ContainerPostProcessor

可以通过在 @KafkaListener 注解上指定 ContainerPostProcessor 的 bean 名称,在监听器容器上应用后处理。这发生在容器创建之后,以及在容器工厂上配置的任何 ContainerCustomizer 配置之后。有关更多信息,请参见 容器工厂

ErrorHandlingDeserializer

您现在可以向此反序列化器添加 Validator;如果委托 Deserializer 成功反序列化对象,但该对象验证失败,则会抛出异常,类似于反序列化异常发生。这允许将原始原始数据传递给错误处理程序。有关更多信息,请参见 使用 ErrorHandlingDeserializer

可重试主题

@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 时,将后缀 -retry-5000 更改为 -retry。如果您想保留后缀 -retry-5000,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")。有关更多信息,请参见 主题命名

监听器容器更改

当手动分配分区时,使用 null 消费者 group.idAckMode 现在会自动强制转换为 MANUAL。有关更多信息,请参见 手动分配所有分区

3.0 中的新增功能(自 2.9 起)

Kafka 客户端版本

此版本需要 3.3.1 的 kafka-clients

精确一次语义

EOSMode.V1(也称为 ALPHA)不再受支持。

使用事务时,最小代理版本为 2.5。

有关更多信息,请参阅 精确一次语义KIP-447

观察

现在支持使用 Micrometer 为计时器和跟踪启用观察。有关更多信息,请参阅 观察

原生镜像

提供对创建原生镜像的支持。有关更多信息,请参阅 原生镜像

全局单个嵌入式 Kafka

嵌入式 Kafka (EmbeddedKafkaBroker) 现在可以作为整个测试计划的单个全局实例启动。有关更多信息,请参阅 对多个测试类使用相同的代理

可重试主题更改

此功能不再被视为实验性(就其 API 而言),该功能本身已从 2.7 版本开始支持,但 API 更改的可能性高于正常水平。

此版本中 非阻塞重试 基础设施 bean 的引导已更改,以避免在某些应用程序中发生的与应用程序初始化相关的某些计时问题。

您现在可以为重试容器设置不同的 concurrency;默认情况下,并发性与主容器相同。

@RetryableTopic 现在可以用作自定义注释的元注释,包括对 @AliasFor 属性的支持。

有关更多信息,请参阅 配置

重试主题的默认副本因子现在为 -1(使用代理默认值)。如果您的代理早于 2.4 版本,您现在需要显式设置该属性。

您现在可以在同一个应用程序上下文中,在同一个主题上配置多个 @RetryableTopic 监听器。以前,这是不可能的。有关更多信息,请参阅 多个监听器,相同主题

RetryTopicConfigurationSupport 中存在破坏性 API 更改;具体来说,如果您覆盖 destinationTopicResolverkafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定义方法;这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory> 参数。

监听器容器更改

容器现在发布与消费者身份验证和授权失败相关的事件。有关更多信息,请参见 应用程序事件

您现在可以自定义消费者线程使用的线程名称。有关更多信息,请参见 容器线程命名

添加了容器属性 restartAfterAuthException。有关更多信息,请参见 监听器容器属性

KafkaTemplate 更改

此类返回的期货现在是 CompletableFuture 而不是 ListenableFuture。有关更多信息,请参见 使用 KafkaTemplate

ReplyingKafkaTemplate 更改

此类返回的期货现在是 CompletableFuture 而不是 ListenableFuture。有关更多信息,请参见 使用 ReplyingKafkaTemplate使用 Message<?> 进行请求/回复

@KafkaListener 更改

您现在可以使用自定义关联标头,该标头将在任何回复消息中回显。有关更多信息,请参见 使用 ReplyingKafkaTemplate 结尾的说明。

您现在可以在整个批次处理完之前手动提交批次的一部分。有关更多信息,请参见 提交偏移量

KafkaHeaders 更改

KafkaHeaders 中的四个在 2.9.x 中已弃用的常量现已删除。

  • 请使用 KEY 代替 MESSAGE_KEY

  • 请使用 PARTITION 代替 PARTITION_ID

类似地,RECEIVED_MESSAGE_KEYRECEIVED_KEY 替换,RECEIVED_PARTITION_IDRECEIVED_PARTITION 替换。

测试更改

版本 3.0.7 引入了 MockConsumerFactoryMockProducerFactory。有关更多信息,请参见 模拟消费者和生产者

从版本 3.0.10 开始,嵌入式 Kafka 代理默认情况下将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式代理的地址。

2.9 相比 2.8 的新功能

Kafka 客户端版本

此版本需要 3.2.0 的 kafka-clients

错误处理程序更改

现在可以将 DefaultErrorHandler 配置为暂停容器一个轮询,并使用来自先前轮询的剩余结果,而不是寻求剩余记录的偏移量。有关更多信息,请参见 DefaultErrorHandler

DefaultErrorHandler 现在具有 BackOffHandler 属性。有关更多信息,请参见 后退处理程序

监听器容器更改

interceptBeforeTx 现在适用于所有事务管理器(以前仅在使用 KafkaAwareTransactionManager 时应用)。请参见 [interceptBeforeTx]

提供了一个新的容器属性 pauseImmediate,它允许容器在处理完当前记录后暂停消费者,而不是在处理完先前轮询的所有记录后暂停消费者。请参见 [pauseImmediate]

与消费者身份验证和授权相关的事件

标头映射器更改

您现在可以配置哪些入站标头应被映射。此功能在 2.8.8 或更高版本中也可用。有关更多信息,请参见 消息标头

KafkaTemplate 更改

在 3.0 中,此类返回的期货将是 CompletableFuture 而不是 ListenableFuture。有关使用此版本时过渡的帮助,请参见 使用 KafkaTemplate

ReplyingKafkaTemplate 更改

该模板现在提供了一种方法来等待回复容器上的分配,以避免在回复容器初始化之前发送请求时出现竞争条件。此功能在 2.8.8 或更高版本中也可用。请参见 使用 ReplyingKafkaTemplate

在 3.0 中,此类返回的期货将是 CompletableFuture 而不是 ListenableFuture。有关使用此版本时过渡的帮助,请参见 使用 ReplyingKafkaTemplate使用 Message<?> 进行请求/回复

2.8 相比 2.7 的新功能

本节介绍了从 2.7 版本到 2.8 版本的更改。有关早期版本的更改,请参见 更改历史记录

Kafka 客户端版本

此版本需要 3.0.0 kafka-clients

包变更

与类型映射相关的类和接口已从 …​support.converter 移动到 …​support.mapping

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

乱序手动提交

监听器容器现在可以配置为接受乱序(通常是异步)的手动偏移量提交。容器将推迟提交,直到确认缺少的偏移量。有关更多信息,请参见 手动提交偏移量

@KafkaListener 变更

现在可以在方法本身指定监听器方法是否为批处理监听器。这允许同一个容器工厂用于记录和批处理监听器。

有关更多信息,请参见 [批处理监听器]

批处理监听器现在可以处理转换异常。

有关更多信息,请参见 批处理错误处理程序的转换错误

RecordFilterStrategy 在与批处理监听器一起使用时,现在可以一次性过滤整个批次。有关更多信息,请参见 [批处理监听器] 结尾的说明。

@KafkaListener 注解现在具有 filter 属性,用于覆盖容器工厂的 RecordFilterStrategy,仅针对此监听器。

@KafkaListener 注解现在具有 info 属性;它用于填充新的监听器容器属性 listenerInfo。然后,它用于填充每个记录中的 KafkaHeaders.LISTENER_INFO 标头,该标头可以在 RecordInterceptorRecordFilterStrategy 或监听器本身中使用。有关更多信息,请参见 监听器信息标头抽象监听器容器属性

KafkaTemplate 变更

现在,您可以接收单个记录,前提是您知道主题、分区和偏移量。有关更多信息,请参见 使用 KafkaTemplate 接收

添加了 CommonErrorHandler

传统的GenericErrorHandler及其用于记录和批处理监听器的子接口层次结构已被新的单一接口CommonErrorHandler取代,该接口的实现对应于大多数GenericErrorHandler的传统实现。有关更多信息,请参见容器错误处理程序将自定义传统错误处理程序实现迁移到CommonErrorHandler

监听器容器更改

interceptBeforeTx容器属性现在默认情况下为true

authorizationExceptionRetryInterval属性已重命名为authExceptionRetryInterval,现在除了以前AuthorizationException之外,还适用于AuthenticationException。这两种异常都被视为致命异常,容器默认情况下将停止,除非设置此属性。

有关更多信息,请参见使用KafkaMessageListenerContainer监听器容器属性

序列化器/反序列化器更改

现在提供了DelegatingByTopicSerializerDelegatingByTopicDeserializer。有关更多信息,请参见委托序列化器和反序列化器

DeadLetterPublishingRecover更改

属性stripPreviousExceptionHeaders现在默认情况下为true

现在有几种技术可以自定义添加到输出记录中的标头。

有关更多信息,请参见管理死信记录标头

可重试主题更改

现在,您可以对可重试主题和不可重试主题使用相同的工厂。有关更多信息,请参见指定监听器容器工厂

现在有一个可管理的全局致命异常列表,这些异常将使失败的记录直接进入DLT。请参阅异常分类器,了解如何管理它。

现在,您可以结合使用阻塞和非阻塞重试。有关更多信息,请参见结合阻塞和非阻塞重试

使用可重试主题功能时抛出的 KafkaBackOffException 现在在 DEBUG 级别记录。如果您需要将日志级别更改回 WARN 或设置为任何其他级别,请参阅 更改 KafkaBackOffException 日志级别

2.6 和 2.7 之间的更改

Kafka 客户端版本

此版本需要 2.7.0 kafka-clients。它也与 2.8.0 客户端兼容,从 2.7.1 版本开始;请参阅 覆盖 Spring Boot 依赖项

使用主题的非阻塞延迟重试

此重大新功能在此版本中添加。当严格排序不重要时,失败的传递可以发送到另一个主题以供稍后使用。可以配置一系列这样的重试主题,并具有递增的延迟。有关更多信息,请参阅 非阻塞重试

监听器容器更改

onlyLogRecordMetadata 容器属性现在默认情况下为 true

现在可以使用新的容器属性 stopImmediate

有关更多信息,请参阅 监听器容器属性

在传递尝试之间使用 BackOff 的错误处理程序(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)现在将在容器停止后不久退出回退间隔,而不是延迟停止。

扩展 FailedRecordProcessor 的错误处理程序和回滚后处理器现在可以使用一个或多个 RetryListener 配置,以接收有关重试和恢复进度的信息。

RecordInterceptor 现在具有在监听器返回后(正常或通过抛出异常)调用的附加方法。它还有一个子接口 ConsumerAwareRecordInterceptor。此外,现在有一个 BatchInterceptor 用于批处理监听器。有关更多信息,请参阅 消息监听器容器

@KafkaListener 更改

您现在可以验证 @KafkaHandler 方法(类级监听器)的有效负载参数。有关更多信息,请参阅 @KafkaListener @Payload 验证

现在,您可以在 MessagingMessageConverterBatchMessagingMessageConverter 上设置 rawRecordHeader 属性,这会导致原始 ConsumerRecord 被添加到转换后的 Message<?> 中。例如,如果您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer,这将非常有用。有关更多信息,请参阅 监听器错误处理程序

您现在可以在应用程序初始化期间修改 @KafkaListener 注解。有关更多信息,请参阅 @KafkaListener 属性修改

DeadLetterPublishingRecover 更改

现在,如果键和值都无法反序列化,则原始值将发布到 DLT。以前,值已填充,但键 DeserializationException 仍然保留在标头中。如果您是继承了恢复器并覆盖了 createProducerRecord 方法,则存在一个破坏性 API 更改。

此外,恢复器会验证目标解析器选择的分区在发布之前是否实际存在。

有关更多信息,请参阅 发布死信记录

ChainedKafkaTransactionManager 已弃用

有关更多信息,请参阅 事务

ReplyingKafkaTemplate 更改

现在有一种机制可以检查回复,如果存在某些条件,则使 future 异常失败。

已添加对发送和接收 spring-messaging Message<?> 的支持。

有关更多信息,请参阅 使用 ReplyingKafkaTemplate

Kafka Streams 更改

默认情况下,StreamsBuilderFactoryBean 现在配置为不清理本地状态。有关更多信息,请参阅 配置

KafkaAdmin 更改

已添加新方法 createOrModifyTopicsdescribeTopics。已添加 KafkaAdmin.NewTopics 以便于在单个 bean 中配置多个主题。有关更多信息,请参阅 [configuring-topics]

MessageConverter 更改

现在可以将 spring-messaging SmartMessageConverter 添加到 MessagingMessageConverter 中,允许根据 contentType 标头进行内容协商。有关更多信息,请参见 Spring 消息消息转换

排序 @KafkaListener

有关更多信息,请参见 按顺序启动 @KafkaListener

ExponentialBackOffWithMaxRetries

提供了一个新的 BackOff 实现,使配置最大重试次数更加方便。有关更多信息,请参见 ExponentialBackOffWithMaxRetries 实现

条件委托错误处理程序

这些新的错误处理程序可以配置为根据异常类型委托给不同的错误处理程序。有关更多信息,请参见 委托错误处理程序

2.5 和 2.6 之间的更改

Kafka 客户端版本

此版本需要 2.6.0 kafka-clients

监听器容器更改

默认的 EOSMode 现在是 BETA。有关更多信息,请参见 精确一次语义

各种错误处理程序(扩展 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 现在在恢复失败时重置 BackOff。此外,您现在可以根据失败的记录和/或异常选择要使用的 BackOff

您现在可以在容器属性中配置 adviceChain。有关更多信息,请参见 监听器容器属性

当容器配置为发布 ListenerContainerIdleEvent 时,它现在会在接收记录后发布 ListenerContainerNoLongerIdleEvent,此前它发布了一个空闲事件。有关更多信息,请参见 应用程序事件检测空闲和无响应的消费者

@KafkaListener 更改

在使用手动分区分配时,您现在可以指定一个通配符来确定应将哪些分区重置为初始偏移量。此外,如果监听器实现了 ConsumerSeekAware,则在手动分配后会调用 onPartitionsAssigned()。(也在 2.5.5 版本中添加)。有关更多信息,请参见 显式分区分配

已向 AbstractConsumerSeekAware 添加了便利方法,以简化搜索操作。有关更多信息,请参见 [seek]

错误处理程序更改

FailedRecordProcessor 的子类(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)现在可以配置为,如果异常类型与之前针对此记录发生的异常类型不同,则重置重试状态。

生产者工厂更改

您现在可以为生产者设置最大生存时间,超过该时间后,生产者将被关闭并重新创建。有关更多信息,请参阅 事务

您现在可以在创建 DefaultKafkaProducerFactory 后更新配置映射。例如,如果您需要在凭据更改后更新 SSL 密钥/信任库位置,这将很有用。有关更多信息,请参阅 使用 DefaultKafkaProducerFactory

2.4 和 2.5 之间的更改

本节介绍从版本 2.4 到版本 2.5 的更改。有关早期版本的更改,请参阅 更改历史记录

消费者/生产者工厂更改

默认的消费者和生产者工厂现在可以在创建或关闭消费者或生产者时调用回调。提供了用于本机 Micrometer 指标的实现。有关更多信息,请参阅 工厂侦听器

您现在可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。有关更多信息,请参阅 连接到 Kafka

StreamsBuilderFactoryBean 更改

工厂 Bean 现在可以在创建或销毁 KafkaStreams 时调用回调。提供了用于本机 Micrometer 指标的实现。有关更多信息,请参阅 KafkaStreams Micrometer 支持

Kafka 客户端版本

此版本需要 2.5.0 kafka-clients

类/包更改

SeekUtils 已从 o.s.k.support 包移动到 o.s.k.listener

传递尝试标头

现在可以使用某些错误处理程序和回滚处理器后添加一个跟踪传递尝试的标头。有关更多信息,请参见 传递尝试标头

@KafkaListener 更改

如果 @KafkaListener 返回类型为 Message<?>,则现在将自动填充默认回复标头(如果需要)。有关更多信息,请参见 回复类型 Message<?>

当传入记录的键为 null 时,KafkaHeaders.RECEIVED_MESSAGE_KEY 将不再填充 null 值;该标头将完全省略。

@KafkaListener 方法现在可以指定 ConsumerRecordMetadata 参数,而不是使用用于元数据(如主题、分区等)的离散标头。有关更多信息,请参见 消费者记录元数据

监听器容器更改

assignmentCommitOption 容器属性现在默认情况下为 LATEST_ONLY_NO_TX。有关更多信息,请参见 监听器容器属性

使用事务时,subBatchPerPartition 容器属性现在默认情况下为 true。有关更多信息,请参见 事务

现在提供了一个新的 RecoveringBatchErrorHandler

现在支持静态组成员资格。有关更多信息,请参见 消息监听器容器

如果配置了增量/协作重新平衡,如果偏移量未能使用非致命 RebalanceInProgressException 提交,则容器将尝试重新提交重新平衡完成后仍分配给此实例的分区的偏移量。

默认错误处理程序现在是记录监听器的 SeekToCurrentErrorHandler 和批处理监听器的 RecoveringBatchErrorHandler。有关更多信息,请参见 容器错误处理程序

现在可以控制标准错误处理程序故意抛出的异常的日志记录级别。有关更多信息,请参见 容器错误处理程序

已添加 getAssignmentsByClientId() 方法,使确定并发容器中的哪些消费者分配了哪些分区变得更加容易。有关更多信息,请参见 监听器容器属性

现在可以抑制在错误、调试日志等中记录整个 ConsumerRecord。请参见 监听器容器属性 中的 onlyLogRecordMetadata

KafkaTemplate 更改

KafkaTemplate 现在可以维护 micrometer 计时器。有关更多信息,请参见 监控

现在可以通过 ProducerConfig 属性配置 KafkaTemplate 来覆盖生产者工厂中的属性。有关更多信息,请参见 使用 KafkaTemplate

现在提供了一个 RoutingKafkaTemplate。有关更多信息,请参见 使用 RoutingKafkaTemplate

现在可以使用 KafkaSendCallback 代替 ListenerFutureCallback 来获取更窄的异常,从而更容易提取失败的 ProducerRecord。有关更多信息,请参见 使用 KafkaTemplate

Kafka 字符串序列化器/反序列化器

现在提供了新的 ToStringSerializer/StringDeserializer 以及相关的 SerDe。有关更多信息,请参见 字符串序列化

JsonDeserializer

JsonDeserializer 现在具有更大的灵活性来确定反序列化类型。有关更多信息,请参见 使用方法确定类型

委托序列化器/反序列化器

DelegatingSerializer 现在可以处理“标准”类型,当出站记录没有头时。有关更多信息,请参见 委托序列化器和反序列化器

测试更改

KafkaTestUtils.consumerProps() 帮助程序记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。有关更多信息,请参见 JUnit

2.3 和 2.4 之间的更改

Kafka 客户端版本

此版本需要 2.4.0 kafka-clients 或更高版本,并支持新的增量重新平衡功能。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 一样,此接口现在还有一个额外的 onPartitionsLost 方法。有关更多信息,请参阅 Apache Kafka 文档。

ConsumerRebalanceListener 不同,默认实现不会调用 onPartitionsRevoked。相反,监听器容器将在调用 onPartitionsLost 后调用该方法;因此,在实现 ConsumerAwareRebalanceListener 时,您不应该这样做。

有关更多信息,请参见 重新平衡监听器 末尾的“重要”说明。

GenericErrorHandler

isAckAfterHandle() 的默认实现现在默认返回 true。

KafkaTemplate

KafkaTemplate 现在支持非事务性发布以及事务性发布。有关更多信息,请参阅 KafkaTemplate 事务性和非事务性发布

AggregatingReplyingKafkaTemplate

releaseStrategy 现在是一个 BiConsumer。它现在在超时后(以及记录到达时)被调用;在超时后调用时,第二个参数为 true

有关更多信息,请参阅 聚合多个回复

监听器容器

ContainerProperties 提供了一个 authorizationExceptionRetryInterval 选项,允许监听器容器在 KafkaConsumer 抛出任何 AuthorizationException 后重试。有关更多信息,请参阅其 JavaDocs 和 使用 KafkaMessageListenerContainer

@KafkaListener

@KafkaListener 注解有一个新的属性 splitIterables;默认值为 true。当回复监听器返回一个 Iterable 时,此属性控制是否将返回结果作为单个记录发送,或者为每个元素发送一个记录。有关更多信息,请参阅 使用 @SendTo 转发监听器结果

批处理监听器现在可以使用 BatchToRecordAdapter 进行配置;例如,这允许在事务中处理批处理,而监听器一次获取一条记录。使用默认实现,ConsumerRecordRecoverer 可用于处理批处理中的错误,而不会停止整个批处理的处理 - 这在使用事务时可能很有用。有关更多信息,请参阅 批处理监听器的事务

Kafka Streams

StreamsBuilderFactoryBean 接受一个新的属性 KafkaStreamsInfrastructureCustomizer。这允许在创建流之前配置构建器和/或拓扑。有关更多信息,请参阅 Spring 管理

2.2 和 2.3 之间的更改

本节介绍从版本 2.2 到版本 2.3 的更改。

提示、技巧和示例

添加了一个新章节 提示、技巧和示例。请提交 GitHub 问题和/或拉取请求以在该章节中添加更多条目。

Kafka 客户端版本

此版本需要 2.3.0 kafka-clients 或更高版本。

类/包更改

TopicPartitionInitialOffset 已被弃用,建议使用 TopicPartitionOffset

配置变更

从 2.3.4 版本开始,missingTopicsFatal 容器属性默认值为 false。当该属性为 true 时,如果 Broker 宕机,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到启动没有活动 Broker 的应用程序会是一个常见用例。

生产者和消费者工厂变更

DefaultKafkaProducerFactory 现在可以配置为每个线程创建一个生产者。您也可以在构造函数中提供 Supplier<Serializer> 实例,作为配置类(需要无参数构造函数)或使用 Serializer 实例构造(然后在所有生产者之间共享)的替代方案。有关更多信息,请参阅 使用 DefaultKafkaProducerFactory

DefaultKafkaConsumerFactory 中也提供了 Supplier<Deserializer> 实例的相同选项。有关更多信息,请参阅 使用 KafkaMessageListenerContainer

监听器容器变更

以前,当使用监听器适配器(例如 @KafkaListener)调用监听器时,错误处理程序会收到 ListenerExecutionFailedException(实际监听器异常作为 cause)。由原生 GenericMessageListener 抛出的异常会原封不动地传递给错误处理程序。现在,ListenerExecutionFailedException 始终是参数(实际监听器异常作为 cause),它提供了对容器的 group.id 属性的访问。

由于监听器容器有自己的提交偏移量机制,它更喜欢 Kafka 的 ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse。现在,它会自动将其设置为 false,除非在消费者工厂或容器的消费者属性覆盖中明确设置。

ackOnError 属性现在默认值为 false

现在可以在监听器方法中获取消费者的 group.id 属性。有关更多信息,请参阅 获取消费者 group.id

容器添加了一个新的属性recordInterceptor,允许在调用监听器之前检查或修改记录。如果需要调用多个拦截器,还可以使用CompositeRecordInterceptor。有关更多信息,请参阅消息监听器容器

ConsumerSeekAware添加了新的方法,允许您相对于开头、结尾或当前位置执行查找,以及查找大于或等于时间戳的第一个偏移量。有关更多信息,请参阅[seek]

现在提供了一个便利类AbstractConsumerSeekAware来简化查找。有关更多信息,请参阅[seek]

ContainerProperties提供了一个idleBetweenPolls选项,允许监听器容器中的主循环在KafkaConsumer.poll()调用之间休眠。有关更多信息,请参阅其 JavaDocs 和使用KafkaMessageListenerContainer

当使用AckMode.MANUAL(或MANUAL_IMMEDIATE)时,您现在可以通过在Acknowledgment上调用nack来导致重新传递。有关更多信息,请参阅提交偏移量

现在可以使用 Micrometer Timer 来监控监听器性能。有关更多信息,请参阅监控

容器现在发布了与启动相关的其他消费者生命周期事件。有关更多信息,请参阅应用程序事件

事务批处理监听器现在可以支持僵尸围栏。有关更多信息,请参阅事务

监听器容器工厂现在可以使用ContainerCustomizer进行配置,以便在创建和配置每个容器后进一步配置它。有关更多信息,请参阅容器工厂

错误处理程序更改

SeekToCurrentErrorHandler现在将某些异常视为致命异常,并为这些异常禁用重试,在第一次失败时调用恢复器。

SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler现在可以配置为在传递尝试之间应用BackOff(线程休眠)。

从版本 2.3.2 开始,恢复的记录的偏移量将在错误处理程序在恢复失败的记录后返回时提交。

当与 ErrorHandlingDeserializer 结合使用时,DeadLetterPublishingRecoverer 现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。以前,它是 null,用户代码需要从消息头中提取 DeserializationException。有关更多信息,请参阅 发布死信记录

TopicBuilder

提供了一个新的类 TopicBuilder,用于更方便地创建用于自动主题配置的 NewTopic @Bean。有关更多信息,请参阅 [configuring-topics]

Kafka Streams 更改

您现在可以对 @EnableKafkaStreams 创建的 StreamsBuilderFactoryBean 进行额外配置。有关更多信息,请参阅 Streams 配置

现在提供了一个 RecoveringDeserializationExceptionHandler,它允许恢复具有反序列化错误的记录。它可以与 DeadLetterPublishingRecoverer 结合使用,将这些记录发送到死信主题。有关更多信息,请参阅 从反序列化异常中恢复

提供了 HeaderEnricher 变换器,使用 SpEL 生成头值。有关更多信息,请参阅 头增强器

提供了 MessagingTransformer。这允许 Kafka Streams 拓扑与 Spring Messaging 组件(例如 Spring Integration 流)进行交互。有关更多信息,请参阅 MessagingProcessor 以及 [从 KStream 调用 Spring Integration 流]。

JSON 组件更改

现在,所有支持 JSON 的组件默认情况下都使用由 JacksonUtils.enhancedObjectMapper() 生成的 Jackson ObjectMapper 进行配置。JsonDeserializer 现在提供基于 TypeReference 的构造函数,以更好地处理目标泛型容器类型。此外,还引入了 JacksonMimeTypeModule 用于将 org.springframework.util.MimeType 序列化为普通字符串。有关更多信息,请参阅其 JavaDocs 和 序列化、反序列化和消息转换

我们提供了一个 ByteArrayJsonMessageConverter,以及一个所有 Json 转换器的新的超类 JsonMessageConverter。此外,现在可以使用 StringOrBytesSerializer;它可以在 ProducerRecord 中序列化 byte[]BytesString 值。有关更多信息,请参见 Spring 消息传递消息转换

JsonSerializerJsonDeserializerJsonSerde 现在具有流畅的 API,使程序化配置更简单。有关更多信息,请参见 javadoc、序列化、反序列化和消息转换 以及 流 JSON 序列化和反序列化

ReplyingKafkaTemplate

当回复超时时,future 将以 KafkaReplyTimeoutException 而不是 KafkaException 异常完成。

此外,现在提供了一个重载的 sendAndReceive 方法,允许在每个消息的基础上指定回复超时。

AggregatingReplyingKafkaTemplate

通过聚合来自多个接收者的回复来扩展 ReplyingKafkaTemplate。有关更多信息,请参见 聚合多个回复

事务更改

您现在可以在 KafkaTemplateKafkaTransactionManager 上覆盖生产者工厂的 transactionIdPrefix。有关更多信息,请参见 transactionIdPrefix

新的委托序列化器/反序列化器

框架现在提供了一个委托 SerializerDeserializer,利用标头来启用使用多个键/值类型生成和使用记录。有关更多信息,请参见 委托序列化器和反序列化器

新的重试反序列化器

框架现在提供了一个委托 RetryingDeserializer,用于在发生网络问题等瞬态错误时重试序列化。有关更多信息,请参见 重试反序列化器

2.1 和 2.2 之间的更改

Kafka 客户端版本

此版本需要 2.0.0 kafka-clients 或更高版本。

类和包更改

ContainerProperties 类已从 org.springframework.kafka.listener.config 移动到 org.springframework.kafka.listener

AckMode 枚举已从 AbstractMessageListenerContainer 移动到 ContainerProperties

setBatchErrorHandler()setErrorHandler() 方法已从 ContainerProperties 移动到 AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory

回滚处理后

提供了一种新的 AfterRollbackProcessor 策略。有关更多信息,请参见 回滚后处理器

ConcurrentKafkaListenerContainerFactory 更改

您现在可以使用 ConcurrentKafkaListenerContainerFactory 创建和配置任何 ConcurrentMessageListenerContainer,而不仅仅是那些用于 @KafkaListener 注释的容器。有关更多信息,请参见 容器工厂

监听器容器更改

添加了一个新的容器属性 (missingTopicsFatal)。有关更多信息,请参见 使用 KafkaMessageListenerContainer

当消费者停止时,现在会发出 ConsumerStoppedEvent。有关更多信息,请参见 线程安全

批处理监听器可以选择接收完整的 ConsumerRecords<?, ?> 对象,而不是 List<ConsumerRecord<?, ?>。有关更多信息,请参见 [batch-listeners]

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler 现在可以恢复(跳过)不断失败的记录,默认情况下,在 10 次失败后会这样做。可以将它们配置为将失败的记录发布到死信主题。

从 2.2.4 版本开始,在选择死信主题名称时可以使用消费者的组 ID。

已添加 ConsumerStoppingEvent。有关更多信息,请参见 应用程序事件

现在可以将 SeekToCurrentErrorHandler 配置为在容器配置为 AckMode.MANUAL_IMMEDIATE 时提交已恢复记录的偏移量(从 2.2.4 版本开始)。

@KafkaListener 变化

现在可以通过在注解上设置属性来覆盖监听器容器工厂的concurrencyautoStartup属性。现在可以添加配置来确定将哪些头(如果有)复制到回复消息中。有关更多信息,请参见@KafkaListener 注解

现在可以在您自己的注解上使用@KafkaListener作为元注解。有关更多信息,请参见@KafkaListener 作为元注解

现在更容易为@Payload验证配置Validator。有关更多信息,请参见@KafkaListener @Payload 验证

现在可以在注解上直接指定 Kafka 消费者属性;这些属性将覆盖消费者工厂中定义的具有相同名称的任何属性(从版本 2.2.4 开始)。有关更多信息,请参见注解属性

头映射变化

类型为MimeTypeMediaType的头现在在RecordHeader值中映射为简单字符串。以前,它们被映射为 JSON,并且只有MimeType被解码。MediaType无法解码。现在它们是简单的字符串,用于互操作性。

此外,DefaultKafkaHeaderMapper有一个新的addToStringClasses方法,允许通过使用toString()而不是 JSON 来指定应该映射的类型。有关更多信息,请参见消息头

嵌入式 Kafka 变化

KafkaEmbedded类及其KafkaRule接口已被弃用,取而代之的是EmbeddedKafkaBroker及其 JUnit 4 EmbeddedKafkaRule包装器。@EmbeddedKafka注解现在填充EmbeddedKafkaBroker bean 而不是已弃用的KafkaEmbedded。此更改允许在 JUnit 5 测试中使用@EmbeddedKafka@EmbeddedKafka注解现在具有属性ports,用于指定填充EmbeddedKafkaBroker的端口。有关更多信息,请参见测试应用程序

JsonSerializer/Deserializer 增强功能

现在可以使用生产者和消费者属性提供类型映射信息。

反序列化器上提供了新的构造函数,允许使用提供的目标类型覆盖类型头信息。

JsonDeserializer 现在默认情况下会删除任何类型信息头。

现在可以使用 Kafka 属性配置 JsonDeserializer 以忽略类型信息头(自 2.2.3 版本起)。

有关更多信息,请参阅 序列化、反序列化和消息转换

Kafka Streams 更改

流配置 Bean 现在必须是 KafkaStreamsConfiguration 对象,而不是 StreamsConfig 对象。

StreamsBuilderFactoryBean 已从 …​core 包移动到 …​config 包。

引入了 KafkaStreamBrancher,以便在 KStream 实例之上构建条件分支时提供更好的最终用户体验。

有关更多信息,请参阅 Apache Kafka Streams 支持配置

事务 ID

当监听器容器启动事务时,transactional.id 现在是 transactionIdPrefix 附加 <group.id>.<topic>.<partition>。此更改允许对僵尸进行适当的围栏,如本文所述

2.0 和 2.1 之间的更改

Kafka 客户端版本

此版本需要 1.0.0 kafka-clients 或更高版本。

1.1.x 客户端在 2.2 版本中得到原生支持。

JSON 改进

StringJsonMessageConverterJsonSerializer 现在在 Headers 中添加类型信息,使转换器和 JsonDeserializer 能够根据消息本身而不是固定配置的类型在接收时创建特定类型。有关更多信息,请参阅 序列化、反序列化和消息转换

容器停止错误处理程序

现在为记录和批处理监听器都提供了容器错误处理程序,这些处理程序将监听器抛出的任何异常视为致命错误/它们会停止容器。有关更多信息,请参阅 处理异常

暂停和恢复容器

监听器容器现在具有 pause()resume() 方法(自 2.1.3 版本起)。有关更多信息,请参阅 暂停和恢复监听器容器

有状态重试

从 2.1.3 版本开始,您可以配置有状态重试。有关更多信息,请参阅 有状态重试

客户端 ID

从 2.1.1 版本开始,您现在可以在 @KafkaListener 上设置 client.id 前缀。以前,要自定义客户端 ID,您需要为每个监听器使用单独的消费者工厂(和容器工厂)。该前缀后缀为 -n,以便在您使用并发时提供唯一的客户端 ID。

记录偏移量提交

默认情况下,主题偏移量提交的记录使用 DEBUG 日志级别执行。从 2.1.2 版本开始,ContainerProperties 中的一个新属性 commitLogLevel 允许您指定这些消息的日志级别。有关更多信息,请参阅 使用 KafkaMessageListenerContainer

默认 @KafkaHandler

从 2.1.3 版本开始,您可以将类级别 @KafkaListener 上的 @KafkaHandler 注释之一指定为默认注释。有关更多信息,请参阅 类上的 @KafkaListener

ReplyingKafkaTemplate

从 2.1.3 版本开始,提供了一个 KafkaTemplate 的子类来支持请求/回复语义。有关更多信息,请参阅 使用 ReplyingKafkaTemplate

ChainedKafkaTransactionManager

2.1.3 版本引入了 ChainedKafkaTransactionManager。(它现在已弃用)。

从 2.0 迁移指南

请参阅 2.0 到 2.1 迁移 指南。

1.3 和 2.0 之间的更改

Spring 框架和 Java 版本

Spring for Apache Kafka 项目现在需要 Spring 框架 5.0 和 Java 8。

@KafkaListener 更改

您现在可以使用 @SendTo 注释 @KafkaListener 方法(以及类和 @KafkaHandler 方法)。如果方法返回结果,则会将其转发到指定的主题。有关更多信息,请参阅 使用 @SendTo 转发监听器结果

消息监听器

消息监听器现在可以感知到 Consumer 对象。有关更多信息,请参阅 [message-listeners]

使用 ConsumerAwareRebalanceListener

重新平衡监听器现在可以在重新平衡通知期间访问 Consumer 对象。有关更多信息,请参阅 重新平衡监听器

1.2 和 1.3 之间的变化

对事务的支持

0.11.0.0 客户端库添加了对事务的支持。KafkaTransactionManager 和其他对事务的支持已添加。有关更多信息,请参阅 事务

对标头的支持

0.11.0.0 客户端库添加了对消息标头的支持。这些现在可以映射到 spring-messaging MessageHeaders 以及从 spring-messaging MessageHeaders 映射。有关更多信息,请参阅 消息标头

创建主题

0.11.0.0 客户端库提供了一个 AdminClient,您可以使用它来创建主题。KafkaAdmin 使用此客户端自动添加定义为 @Bean 实例的主题。

对 Kafka 时间戳的支持

KafkaTemplate 现在支持一个 API 来添加带有时间戳的记录。已引入新的 KafkaHeaders 来支持 timestamp。此外,还添加了新的 KafkaConditions.timestamp()KafkaMatchers.hasTimestamp() 测试实用程序。有关更多详细信息,请参阅 使用 KafkaTemplate@KafkaListener 注解测试应用程序

@KafkaListener 更改

您现在可以配置 KafkaListenerErrorHandler 来处理异常。有关更多信息,请参阅 处理异常

默认情况下,@KafkaListenerid 属性现在用作 group.id 属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置 groupId。以前,您需要单独的容器工厂(和消费者工厂)才能为监听器使用不同的 group.id 值。要恢复使用工厂配置的 group.id 的先前行为,请将注解上的 idIsGroup 属性设置为 false

@EmbeddedKafka 注解

为了方便起见,提供了一个测试类级别的 @EmbeddedKafka 注解,用于将 KafkaEmbedded 注册为 bean。有关更多信息,请参阅 测试应用程序

Kerberos 配置

现在提供对配置 Kerberos 的支持。有关更多信息,请参阅 JAAS 和 Kerberos

1.1 和 1.2 之间的更改

此版本使用 0.10.2.x 客户端。

1.0 和 1.1 之间的更改

Kafka 客户端

此版本使用 Apache Kafka 0.10.x.x 客户端。

批处理监听器

可以将监听器配置为接收 consumer.poll() 操作返回的整个消息批次,而不是一次接收一条消息。

空有效负载

空有效负载用于在使用日志压缩时“删除”键。

初始偏移量

在显式分配分区时,您现在可以配置相对于消费者组的当前位置的初始偏移量,而不是相对于当前结束位置的绝对偏移量或相对偏移量。

查找

您现在可以查找每个主题或分区的当前位置。您可以使用它在初始化期间设置初始位置,此时组管理正在使用中,并且 Kafka 分配了分区。您也可以在检测到空闲容器时或在应用程序执行过程中的任意点进行查找。有关更多信息,请参阅 [查找]