变更历史

3.1 相较于 3.0 的新增功能

本节介绍了从 3.0 版到 3.1 版所做的更改。有关早期版本的更改,请参阅 变更历史

Kafka 客户端版本

此版本需要 3.6.0 kafka-clients

EmbeddedKafkaBroker

现在提供了额外的实现来使用 Kraft 而不是 Zookeeper。有关更多信息,请参阅 嵌入式 Kafka 代理

JsonDeserializer

当发生反序列化异常时,SerializationException 消息不再包含 Can’t deserialize data [[123, 34, 98, 97, 122, …​ 形式的数据;每个数据字节的数值数组并没有用,对于大型数据来说可能会很冗长。当与 ErrorHandlingDeserializer 一起使用时,发送到错误处理程序的 DeserializationException 包含 data 属性,该属性包含无法反序列化的原始数据。当不与 ErrorHandlingDeserializer 一起使用时,KafkaConsumer 将持续为同一记录发出异常,显示主题/分区/偏移量以及 Jackson 引发的异常原因。

ContainerPostProcessor

可以通过在 @KafkaListener 注解上指定 ContainerPostProcessor 的 Bean 名称,对监听器容器应用后处理。这发生在容器创建之后以及在容器工厂上配置的任何已配置的 ContainerCustomizer 之后。有关更多信息,请参阅 容器工厂

ErrorHandlingDeserializer

您现在可以向此反序列化程序添加 Validator;如果委托 Deserializer 成功地反序列化了对象,但该对象验证失败,则会抛出一个类似于反序列化异常的异常。这允许将原始原始数据传递到错误处理程序。有关更多信息,请参阅 使用 ErrorHandlingDeserializer

可重试主题

@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) 时,将后缀 -retry-5000 更改为 -retry。如果要保留后缀 -retry-5000,请使用 @RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")。有关更多信息,请参阅 主题命名

监听器容器更改

当手动分配分区时,使用 null 消费者 group.idAckMode 现在会自动强制转换为 MANUAL。有关更多信息,请参阅 手动分配所有分区

3.0 相较于 2.9 的新增功能

Kafka 客户端版本

此版本需要 3.3.1 kafka-clients

精确一次语义

不再支持 EOSMode.V1(又名 ALPHA)。

使用事务时,最低代理版本为 2.5。

有关更多信息,请参阅 精确一次语义KIP-447

观测

现在支持使用 Micrometer 启用计时器和跟踪的观测。有关更多信息,请参阅 观测

原生镜像

提供了对创建原生镜像的支持。有关更多信息,请参阅 原生镜像

全局单个嵌入式 Kafka

嵌入式 Kafka(EmbeddedKafkaBroker)现在可以作为整个测试计划的单个全局实例启动。有关更多信息,请参阅 对多个测试类使用相同的代理

可重试主题更改

此功能不再被视为实验性的(就其 API 而言),此功能本身已从 2.7 版开始支持,但 API 更改的可能性高于正常水平。

在此版本中,非阻塞重试 基础设施 Bean 的引导已更改,以避免某些应用程序在应用程序初始化方面发生的一些时间问题。

您现在可以为重试容器设置不同的 并发性;默认情况下,并发性与主容器相同。

@RetryableTopic 现在可以用作自定义注释上的元注释,包括对 @AliasFor 属性的支持。

有关更多信息,请参阅 配置

重试主题的默认复制因子现在为 -1(使用代理默认值)。如果您的代理早于 2.4 版,则现在需要显式设置该属性。

您现在可以在同一应用程序上下文中同一主题上配置多个 @RetryableTopic 监听器。以前,这是不可能的。有关更多信息,请参阅 多个监听器,相同主题

RetryTopicConfigurationSupport 中存在重大 API 更改;具体来说,如果您覆盖了 destinationTopicResolverkafkaConsumerBackoffManager 和/或 retryTopicConfigurer 的 bean 定义方法;这些方法现在需要一个 ObjectProvider<RetryTopicComponentFactory> 参数。

监听器容器更改

容器现在发布与消费者身份验证和授权失败相关的事件。有关更多信息,请参见 应用程序事件

您现在可以自定义消费者线程使用的线程名称。有关更多信息,请参见 容器线程命名

添加了容器属性 restartAfterAuthException。有关更多信息,请参见 监听器容器属性

KafkaTemplate 更改

此类返回的 Future 现在是 CompletableFuture 而不是 ListenableFuture。请参见 使用 KafkaTemplate

ReplyingKafkaTemplate 更改

此类返回的 Future 现在是 CompletableFuture 而不是 ListenableFuture。请参见 使用 ReplyingKafkaTemplate使用 Message<?> 进行请求/回复

@KafkaListener 更改

您现在可以使用自定义关联标头,该标头将在任何回复消息中进行回显。有关更多信息,请参见 使用 ReplyingKafkaTemplate 末尾的注释。

您现在可以手动提交批处理的一部分,然后再处理整个批处理。有关更多信息,请参见 提交偏移量

KafkaHeaders 更改

KafkaHeaders 中四个在 2.9.x 中已弃用的常量现已删除。

  • 请使用 KEY 代替 MESSAGE_KEY

  • 请使用 PARTITION 代替 PARTITION_ID

类似地,RECEIVED_MESSAGE_KEYRECEIVED_KEY 替换,RECEIVED_PARTITION_IDRECEIVED_PARTITION 替换。

测试更改

3.0.7 版本引入了 MockConsumerFactoryMockProducerFactory。有关更多信息,请参见 模拟消费者和生产者

从 3.0.10 版本开始,嵌入式 Kafka 代理默认情况下会将 Spring Boot 属性 spring.kafka.bootstrap-servers 设置为嵌入式代理的地址。

2.9 相比 2.8 的新增功能

Kafka 客户端版本

此版本需要 3.2.0 kafka-clients

错误处理程序更改

DefaultErrorHandler 现在可以配置为暂停容器一个轮询并使用上一个轮询的剩余结果,而不是跳转到剩余记录的偏移量。有关更多信息,请参见 DefaultErrorHandler

DefaultErrorHandler 现在具有 BackOffHandler 属性。有关更多信息,请参见 回退处理程序

监听器容器更改

interceptBeforeTx 现在适用于所有事务管理器(以前仅在使用 KafkaAwareTransactionManager 时应用)。请参见 [interceptBeforeTx]

提供了一个新的容器属性 pauseImmediate,它允许容器在处理完当前记录后暂停消费者,而不是在处理完上一个轮询的所有记录后暂停。请参见 [pauseImmediate]

与消费者身份验证和授权相关的事件

标头映射器更改

您现在可以配置要映射哪些入站标头。在 2.8.8 或更高版本中也可用。有关更多信息,请参见 消息标头

KafkaTemplate 更改

在 3.0 中,此类返回的 Future 将是 CompletableFuture 而不是 ListenableFuture。请参见 使用 KafkaTemplate 以在使用此版本时进行过渡。

ReplyingKafkaTemplate 更改

模板现在提供了一种方法来等待回复容器上的分配,以避免在回复容器初始化之前发送请求时出现竞争。在 2.8.8 或更高版本中也可用。请参见 使用 ReplyingKafkaTemplate

在 3.0 中,此类返回的 Future 将是 CompletableFuture 而不是 ListenableFuture。请参见 使用 ReplyingKafkaTemplate使用 Message<?> 进行请求/回复 以在使用此版本时进行过渡。

2.8 相比 2.7 的新增功能

本节介绍从 2.7 版到 2.8 版所做的更改。有关早期版本的更改,请参见 更改历史记录

Kafka 客户端版本

此版本需要 3.0.0 kafka-clients

包更改

与类型映射相关的类和接口已从 …​support.converter 移动到 …​support.mapping

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

无序手动提交

监听器容器现在可以配置为接受无序(通常是异步)的手动偏移量提交。容器将推迟提交,直到确认缺少的偏移量。有关更多信息,请参见 手动提交偏移量

@KafkaListener 更改

现在可以在方法本身指定监听器方法是否为批处理监听器。这允许对记录和批处理监听器都使用相同的容器工厂。

有关更多信息,请参见 [batch-listeners]

批处理监听器现在可以处理转换异常。

有关更多信息,请参见 批处理错误处理程序的转换错误

RecordFilterStrategy 在与批处理监听器一起使用时,现在可以在一次调用中过滤整个批处理。有关更多信息,请参见 [batch-listeners] 末尾的注释。

@KafkaListener 注释现在具有 filter 属性,用于覆盖仅此监听器的容器工厂的 RecordFilterStrategy

@KafkaListener 注释现在具有 info 属性;它用于填充新的监听器容器属性 listenerInfo。然后,它用于在每个记录中填充 KafkaHeaders.LISTENER_INFO 标头,该标头可以在 RecordInterceptorRecordFilterStrategy 或监听器本身中使用。有关更多信息,请参见 监听器信息标头抽象监听器容器属性

KafkaTemplate 更改

您现在可以接收单个记录,前提是给定主题、分区和偏移量。有关更多信息,请参见 使用 KafkaTemplate 接收

添加 CommonErrorHandler

旧的 GenericErrorHandler 及其用于记录和批处理监听器的子接口层次结构已被新的单个接口 CommonErrorHandler 替换,该接口的实现对应于 GenericErrorHandler 的大多数旧实现。有关更多信息,请参见 容器错误处理程序将自定义旧错误处理程序实现迁移到 CommonErrorHandler

监听器容器更改

interceptBeforeTx 容器属性现在默认为 true

authorizationExceptionRetryInterval 属性已重命名为 authExceptionRetryInterval,现在除了以前适用的 AuthorizationException 之外,还适用于 AuthenticationException。这两个异常都被视为致命异常,默认情况下容器将停止,除非设置此属性。

有关更多信息,请参见 使用 KafkaMessageListenerContainer监听器容器属性

序列化程序/反序列化程序更改

现在提供了 DelegatingByTopicSerializerDelegatingByTopicDeserializer。有关更多信息,请参见 委托序列化程序和反序列化程序

DeadLetterPublishingRecover 更改

属性 stripPreviousExceptionHeaders 现在默认为 true

现在有几种技术可以自定义添加到输出记录中的标头。

有关更多信息,请参见 管理死信记录标头

可重试主题更改

现在,您可以对可重试主题和不可重试主题使用相同的工厂。有关更多信息,请参见 指定 ListenerContainerFactory

现在有一个可管理的全局致命异常列表,这些异常将使失败的记录直接进入 DLT。请参阅 异常分类器 以了解如何管理它。

您现在可以结合使用阻塞和非阻塞重试。有关更多信息,请参见 组合阻塞和非阻塞重试

使用可重试主题功能时抛出的 KafkaBackOffException 现在记录在 DEBUG 级别。如果您需要将日志级别更改回 WARN 或将其设置为任何其他级别,请参见 更改 KafkaBackOffException 日志级别

2.6 和 2.7 之间的更改

Kafka 客户端版本

此版本需要 2.7.0 版的 kafka-clients。从 2.7.1 版开始,它也兼容 2.8.0 版的客户端;请参阅 覆盖 Spring Boot 依赖项

使用主题的非阻塞延迟重试

此版本新增了此重要功能。当严格排序不重要时,失败的传递可以发送到另一个主题,以便稍后使用。可以配置一系列此类重试主题,并设置递增的延迟。有关更多信息,请参阅 非阻塞重试

侦听器容器更改

onlyLogRecordMetadata 容器属性现在默认为 true

现在可以使用新的容器属性 stopImmediate

有关更多信息,请参阅 侦听器容器属性

在传递尝试之间使用 BackOff 的错误处理程序(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessor)现在将在容器停止后立即退出回退间隔,而不是延迟停止。

扩展 FailedRecordProcessor 的错误处理程序和回滚后处理器现在可以配置一个或多个 RetryListener 来接收有关重试和恢复进度的信息。

RecordInterceptor 现在具有在侦听器返回(正常或通过抛出异常)后调用的其他方法。它还有一个子接口 ConsumerAwareRecordInterceptor。此外,现在还有一个用于批处理侦听器的 BatchInterceptor。有关更多信息,请参阅 消息侦听器容器

@KafkaListener 更改

您现在可以验证 @KafkaHandler 方法(类级侦听器)的有效负载参数。有关更多信息,请参阅 @KafkaListener @Payload 验证

您现在可以设置 MessagingMessageConverterBatchMessagingMessageConverter 上的 rawRecordHeader 属性,这会导致原始 ConsumerRecord 添加到转换后的 Message<?> 中。例如,如果您希望在侦听器错误处理程序中使用 DeadLetterPublishingRecoverer,这将非常有用。有关更多信息,请参阅 侦听器错误处理程序

您现在可以在应用程序初始化期间修改 @KafkaListener 注解。有关更多信息,请参阅 @KafkaListener 属性修改

DeadLetterPublishingRecover 更改

现在,如果键和值都反序列化失败,则原始值将发布到 DLT。以前,值已填充,但键 DeserializationException 仍保留在标头中。如果您是 recoverer 的子类并覆盖了 createProducerRecord 方法,则存在一个破坏性 API 更改。

此外,recoverer 会在发布到目标分区之前验证目标解析器选择的分区是否存在。

有关更多信息,请参阅 发布死信记录

ChainedKafkaTransactionManager 已弃用

有关更多信息,请参阅 事务

ReplyingKafkaTemplate 更改

现在有一种机制可以检查回复,如果存在某些条件,则使 future 异常失败。

已添加对发送和接收 spring-messaging Message<?> 的支持。

有关更多信息,请参阅 使用 ReplyingKafkaTemplate

Kafka Streams 更改

默认情况下,StreamsBuilderFactoryBean 现在配置为不清理本地状态。有关更多信息,请参阅 配置

KafkaAdmin 更改

已添加新方法 createOrModifyTopicsdescribeTopics。已添加 KafkaAdmin.NewTopics 以便于在单个 bean 中配置多个主题。有关更多信息,请参阅 [configuring-topics]

MessageConverter 更改

现在可以将 spring-messaging SmartMessageConverter 添加到 MessagingMessageConverter 中,从而允许基于 contentType 标头进行内容协商。有关更多信息,请参阅 Spring 消息消息转换

排序 @KafkaListener

有关更多信息,请参阅 按顺序启动 @KafkaListener

ExponentialBackOffWithMaxRetries

提供了一个新的 BackOff 实现,使配置最大重试次数更加方便。有关更多信息,请参阅 ExponentialBackOffWithMaxRetries 实现

条件委托错误处理程序

可以将这些新的错误处理程序配置为委托给不同的错误处理程序,具体取决于异常类型。有关更多信息,请参阅 委托错误处理程序

2.5 版和 2.6 版之间的更改

Kafka 客户端版本

此版本需要 2.6.0 版的 kafka-clients

侦听器容器更改

默认 EOSMode 现在为 BETA。有关更多信息,请参阅 恰好一次语义

各种错误处理程序(扩展 FailedRecordProcessor)和 DefaultAfterRollbackProcessor 现在会在恢复失败时重置 BackOff。此外,您现在可以根据失败的记录和/或异常选择要使用的 BackOff

您现在可以在容器属性中配置 adviceChain。有关更多信息,请参阅 侦听器容器属性

当容器配置为发布 ListenerContainerIdleEvent 时,现在会在发布空闲事件后接收到记录时发布 ListenerContainerNoLongerIdleEvent。有关更多信息,请参阅 应用程序事件检测空闲和无响应的使用者

@KafkaListener 更改

使用手动分区分配时,您现在可以指定一个通配符来确定应将哪些分区重置为初始偏移量。此外,如果侦听器实现了 ConsumerSeekAware,则在手动分配后会调用 onPartitionsAssigned()。(也添加到 2.5.5 版中)。有关更多信息,请参阅 显式分区分配

已向 AbstractConsumerSeekAware 添加便利方法,以简化搜索操作。有关更多信息,请参阅 [seek]

错误处理程序更改

FailedRecordProcessor 的子类(例如 SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler)现在可以配置为重置重试状态,如果异常类型与之前使用此记录发生的异常类型不同。

生产者工厂更改

您现在可以设置生产者的最大生存期,在此生存期之后,它们将被关闭并重新创建。有关更多信息,请参阅 事务

您现在可以在创建 DefaultKafkaProducerFactory 后更新配置映射。例如,如果您必须在凭据更改后更新 SSL 密钥/信任库位置,这将非常有用。有关更多信息,请参阅 使用 DefaultKafkaProducerFactory

2.4 版和 2.5 版之间的更改

本节介绍从 2.4 版到 2.5 版所做的更改。有关早期版本的更改,请参阅 更改历史记录

使用者/生产者工厂更改

默认的使用者和生产者工厂现在可以在创建或关闭使用者或生产者时调用回调。提供了用于原生 Micrometer 指标的实现。有关更多信息,请参阅 工厂侦听器

您现在可以在运行时更改引导服务器属性,从而启用故障转移到另一个 Kafka 集群。有关更多信息,请参阅 连接到 Kafka

StreamsBuilderFactoryBean 更改

工厂 bean 现在可以在创建或销毁 KafkaStreams 时调用回调。提供了用于原生 Micrometer 指标的实现。有关更多信息,请参阅 KafkaStreams Micrometer 支持

Kafka 客户端版本

此版本需要 2.5.0 版的 kafka-clients

类/包更改

SeekUtils 已从 o.s.k.support 包移动到 o.s.k.listener

传递尝试标头

现在有一个选项可以添加一个标头,该标头在使用某些错误处理程序和回滚后处理器时跟踪传递尝试。有关更多信息,请参阅 传递尝试标头

@KafkaListener 更改

如果 @KafkaListener 返回类型为 Message<?>,则现在将自动填充默认回复标头(如果需要)。有关更多信息,请参阅 回复类型 Message<?>

当传入记录具有 null 键时,KafkaHeaders.RECEIVED_MESSAGE_KEY 不再填充 null 值;标头完全被省略。

@KafkaListener 方法现在可以指定 ConsumerRecordMetadata 参数,而不是使用诸如主题、分区等元数据的离散标头。有关更多信息,请参阅 使用者记录元数据

侦听器容器更改

assignmentCommitOption 容器属性现在默认为 LATEST_ONLY_NO_TX。有关更多信息,请参阅 侦听器容器属性

当使用事务时,subBatchPerPartition 容器属性现在默认值为 true。有关更多信息,请参阅事务

现在提供了一个新的 RecoveringBatchErrorHandler

现在支持静态组成员资格。有关更多信息,请参阅消息侦听器容器

如果配置了增量/协作重新平衡,如果偏移量提交失败并出现非致命性 RebalanceInProgressException,则容器将在重新平衡完成后尝试重新提交分配给此实例的分区的偏移量。

对于记录侦听器,默认错误处理程序现在是 SeekToCurrentErrorHandler,对于批处理侦听器,默认错误处理程序是 RecoveringBatchErrorHandler。有关更多信息,请参阅容器错误处理程序

现在可以控制标准错误处理程序故意抛出的异常的日志记录级别。有关更多信息,请参阅容器错误处理程序

已添加 getAssignmentsByClientId() 方法,从而更轻松地确定并发容器中的哪些消费者分配了哪些分区。有关更多信息,请参阅侦听器容器属性

现在可以禁止在错误、调试日志等中记录整个 ConsumerRecord。请参阅侦听器容器属性中的 onlyLogRecordMetadata

KafkaTemplate 更改

KafkaTemplate 现在可以维护 Micrometer 计时器。有关更多信息,请参阅监控

现在可以使用 ProducerConfig 属性配置 KafkaTemplate 以覆盖生产者工厂中的属性。有关更多信息,请参阅使用 KafkaTemplate

现在提供了 RoutingKafkaTemplate。有关更多信息,请参阅使用 RoutingKafkaTemplate

现在可以使用 KafkaSendCallback 代替 ListenerFutureCallback 来获取更窄的异常,从而更轻松地提取失败的 ProducerRecord。有关更多信息,请参阅使用 KafkaTemplate

Kafka 字符串序列化器/反序列化器

现在提供了新的 ToStringSerializer/StringDeserializer 以及关联的 SerDe。有关更多信息,请参阅字符串序列化

JsonDeserializer

JsonDeserializer 现在具有更多灵活性来确定反序列化类型。有关更多信息,请参阅使用方法确定类型

委托序列化器/反序列化器

当出站记录没有标头时,DelegatingSerializer 现在可以处理“标准”类型。有关更多信息,请参阅委托序列化器和反序列化器

测试更改

KafkaTestUtils.consumerProps() 帮助程序记录现在默认将 ConsumerConfig.AUTO_OFFSET_RESET_CONFIG 设置为 earliest。有关更多信息,请参阅JUnit

2.3 和 2.4 之间的更改

Kafka 客户端版本

此版本需要 2.4.0 或更高版本的 kafka-clients,并支持新的增量重新平衡功能。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener 一样,此接口现在还有一个附加方法 onPartitionsLost。有关更多信息,请参阅 Apache Kafka 文档。

ConsumerRebalanceListener 不同,默认实现**不会**调用 onPartitionsRevoked。相反,侦听器容器将在调用 onPartitionsLost 后调用该方法;因此,在实现 ConsumerAwareRebalanceListener 时不应执行相同的操作。

有关更多信息,请参阅重新平衡侦听器末尾的重要说明。

GenericErrorHandler

isAckAfterHandle() 默认实现现在默认返回 true。

KafkaTemplate

KafkaTemplate 现在支持与事务并行进行的非事务发布。有关更多信息,请参阅KafkaTemplate 事务和非事务发布

AggregatingReplyingKafkaTemplate

releaseStrategy 现在是 BiConsumer。它现在在超时后(以及记录到达时)被调用;在超时后调用的情况下,第二个参数为 true

有关更多信息,请参阅聚合多个回复

侦听器容器

ContainerProperties 提供了一个 authorizationExceptionRetryInterval 选项,允许侦听器容器在 KafkaConsumer 抛出任何 AuthorizationException 后重试。有关更多信息,请参阅其 JavaDoc 和使用 KafkaMessageListenerContainer

@KafkaListener

@KafkaListener 注解有一个新的属性 splitIterables;默认为 true。当回复侦听器返回 Iterable 时,此属性控制返回结果是作为单个记录发送还是为每个元素发送记录。有关更多信息,请参阅使用 @SendTo 转发侦听器结果

批处理侦听器现在可以使用 BatchToRecordAdapter 进行配置;例如,这允许在事务中处理批处理,而侦听器一次获取一条记录。使用默认实现,可以使用 ConsumerRecordRecoverer 处理批处理中的错误,而不会停止整个批处理的处理 - 这在使用事务时可能很有用。有关更多信息,请参阅带有批处理侦听器的事务

Kafka Streams

StreamsBuilderFactoryBean 接受一个新的属性 KafkaStreamsInfrastructureCustomizer。这允许在创建流之前配置构建器和/或拓扑。有关更多信息,请参阅Spring 管理

2.2 和 2.3 之间的更改

本节介绍从版本 2.2 到版本 2.3 所做的更改。

提示、技巧和示例

添加了一个新的章节提示、技巧和示例。请提交 GitHub 问题和/或拉取请求以获取该章节中的其他条目。

Kafka 客户端版本

此版本需要 2.3.0 或更高版本的 kafka-clients

类/包更改

TopicPartitionInitialOffset 已弃用,取而代之的是 TopicPartitionOffset

配置更改

从版本 2.3.4 开始,missingTopicsFatal 容器属性默认为 false。如果代理已关闭,当此属性为 true 时,应用程序将无法启动;许多用户受到此更改的影响;鉴于 Kafka 是一个高可用性平台,我们没有预料到启动没有活动代理的应用程序将是常见用例。

生产者和消费者工厂更改

现在可以将 DefaultKafkaProducerFactory 配置为为每个线程创建一个生产者。您还可以在构造函数中提供 Supplier<Serializer> 实例,作为配置类(需要无参数构造函数)或使用 Serializer 实例进行构造(然后在所有生产者之间共享)的替代方法。有关更多信息,请参阅使用 DefaultKafkaProducerFactory

DefaultKafkaConsumerFactory 中的 Supplier<Deserializer> 实例也提供了相同的选项。有关更多信息,请参阅使用 KafkaMessageListenerContainer

侦听器容器更改

以前,当使用侦听器适配器(例如 @KafkaListener)调用侦听器时,错误处理程序会接收 ListenerExecutionFailedException(实际侦听器异常作为 cause)。本机 GenericMessageListener 抛出的异常将未经更改地传递给错误处理程序。现在 ListenerExecutionFailedException 始终是参数(实际侦听器异常作为 cause),它提供了对容器的 group.id 属性的访问。

由于侦听器容器有自己的偏移量提交机制,因此它更喜欢将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。它现在会自动将其设置为 false,除非在消费者工厂或容器的消费者属性覆盖中明确设置。

ackOnError 属性现在默认为 false

现在可以在侦听器方法中获取消费者的 group.id 属性。有关更多信息,请参阅获取消费者 group.id

容器有一个新的属性 recordInterceptor,允许在调用侦听器之前检查或修改记录。如果需要调用多个拦截器,也会提供 CompositeRecordInterceptor。有关更多信息,请参阅消息侦听器容器

ConsumerSeekAware 有新的方法允许您相对于开头、结尾或当前位置执行查找,并查找大于或等于时间戳的第一个偏移量。有关更多信息,请参阅[查找]

现在提供了一个便利类 AbstractConsumerSeekAware 来简化查找。有关更多信息,请参阅[查找]

ContainerProperties 提供了一个 idleBetweenPolls 选项,允许侦听器容器中的主循环在 KafkaConsumer.poll() 调用之间休眠。有关更多信息,请参阅其 JavaDoc 和使用 KafkaMessageListenerContainer

当使用 AckMode.MANUAL(或 MANUAL_IMMEDIATE)时,您现在可以通过在 Acknowledgment 上调用 nack 来导致重新传递。有关更多信息,请参阅提交偏移量

现在可以使用 Micrometer Timer 监控侦听器性能。有关更多信息,请参阅监控

容器现在发布与启动相关的其他消费者生命周期事件。有关更多信息,请参阅应用程序事件

事务批处理侦听器现在可以支持僵尸围栏。有关更多信息,请参阅事务

现在可以使用 ContainerCustomizer 配置侦听器容器工厂,以在创建和配置每个容器后进一步配置它。有关更多信息,请参阅容器工厂

ErrorHandler 更改

SeekToCurrentErrorHandler 现在将某些异常视为致命异常,并禁用这些异常的重试,并在第一次失败时调用恢复程序。

SeekToCurrentErrorHandlerSeekToCurrentBatchErrorHandler 现在可以配置为在传递尝试之间应用 BackOff(线程休眠)。

从版本 2.3.2 开始,在错误处理程序恢复失败记录后返回时,将提交恢复记录的偏移量。

DeadLetterPublishingRecoverer 在与 ErrorHandlingDeserializer 一起使用时,现在将发送到死信主题的消息的有效负载设置为无法反序列化的原始值。以前,它是 null,用户代码需要从消息标头中提取 DeserializationException。有关更多信息,请参阅发布死信记录

TopicBuilder

提供了一个新的类TopicBuilder,用于更方便地创建用于自动主题配置的NewTopic @Bean。有关更多信息,请参阅[configuring-topics]

Kafka Streams 更改

现在,您可以对@EnableKafkaStreams创建的StreamsBuilderFactoryBean执行其他配置。有关更多信息,请参阅Streams 配置

现在提供了RecoveringDeserializationExceptionHandler,它允许恢复反序列化错误的记录。它可以与DeadLetterPublishingRecoverer结合使用,将这些记录发送到死信主题。有关更多信息,请参阅从反序列化异常中恢复

已提供HeaderEnricher转换器,使用SpEL生成标头值。有关更多信息,请参阅标头增强器

已提供MessagingTransformer。这允许Kafka Streams拓扑与Spring Messaging组件(例如Spring Integration流)交互。有关更多信息,请参阅MessagingProcessor和参阅[从KStream调用Spring Integration流]

JSON 组件更改

现在,所有支持JSON的组件都默认配置了由JacksonUtils.enhancedObjectMapper()生成的Jackson ObjectMapperJsonDeserializer现在提供基于TypeReference的构造函数,以便更好地处理目标泛型容器类型。此外,还引入了JacksonMimeTypeModule用于将org.springframework.util.MimeType序列化为普通字符串。有关更多信息,请参阅其JavaDocs和序列化、反序列化和消息转换

已提供ByteArrayJsonMessageConverter以及所有Json转换器的新超类JsonMessageConverter。此外,现在可以使用StringOrBytesSerializer;它可以在ProducerRecord中序列化byte[]BytesString值。有关更多信息,请参阅Spring Messaging 消息转换

JsonSerializerJsonDeserializerJsonSerde现在具有流畅的API,使程序配置更简单。有关更多信息,请参阅Javadocs、序列化、反序列化和消息转换Streams JSON 序列化和反序列化

ReplyingKafkaTemplate

当回复超时时,future 将使用KafkaReplyTimeoutException而不是KafkaException异常完成。

此外,现在提供了重载的sendAndReceive方法,允许在每个消息的基础上指定回复超时。

AggregatingReplyingKafkaTemplate

通过聚合来自多个接收者的回复来扩展ReplyingKafkaTemplate。有关更多信息,请参阅聚合多个回复

事务更改

现在,您可以在KafkaTemplateKafkaTransactionManager上覆盖生产者工厂的transactionIdPrefix。有关更多信息,请参阅transactionIdPrefix

新的委托序列化程序/反序列化程序

框架现在提供了一个委托的SerializerDeserializer,利用标头来启用使用多个键/值类型生成和使用记录。有关更多信息,请参阅委托序列化程序和反序列化程序

新的重试反序列化程序

框架现在提供了一个委托的RetryingDeserializer,在发生网络问题等瞬态错误时重试序列化。有关更多信息,请参阅重试反序列化程序

2.1 版和 2.2 版之间的更改

Kafka 客户端版本

此版本需要 2.0.0 或更高版本的kafka-clients

类和包更改

ContainerProperties类已从org.springframework.kafka.listener.config移动到org.springframework.kafka.listener

AckMode枚举已从AbstractMessageListenerContainer移动到ContainerProperties

setBatchErrorHandler()setErrorHandler()方法已从ContainerProperties移动到AbstractMessageListenerContainerAbstractKafkaListenerContainerFactory

回滚后处理

提供了一种新的AfterRollbackProcessor策略。有关更多信息,请参阅回滚后处理器

ConcurrentKafkaListenerContainerFactory 更改

现在,您可以使用ConcurrentKafkaListenerContainerFactory创建和配置任何ConcurrentMessageListenerContainer,而不仅仅是那些用于@KafkaListener注释的容器。有关更多信息,请参阅容器工厂

侦听器容器更改

添加了一个新的容器属性(missingTopicsFatal)。有关更多信息,请参阅使用KafkaMessageListenerContainer

当使用者停止时,现在会发出ConsumerStoppedEvent。有关更多信息,请参阅线程安全

批处理侦听器可以选择接收完整的ConsumerRecords<?, ?>对象而不是List<ConsumerRecord<?, ?>。有关更多信息,请参阅[batch-listeners]

DefaultAfterRollbackProcessorSeekToCurrentErrorHandler现在可以恢复(跳过)持续失败的记录,并且默认情况下,在10次失败后这样做。可以将它们配置为将失败的记录发布到死信主题。

从 2.2.4 版开始,在选择死信主题名称时可以使用使用者的组 ID。

已添加ConsumerStoppingEvent。有关更多信息,请参阅应用程序事件

现在可以将SeekToCurrentErrorHandler配置为在容器配置为AckMode.MANUAL_IMMEDIATE时提交已恢复记录的偏移量(自 2.2.4 版起)。

@KafkaListener 更改

现在,您可以通过在注释上设置属性来覆盖侦听器容器工厂的concurrencyautoStartup属性。现在,您可以添加配置以确定将哪些标头(如果有)复制到回复消息中。有关更多信息,请参阅@KafkaListener 注释

现在,您可以在自己的注释上使用@KafkaListener作为元注释。有关更多信息,请参阅@KafkaListener 作为元注释

现在,更轻松地为@Payload验证配置Validator。有关更多信息,请参阅@KafkaListener @Payload 验证

现在,您可以在注释上直接指定 Kafka 使用者属性;这些属性将覆盖使用者工厂中定义的任何具有相同名称的属性(自 2.2.4 版起)。有关更多信息,请参阅注释属性

标头映射更改

类型为MimeTypeMediaType的标头现在在RecordHeader值中映射为简单字符串。以前,它们被映射为JSON,并且仅解码了MimeType。无法解码MediaType。现在它们是简单的字符串,用于互操作性。

此外,DefaultKafkaHeaderMapper有一个新的addToStringClasses方法,允许指定应使用toString()而不是JSON映射的类型。有关更多信息,请参阅消息标头

嵌入式 Kafka 更改

KafkaEmbedded类及其KafkaRule接口已被弃用,取而代之的是EmbeddedKafkaBroker及其JUnit 4 EmbeddedKafkaRule包装器。@EmbeddedKafka注释现在填充EmbeddedKafkaBroker bean,而不是已弃用的KafkaEmbedded。此更改允许在JUnit 5 测试中使用@EmbeddedKafka@EmbeddedKafka注释现在具有属性ports,用于指定填充EmbeddedKafkaBroker的端口。有关更多信息,请参阅测试应用程序

JsonSerializer/Deserializer 增强功能

现在,您可以通过使用生产者和使用者属性来提供类型映射信息。

反序列化程序上提供了新的构造函数,允许使用提供的目标类型覆盖类型标头信息。

JsonDeserializer现在默认删除任何类型信息标头。

现在,您可以使用 Kafka 属性配置JsonDeserializer以忽略类型信息标头(自 2.2.3 版起)。

有关更多信息,请参阅序列化、反序列化和消息转换

Kafka Streams 更改

Streams 配置 bean 现在必须是KafkaStreamsConfiguration对象,而不是StreamsConfig对象。

StreamsBuilderFactoryBean已从包…​core移动到…​config

已引入KafkaStreamBrancher,以便在KStream实例之上构建条件分支时提供更好的最终用户体验。

有关更多信息,请参阅Apache Kafka Streams 支持配置

事务 ID

当侦听器容器启动事务时,transactional.id现在是transactionIdPrefix附加<group.id>.<topic>.<partition>。此更改允许正确地隔离僵尸进程,此处进行了说明

2.0 版和 2.1 版之间的更改

Kafka 客户端版本

此版本需要 1.0.0 或更高版本的kafka-clients

2.2 版本原生支持 1.1.x 客户端。

JSON 改进

StringJsonMessageConverterJsonSerializer现在在Headers中添加类型信息,让转换器和JsonDeserializer在接收时创建特定类型,基于消息本身而不是固定的配置类型。有关更多信息,请参阅序列化、反序列化和消息转换

容器停止错误处理程序

现在为记录和批处理侦听器提供了容器错误处理程序,它们将侦听器抛出的任何异常视为致命异常/ 它们会停止容器。有关更多信息,请参阅处理异常

暂停和恢复容器

侦听器容器现在具有pause()resume()方法(自 2.1.3 版起)。有关更多信息,请参阅暂停和恢复侦听器容器

有状态重试

从 2.1.3 版开始,您可以配置有状态重试。有关更多信息,请参阅有状态重试

客户端 ID

从 2.1.1 版开始,您现在可以在@KafkaListener上设置client.id前缀。以前,要自定义客户端 ID,您需要为每个侦听器提供一个单独的使用者工厂(和容器工厂)。前缀后跟-n,以便在使用并发时提供唯一的客户端 ID。

记录偏移量提交

默认情况下,主题偏移量提交的日志记录使用DEBUG日志级别执行。从 2.1.2 版本开始,ContainerProperties 中新增了一个名为commitLogLevel的属性,允许您指定这些消息的日志级别。有关更多信息,请参见使用KafkaMessageListenerContainer

默认的 @KafkaHandler

从 2.1.3 版本开始,您可以将类级别 @KafkaListener 上的一个 @KafkaHandler 注解指定为默认注解。有关更多信息,请参见类上的@KafkaListener

ReplyingKafkaTemplate

从 2.1.3 版本开始,提供了一个KafkaTemplate的子类来支持请求/回复语义。有关更多信息,请参见使用ReplyingKafkaTemplate

ChainedKafkaTransactionManager

2.1.3 版本引入了ChainedKafkaTransactionManager。(现在已弃用)。

从 2.0 迁移指南

请参阅2.0 到 2.1 迁移指南。

1.3 和 2.0 之间的更改

Spring Framework 和 Java 版本

Spring for Apache Kafka 项目现在需要 Spring Framework 5.0 和 Java 8。

@KafkaListener 更改

您现在可以使用@SendTo注解@KafkaListener方法(以及类和@KafkaHandler方法)。如果方法返回结果,则将其转发到指定的主题。有关更多信息,请参见使用@SendTo转发侦听器结果

消息侦听器

消息侦听器现在可以感知Consumer对象。有关更多信息,请参见[message-listeners]

使用ConsumerAwareRebalanceListener

重新平衡侦听器现在可以在重新平衡通知期间访问Consumer对象。有关更多信息,请参见重新平衡侦听器

1.2 和 1.3 之间的更改

事务支持

0.11.0.0 客户端库添加了对事务的支持。已添加KafkaTransactionManager和其他事务支持。有关更多信息,请参见事务

标头支持

0.11.0.0 客户端库添加了对消息标头的支持。这些现在可以映射到spring-messaging MessageHeaders并从中映射。有关更多信息,请参见消息标头

创建主题

0.11.0.0 客户端库提供了一个AdminClient,您可以使用它来创建主题。KafkaAdmin使用此客户端自动添加定义为@Bean实例的主题。

Kafka 时间戳支持

KafkaTemplate现在支持一个 API 用于添加带有时间戳的记录。引入了关于timestamp支持的新KafkaHeaders。此外,还添加了新的KafkaConditions.timestamp()KafkaMatchers.hasTimestamp()测试实用程序。有关更多详细信息,请参见使用KafkaTemplate@KafkaListener 注解测试应用程序

@KafkaListener 更改

您现在可以配置一个KafkaListenerErrorHandler来处理异常。有关更多信息,请参见处理异常

默认情况下,@KafkaListenerid属性现在用作group.id属性,覆盖消费者工厂中配置的属性(如果存在)。此外,您可以在注解上显式配置groupId。以前,您需要一个单独的容器工厂(和消费者工厂)才能为侦听器使用不同的group.id值。要恢复使用工厂配置的group.id的先前行为,请将注解上的idIsGroup属性设置为false

@EmbeddedKafka 注解

为方便起见,提供了一个测试类级别的@EmbeddedKafka注解,用于将KafkaEmbedded注册为 Bean。有关更多信息,请参见测试应用程序

Kerberos 配置

现在提供对配置 Kerberos 的支持。有关更多信息,请参见JAAS 和 Kerberos

1.1 和 1.2 之间的更改

此版本使用 0.10.2.x 客户端。

1.0 和 1.1 之间的更改

Kafka 客户端

此版本使用 Apache Kafka 0.10.x.x 客户端。

批处理侦听器

可以将侦听器配置为接收consumer.poll()操作返回的整个消息批次,而不是一次接收一条消息。

空有效负载

当您使用日志压缩时,空有效负载用于“删除”键。

初始偏移量

当显式分配分区时,您现在可以配置相对于消费者组当前位置的初始偏移量,而不是相对于当前末尾的绝对或相对偏移量。

Seek

您现在可以查找每个主题或分区的职位。您可以使用它在初始化期间设置初始位置,当使用组管理并且 Kafka 分配分区时。您还可以在检测到空闲容器时或应用程序执行中的任意点进行查找。有关更多信息,请参见[seek]