3.2.2

参考指南

本指南描述了 Spring Cloud Stream Binder 的 Apache Kafka 实现。它包含了关于其设计、用法和配置选项的信息,以及 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定构造的信息。此外,本指南还解释了 Spring Cloud Stream 的 Kafka Streams 绑定功能。

1. Apache Kafka Binder

1.1. 用法

要使用 Apache Kafka binder,您需要将 spring-cloud-stream-binder-kafka 添加为 Spring Cloud Stream 应用程序的依赖项,如下面的 Maven 示例所示

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

或者,您也可以使用 Spring Cloud Stream Kafka Starter,如下面的 Maven 示例所示

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

1.2. 概览

下图显示了 Apache Kafka binder 操作的简化图

kafka binder
图 1. Kafka Binder

Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。

binder 当前使用 Apache Kafka kafka-clients 版本 2.3.1。此客户端可以与旧代理通信(请参阅 Kafka 文档),但某些功能可能不可用。例如,对于早于 0.11.x.x 的版本,不支持原生头。此外,0.11.x.x 不支持 autoAddPartitions 属性。

1.3. 配置选项

本节包含 Apache Kafka binder 使用的配置选项。

有关 binder 的常见配置选项和属性,请参阅核心文档中的绑定属性

1.3.1. Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka binder 连接的代理列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带或不带端口信息的主机(例如,host1,host2:port2)。这设置了当代理列表中未配置端口时的默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

键/值映射,包含传递给 binder 创建的所有客户端的客户端属性(生产者和消费者)。由于这些属性由生产者和消费者使用,因此用法应仅限于通用属性——例如,安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,不允许传播。此处的属性将覆盖引导中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性外,此处也允许未知消费者属性。此处的属性将覆盖引导中设置的任何属性以及上面 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

由 binder 传输的自定义头列表。仅当与旧应用程序 (⇐ 1.3.x) 且 kafka-clients 版本 < 0.11.0.0 通信时才需要。较新版本原生支持头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

等待获取分区信息的时间,以秒为单位。如果此计时器到期,则健康报告为 down。

默认值:10。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的 ack 数量。有关生产者 acks 属性,请参阅 Kafka 文档。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅当设置了 autoCreateTopicsautoAddPartitions 时才有效。binder 在其生产或消费数据的主题上配置的全局最小分区数。它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值(如果两者中任何一个更大)覆盖。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,此处也允许未知生产者属性。此处的属性将覆盖引导中设置的任何属性以及上面 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于活动状态,则自动创建主题的复制因子。可以在每个绑定上覆盖。

如果您使用的 Kafka 代理版本早于 2.4,则此值应设置为至少 1。从 3.0.8 版本开始,binder 使用 -1 作为默认值,这表示将使用代理的“default.replication.factor”属性来确定副本数量。请咨询您的 Kafka 代理管理员,了解是否存在要求最小复制因子的策略,如果是这种情况,通常 default.replication.factor 将与该值匹配,并且应使用 -1,除非您需要大于最小值的复制因子。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,则 binder 会自动创建新主题。如果设置为 false,则 binder 依赖于已配置的主题。在后一种情况下,如果主题不存在,则 binder 无法启动。

此设置独立于代理的 auto.create.topics.enable 设置,并且不影响它。如果服务器设置为自动创建主题,则它们可能会作为元数据检索请求的一部分创建,并使用默认代理设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,则 binder 会在需要时创建新分区。如果设置为 false,则 binder 依赖于已配置的主题分区大小。如果目标主题的分区计数小于预期值,则 binder 无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在 binder 中启用事务。有关 Kafka 文档中的 transaction.idspring-kafka 文档中的事务,请参阅相关内容。启用事务时,将忽略单个 producer 属性,并且所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认 null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务性 binder 中生产者的全局生产者属性。请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka 生产者属性以及所有 binder 支持的通用生产者属性。

默认值:请参阅各个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 spring-messaging 头映射到 Kafka 头以及从 Kafka 头映射的 KafkaHeaderMapper 的 bean 名称。例如,如果您希望自定义使用 JSON 反序列化头的 BinderHeaderMapper bean 中的受信任包,请使用此项。如果此自定义 BinderHeaderMapper bean 未通过此属性提供给 binder,则 binder 将查找名称为 kafkaBinderHeaderMapper 且类型为 BinderHeaderMapper 的头映射器 bean,然后再回退到 binder 创建的默认 BinderHeaderMapper

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

当主题上的任何分区(无论从哪个消费者接收数据)被发现没有 leader 时,将 binder 健康状态设置为 down 的标志。

默认值:false

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当 truststore 或 keystore 证书位置以 classpath URL (classpath:…​) 给出时,binder 会将 JAR 文件内部 classpath 位置的资源复制到文件系统上的某个位置。这对于代理级别证书 (ssl.truststore.locationssl.keystore.location) 和用于 schema 注册表 (schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location) 的证书都是如此。请记住,truststore 和 keystore 的 classpath 位置必须在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location`spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。文件将移动到此属性值指定的位置,该位置必须是文件系统上存在的目录,并且可由运行应用程序的进程写入。如果此值未设置且证书文件是 classpath 资源,则它将移动到由 System.getProperty("java.io.tmpdir") 返回的系统临时目录。如果此值存在但找不到目录或不可写入,则也是如此。

默认值:无。

1.3.2. Kafka 消费者属性

为避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.kafka.default.consumer.<property>=<value> 的格式为所有通道设置值。

以下属性仅适用于 Kafka 消费者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 为前缀。

admin.configuration

自 2.1.1 版起,此属性已弃用,取而代之的是 topic.properties,并将在未来版本中移除对其的支持。

admin.replicas-assignment

自 2.1.1 版起,此属性已弃用,取而代之的是 topic.replicas-assignment,并将在未来版本中移除对其的支持。

admin.replication-factor

自 2.1.1 版起,此属性已弃用,取而代之的是 topic.replication-factor,并将在未来版本中移除对其的支持。

autoRebalanceEnabled

当为 true 时,主题分区会在消费者组的成员之间自动重新平衡。当为 false 时,每个消费者根据 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一组固定分区。这需要每个启动实例上都适当地设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue 时,此设置决定是否在处理每个记录后提交偏移量。默认情况下,在处理完 consumer.poll() 返回的批次中的所有记录后,会提交偏移量。通过 max.poll.records Kafka 属性(通过消费者 configuration 属性设置)可以控制轮询返回的记录数。将其设置为 true 可能会导致性能下降,但这样做可以减少发生故障时重新投递记录的可能性。另请参阅 binder 的 requiredAcks 属性,它也会影响提交偏移量的性能。此属性自 3.1 版本起已弃用,取而代之的是使用 ackMode。如果未设置 ackMode 且未启用批处理模式,将使用 RECORD ackMode。

默认值:false

autoCommitOffset

从 3.1 版本开始,此属性已弃用。有关替代方案的更多详细信息,请参阅 ackMode。消息处理后是否自动提交偏移量。如果设置为 false,则入站消息中会存在一个键为 kafka_acknowledgment 且类型为 org.springframework.kafka.support.Acknowledgment 的头。应用程序可以使用此头来确认消息。有关详细信息,请参阅示例部分。当此属性设置为 false 时,Kafka binder 将确认模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,并且应用程序负责确认记录。另请参阅 ackEachRecord

默认值:true

ackMode

指定容器确认模式。这基于 Spring Kafka 中定义的 AckMode 枚举。如果 ackEachRecord 属性设置为 true 且消费者不处于批处理模式,则将使用 RECORD 确认模式,否则,使用通过此属性提供的确认模式。

autoCommitOnError

在可轮询消费者中,如果设置为 true,则总是在出错时自动提交。如果未设置(默认)或为 false,则在可轮询消费者中不会自动提交。请注意,此属性仅适用于可轮询消费者。

默认值:未设置。

resetOffsets

是否将消费者上的偏移量重置为 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,则必须为 false;请参阅使用 KafkaBindingRebalanceListener。有关此属性的更多信息,请参阅重置偏移量

默认值:false

startOffset

新组的起始偏移量。允许的值:earliestlatest。如果为消费者“绑定”显式设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group),则“startOffset”设置为 earliest。否则,对于 anonymous 消费者组,它设置为 latest。有关此属性的更多信息,请参阅重置偏移量

默认值:null(等同于 earliest)。

enableDlq

当设置为 true 时,它为消费者启用 DLQ 行为。默认情况下,导致错误的消息会转发到名为 error.<destination>.<group> 的主题。DLQ 主题名称可以通过设置 dlqName 属性或定义类型为 DlqDestinationResolver@Bean 来配置。这为更常见的 Kafka 重放场景提供了一种替代选项,适用于错误数量相对较少且重放整个原始主题可能过于繁琐的情况。有关更多信息,请参阅死信主题处理。从 2.0 版本开始,发送到 DLQ 主题的消息会添加以下头:x-original-topicx-exception-messagex-exception-stacktrace 作为 byte[]。默认情况下,失败的记录会发送到 DLQ 主题中与原始记录相同分区号的分区。有关如何更改此行为,请参阅死信主题分区选择destinationIsPatterntrue 时不允许。

默认值:false

dlqPartitions

enableDlq 为 true 且未设置此属性时,会创建一个死信主题,其分区数与主主题相同。通常,死信记录会发送到死信主题中与原始记录相同的分区。此行为可以更改;请参阅死信主题分区选择。如果此属性设置为 1 并且没有 DqlPartitionFunction bean,则所有死信记录都将写入分区 0。如果此属性大于 1,则您必须提供一个 DlqPartitionFunction bean。请注意,实际分区计数受 binder 的 minPartitionCount 属性影响。

默认值:none

configuration

包含通用 Kafka 消费者属性的键/值对映射。除了 Kafka 消费者属性外,此处还可以传递其他配置属性。例如,应用程序所需的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar。此处不能设置 bootstrap.servers 属性;如果需要连接到多个集群,请使用多 binder 支持。

默认值:空映射。

dlqName

用于接收错误消息的 DLQ 主题名称。

默认值:null(如果未指定,导致错误的消息将转发到名为 error.<destination>.<group> 的主题)。

dlqProducerProperties

通过此属性,可以设置 DLQ 特定的生产者属性。所有可通过 Kafka 生产者属性获得的属性都可以通过此属性进行设置。当消费者上启用原生解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准头。允许的值:noneidtimestampboth。如果使用原生反序列化且接收消息的第一个组件需要一个 id(例如配置为使用 JDBC 消息存储的聚合器),则此功能很有用。

默认值:none

converterBeanName

实现 RecordMessageConverter 的 bean 的名称。在入站通道适配器中用于替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval

指示最近没有收到消息的事件之间的间隔,以毫秒为单位。使用 ApplicationListener<ListenerContainerIdleEvent> 来接收这些事件。有关使用示例,请参阅示例:暂停和恢复消费者

默认值:30000

destinationIsPattern

当为 true 时,目标被视为一个正则表达式 Pattern,用于由代理匹配主题名称。当为 true 时,不提供主题,并且不允许 enableDlq,因为 binder 在 provisioning 阶段不知道主题名称。请注意,检测与模式匹配的新主题所需的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000 毫秒(5 分钟)。这可以使用上面的 configuration 属性进行配置。

默认值:false

topic.properties

在配置新主题时使用的 Kafka 主题属性的 Map,例如 spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在配置新主题时使用。请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

配置主题时使用的复制因子。覆盖 binder 范围的设置。如果存在 replicas-assignments 则忽略。

默认值:无(使用 binder 范围的默认值 -1)。

pollTimeout

可轮询消费者中轮询使用的超时时间。

默认值:5 秒。

事务管理器

KafkaAwareTransactionManager 的 bean 名称,用于覆盖此绑定的 binder 事务管理器。通常在您希望使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步时需要。要实现记录的精确一次消费和生产,消费者和生产者绑定必须都配置相同的事务管理器。

默认值:无。

txCommitRecovered

使用事务性 binder 时,恢复记录的偏移量(例如,当重试耗尽且记录发送到死信主题时)将默认通过新事务提交。将此属性设置为 false 将抑制恢复记录的偏移量提交。

默认值:true。

commonErrorHandlerBeanName

每个消费者绑定要使用的 CommonErrorHandler bean 名称。如果存在,此用户提供的 CommonErrorHandler 将优先于 binder 定义的任何其他错误处理器。如果应用程序不想使用 ListenerContainerCustomizer 然后检查目标/组组合来设置错误处理器,这是一种方便表达错误处理器的方式。

默认值:无。

1.3.3. 重置偏移量

当应用程序启动时,每个分配的分区中的初始位置取决于两个属性 startOffsetresetOffsets。如果 resetOffsetsfalse,则应用正常的 Kafka 消费者 auto.offset.reset 语义。即,如果绑定消费者组的分区没有提交的偏移量,则位置为 earliestlatest。默认情况下,具有显式 group 的绑定使用 earliest,匿名绑定(没有 group)使用 latest。这些默认值可以通过设置 startOffset 绑定属性来覆盖。首次使用特定 group 启动绑定时,将没有提交的偏移量。没有提交偏移量的另一种情况是偏移量已过期。对于现代代理(自 2.1 版本起),以及默认代理属性,偏移量在最后一个成员离开组后 7 天过期。有关更多信息,请参阅 offsets.retention.minutes 代理属性。

resetOffsetstrue 时,binder 应用与代理上没有提交偏移量时类似的语义,就像此绑定从未从主题消费过一样;即,任何当前的提交偏移量都将被忽略。

以下是可能使用此功能两种情况。

  1. 从包含键/值对的压缩主题消费。将 resetOffsets 设置为 true 并将 startOffset 设置为 earliest;绑定将对所有新分配的分区执行 seekToBeginning

  2. 从包含事件的主题消费,您只对在此绑定运行时发生的事件感兴趣。将 resetOffsets 设置为 true 并将 startOffset 设置为 latest;绑定将对所有新分配的分区执行 seekToEnd

如果在初始分配后发生再平衡,则寻求仅对在初始分配期间未分配的任何新分配的分区执行。

有关主题偏移量的更多控制,请参阅使用 KafkaBindingRebalanceListener;当提供侦听器时,不应将 resetOffsets 设置为 true,否则将导致错误。

1.3.4. 消费批次

从 3.0 版本开始,当 spring.cloud.stream.binding.<name>.consumer.batch-mode 设置为 true 时,通过轮询 Kafka Consumer 接收到的所有记录都将以 List<?> 的形式呈现给侦听器方法。否则,方法将一次调用一个记录。批处理的大小由 Kafka 消费者属性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有关详细信息,请参阅 Kafka 文档。

请记住,批处理模式不支持 @StreamListener - 它仅适用于较新的函数式编程模型。

在批处理模式下不支持 binder 内的重试,因此 maxAttempts 将被覆盖为 1。您可以配置一个 SeekToCurrentBatchErrorHandler(使用 ListenerContainerCustomizer)以实现与 binder 中重试类似的功能。您还可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并重新传递剩余的记录。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档

1.3.5. Kafka 生产者属性

为避免重复,Spring Cloud Stream 支持以 spring.cloud.stream.kafka.default.producer.<property>=<value> 的格式为所有通道设置值。

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

admin.configuration

自 2.1.1 版起,此属性已弃用,取而代之的是 topic.properties,并将在未来版本中移除对其的支持。

admin.replicas-assignment

自 2.1.1 版起,此属性已弃用,取而代之的是 topic.replicas-assignment,并将在未来版本中移除对其的支持。

admin.replication-factor

自 2.1.1 版起,此属性已弃用,取而代之的是 topic.replication-factor,并将在未来版本中移除对其的支持。

bufferSize

Kafka 生产者在发送之前尝试批量处理的数据量的上限,以字节为单位。

默认值:16384

sync

生产者是否同步。

默认值:false

sendTimeoutExpression

对出站消息求值的 SpEL 表达式,用于评估启用同步发布时等待 ack 的时间——例如,headers['mySendTimeout']。超时值以毫秒为单位。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经是 byte[] 的形式。现在,在转换有效负载之前评估表达式。

默认值:none

batchTimeout

生产者等待更多消息累积在同一批次中然后发送消息的时间。(通常,生产者根本不会等待,而只是发送在前一次发送进行中累积的所有消息。)非零值可能会以延迟为代价提高吞吐量。

默认值:0

messageKeyExpression

一个针对出站消息进行评估的 SpEL 表达式,用于填充生成的 Kafka 消息的键——例如,headers['myKey']。在 3.0 之前的版本中,除非使用原生编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经是 byte[] 的形式。现在,在转换有效负载之前评估表达式。对于常规处理器 (Function<String, String>Function<Message<?>, Message<?>),如果生成的键需要与主题中的传入键相同,则可以按如下方式设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 对于响应式函数有一个重要的注意事项。在这种情况下,应用程序需要手动将传入消息中的头复制到出站消息中。您可以设置头,例如 myKey,并使用 headers['myKey'],如上所述,或者为了方便起见,只需设置 KafkaHeaders.MESSAGE_KEY 头,您根本不需要设置此属性。

默认值:none

headerPatterns

一个逗号分隔的简单模式列表,用于匹配 Spring 消息头,这些消息头将映射到 ProducerRecord 中的 Kafka Headers。模式可以以通配符(星号)开头或结尾。模式可以通过前缀 ! 来否定。匹配在第一次匹配(肯定或否定)后停止。例如 !ask,as* 将通过 ash 但不通过 askidtimestamp 永远不会映射。

默认值:*(所有头 - 除了 idtimestamp

configuration

包含通用 Kafka 生产者属性的键/值对映射。此处不能设置 bootstrap.servers 属性;如果需要连接到多个集群,请使用多 binder 支持。

默认值:空映射。

topic.properties

在配置新主题时使用的 Kafka 主题属性的 Map,例如 spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在配置新主题时使用。请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

配置主题时使用的复制因子。覆盖 binder 范围的设置。如果存在 replicas-assignments 则忽略。

默认值:无(使用 binder 范围的默认值 -1)。

useTopicHeader

设置为 true 以使用出站消息中的 KafkaHeaders.TOPIC 消息头的值覆盖默认绑定目标(主题名称)。如果不存在该头,则使用默认绑定目标。

默认值:false

recordMetadataChannel

用于发送成功发送结果的 MessageChannel 的 bean 名称;该 bean 必须存在于应用程序上下文中。发送到通道的消息是已发送消息(如果进行了转换,则为转换后的消息),并带有一个额外的头 KafkaHeaders.RECORD_METADATA。该头包含 Kafka 客户端提供的 RecordMetadata 对象;它包括记录写入主题的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失败的发送将发送到生产者错误通道(如果已配置);请参阅错误通道

默认值:null。

Kafka binder 使用生产者的 partitionCount 设置作为提示来创建具有给定分区计数的主题(结合 minPartitionCount,两者中的最大值将用作实际值)。在为 binder 配置 minPartitionCount 和为应用程序配置 partitionCount 时要谨慎,因为将使用较大的值。如果主题已存在且分区计数较小,并且 autoAddPartitions 被禁用(默认),则 binder 将无法启动。如果主题已存在且分区计数较小,并且 autoAddPartitions 被启用,则会添加新分区。如果主题已存在且分区数量大于 minPartitionCountpartitionCount 的最大值,则将使用现有分区计数。
compression

设置 compression.type 生产者属性。支持的值为 nonegzipsnappylz4zstd。如果您将 kafka-clients jar 覆盖为 2.1.0(或更高版本),如 Spring for Apache Kafka 文档中所述,并希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

事务管理器

KafkaAwareTransactionManager 的 bean 名称,用于覆盖此绑定的 binder 事务管理器。通常在您希望使用 ChainedKafkaTransactionManaager 将另一个事务与 Kafka 事务同步时需要。要实现记录的精确一次消费和生产,消费者和生产者绑定必须都配置相同的事务管理器。

默认值:无。

closeTimeout

关闭生产者时等待的超时时间,以秒为单位。

默认值:30

allowNonTransactional

通常,与事务性 binder 关联的所有输出绑定都将在新事务中发布,如果尚未进行事务。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非事务已经进行中。

默认值:false

1.3.6. 使用示例

在本节中,我们将展示前面属性在特定场景中的使用。

示例:将 ackMode 设置为 MANUAL 并依赖手动确认

此示例说明了如何在消费者应用程序中手动确认偏移量。

此示例要求将 spring.cloud.stream.kafka.bindings.input.consumer.ackMode 设置为 MANUAL。请为您的示例使用相应的输入通道名称。

@SpringBootApplication
@EnableBinding(Sink.class)
public class ManuallyAcknowdledgingConsumer {

 public static void main(String[] args) {
     SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
 }

 @StreamListener(Sink.INPUT)
 public void process(Message<?> message) {
     Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
     if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
     }
 }
}
示例:安全配置

Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循 Apache Kafka 文档中的指南以及 Confluent 文档中 Kafka 0.9 安全指南。使用 spring.cloud.stream.kafka.binder.configuration 选项为 binder 创建的所有客户端设置安全属性。

例如,要将 security.protocol 设置为 SASL_SSL,请设置以下属性

spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL

所有其他安全属性也可以以类似的方式设置。

使用 Kerberos 时,请遵循 参考文档中的说明来创建和引用 JAAS 配置。

Spring Cloud Stream 支持通过使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递给应用程序。

使用 JAAS 配置文件

通过使用系统属性,可以为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例演示了如何使用 JAAS 配置文件启动带 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序

 java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
   --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性

作为 JAAS 配置文件的替代方案,Spring Cloud Stream 提供了一种机制,允许使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置。

以下属性可用于配置 Kafka 客户端的登录上下文

spring.cloud.stream.kafka.binder.jaas.loginModule

登录模块名称。在正常情况下无需设置。

默认值:com.sun.security.auth.module.Krb5LoginModule

spring.cloud.stream.kafka.binder.jaas.controlFlag

登录模块的控制标志。

默认值:required

spring.cloud.stream.kafka.binder.jaas.options

包含登录模块选项的键/值对映射。

默认值:空映射。

以下示例演示了如何使用 Spring Boot 配置属性启动带 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序

 java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
   --spring.cloud.stream.bindings.input.destination=stream.ticktock \
   --spring.cloud.stream.kafka.binder.autoCreateTopics=false \
   --spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
   --spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
   --spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
   --spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
   --spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM

前面的示例表示以下 JAAS 文件的等价物

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="[email protected]";
};

如果所需主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且只需要发送客户端 JAAS 属性。

不要在同一个应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果 -Djava.security.auth.login.config 系统属性已存在,Spring Cloud Stream 将忽略 Spring Boot 属性。
在使用 Kerberos 时,请谨慎使用 autoCreateTopicsautoAddPartitions。通常,应用程序可能使用在 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具以管理方式创建主题和管理 ACL。
多 binder 配置和 JAAS

当连接到需要单独 JAAS 配置的多个集群时,请使用属性 sasl.jaas.config 设置 JAAS 配置。当此属性存在于应用程序中时,它将优先于上面提到的其他策略。有关更多详细信息,请参阅此 KIP-85

例如,如果您的应用程序中有两个集群,每个集群都有单独的 JAAS 配置,那么以下是您可以使用的模板

spring.cloud.stream:
    binders:
        kafka1:
          type: kafka
          environment:
             spring:
               cloud:
                 stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
        kafka2:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
    kafka.binder:
        configuration:
          security.protocol: SASL_PLAINTEXT
          sasl.mechanism: PLAIN

请注意,在上述配置中,两个 Kafka 集群及其各自的 sasl.jaas.config 值都是不同的。

有关如何设置和运行此类应用程序的更多详细信息,请参阅此示例应用程序

示例:暂停和恢复消费者

如果您希望暂停消费但不引起分区再平衡,您可以暂停和恢复消费者。这通过管理绑定生命周期来实现,如 Spring Cloud Stream 文档中 绑定可视化和控制 所示,使用 State.PAUSEDState.RESUMED

要恢复,您可以使用 ApplicationListener(或 @EventListener 方法)接收 ListenerContainerIdleEvent 实例。事件发布的频率由 idleEventInterval 属性控制。

1.4. 事务性 Binder

通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 设置为非空值(例如 tx-)来启用事务。当在处理器应用程序中使用时,消费者启动事务;消费者线程上发送的任何记录都参与相同的事务。当侦听器正常退出时,侦听器容器将偏移量发送到事务并提交。所有使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性配置的生产者绑定都使用一个通用的生产者工厂;单个绑定 Kafka 生产者属性将被忽略。

事务不支持常规 binder 重试(和死信),因为重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将回滚。启用重试时(通用属性 maxAttempts 大于零),重试属性用于配置 DefaultAfterRollbackProcessor 以启用容器级别的重试。同样,死信记录的发布不再在事务中进行,此功能已转移到侦听器容器,再次通过 DefaultAfterRollbackProcessor,它在主事务回滚后运行。

如果您希望在源应用程序中使用事务,或者从某个任意线程进行仅生产者事务(例如 @Scheduled 方法),则必须获取事务性生产者工厂的引用并使用它定义 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用 BinderFactory 获取 binder 的引用;当只有一个 binder 配置时,第一个参数使用 null。如果配置了多个 binder,则使用 binder 名称获取引用。一旦我们获得了 binder 的引用,我们就可以获取 ProducerFactory 的引用并创建事务管理器。

然后您将使用正常的 Spring 事务支持,例如 TransactionTemplate@Transactional,例如

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者事务与来自其他事务管理器的事务同步,请使用 ChainedTransactionManager

如果您部署应用程序的多个实例,每个实例都需要唯一的 transactionIdPrefix

1.5. 错误通道

从 1.3 版本开始,binder 无条件地将异常发送到每个消费者目标的错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。有关更多信息,请参阅有关错误处理的这一部分

发送失败的 ErrorMessage 的有效负载是 KafkaSendFailureException,其属性为

  • failedMessage:发送失败的 Spring Messaging Message<?>

  • record:从 failedMessage 创建的原始 ProducerRecord

没有自动处理生产者异常(例如发送到死信队列)。您可以使用自己的 Spring Integration 流来处理这些异常。

1.6. Kafka 指标

Kafka binder 模块暴露以下指标

spring.cloud.stream.binder.kafka.offset:此指标指示给定消费者组从给定 binder 主题中尚未消费的消息数量。提供的指标基于 Micrometer 库。如果 Micrometer 在 classpath 中,并且应用程序未提供其他此类 bean,则 binder 会创建 KafkaBinderMetrics bean。该指标包含消费者组信息、主题和已提交偏移量与主题最新偏移量之间的实际滞后。此指标对于向 PaaS 平台提供自动伸缩反馈特别有用。

您可以通过在应用程序中提供以下组件来阻止 KafkaBinderMetrics 创建必要的基础设施(例如消费者)并报告指标。

@Component
class NoOpBindingMeters {
	NoOpBindingMeters(MeterRegistry registry) {
		registry.config().meterFilter(
				MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
	}
}

有关如何选择性地抑制仪表的更多详细信息,请参阅此处

1.7. 墓碑记录 (空记录值)

使用压缩主题时,值为 null 的记录(也称为墓碑记录)表示键的删除。要在 @StreamListener 方法中接收此类消息,参数必须标记为非必需才能接收 null 值参数。

@StreamListener(Sink.INPUT)
public void in(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) byte[] key,
               @Payload(required = false) Customer customer) {
    // customer is null if a tombstone record
    ...
}

1.8. 使用 KafkaBindingRebalanceListener

应用程序可能希望在分区最初分配时将主题/分区定位到任意偏移量,或对消费者执行其他操作。从 2.1 版本开始,如果您在应用程序上下文中提供一个 KafkaBindingRebalanceListener bean,它将被连接到所有 Kafka 消费者绑定中。

public interface KafkaBindingRebalanceListener {

	/**
	 * Invoked by the container before any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked by the container after any pending offsets are committed.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 */
	default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {

	}

	/**
	 * Invoked when partitions are initially assigned or after a rebalance.
	 * Applications might only want to perform seek operations on an initial assignment.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
			boolean initial) {

	}

}

提供再平衡侦听器时,不能将 resetOffsets 消费者属性设置为 true

1.9. 重试和死信处理

默认情况下,当您在消费者绑定中配置重试(例如 maxAttemts)和 enableDlq 时,这些功能在 binder 内部执行,不参与侦听器容器或 Kafka 消费者。

在某些情况下,将此功能移到侦听器容器中更可取,例如

  • 重试和延迟的总和将超过消费者的 max.poll.interval.ms 属性,可能导致分区再平衡。

  • 您希望将死信发布到不同的 Kafka 集群。

  • 您希望向错误处理器添加重试侦听器。

  • …​

要配置将此功能从 binder 移动到容器,请定义类型为 ListenerContainerWithDlqAndRetryCustomizer@Bean。此接口具有以下方法

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return true to disable retrie in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

目标解析器和 BackOff 是根据绑定属性(如果已配置)创建的。然后,您可以使用它们来创建自定义错误处理器和死信发布器;例如

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template),
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}

现在,只需要一个重试延迟大于消费者的 max.poll.interval.ms 属性。

1.10. 自定义消费者和生产者配置

如果您想对用于在 Kafka 中创建 ConsumerFactoryProducerFactory 的消费者和生产者配置进行高级自定义,您可以实现以下定制器。

  • ConsumerConfigCustomizer

  • ProducerConfigCustomizer

这两个接口都提供了一种配置用于消费者和生产者属性的配置映射的方法。例如,如果您想访问在应用程序级别定义的 bean,您可以将其注入 configure 方法的实现中。当 binder 发现这些定制器可用作 bean 时,它将在创建消费者和生产者工厂之前调用 configure 方法。

这两个接口还提供了对绑定和目标名称的访问,以便在自定义生产者和消费者属性时可以访问它们。

1.11. 自定义 AdminClient 配置

与上述消费者和生产者配置定制一样,应用程序还可以通过提供 AdminClientConfigCustomizer 来定制 AdminClient 的配置。AdminClientConfigCustomizer 的 configure 方法提供了对 AdminClient 属性的访问,您可以使用这些属性进行进一步的定制。Binder 的 Kafka 主题供应器对此定制器提供的属性给予最高优先级。以下是提供此定制器 bean 的示例。

@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
    return props -> {
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    };
}

1.12. 自定义 Kafka Binder 健康指示器

当 Spring Boot Actuator 在类路径上时,Kafka binder 会激活一个默认的健康指示器。此健康指示器检查 binder 的健康状况以及与 Kafka 代理的任何通信问题。如果应用程序想要禁用此默认健康检查实现并包含自定义实现,则可以为 KafkaBinderHealth 接口提供一个实现。KafkaBinderHealth 是一个扩展自 HealthIndicator 的标记接口。在自定义实现中,它必须提供 health() 方法的实现。自定义实现必须以 bean 的形式存在于应用程序配置中。当 binder 发现自定义实现时,它将使用它而不是默认实现。以下是应用程序中此类自定义实现 bean 的示例。

@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
    return new KafkaBinderHealth() {
        @Override
        public Health health() {
            // custom implementation details.
        }
    };
}

1.13. 死信主题处理

1.13.1. 死信主题分区选择

默认情况下,记录发布到死信主题时使用与原始记录相同的分区。这意味着死信主题必须至少有与原始记录相同数量的分区。

要更改此行为,请将 DlqPartitionFunction 实现作为 @Bean 添加到应用程序上下文。只能存在一个这样的 bean。该函数将提供消费者组、失败的 ConsumerRecord 和异常。例如,如果您总是希望路由到分区 0,您可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定的 minPartitionCount 等于 1),则无需提供 DlqPartitionFunction;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定的 minPartitionCount 大于 1),则您必须提供一个 DlqPartitionFunction bean,即使分区计数与原始主题相同。

还可以为 DLQ 主题定义自定义名称。为此,请将 DlqDestinationResolver 的实现作为 @Bean 创建到应用程序上下文。当 binder 检测到此类 bean 时,它将优先使用,否则它将使用 dlqName 属性。如果两者都未找到,它将默认为 error.<destination>.<group>。以下是 DlqDestinationResolver 作为 @Bean 的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供 DlqDestinationResolver 的实现时,需要记住一件重要的事情,那就是绑定器中的 provisioner 不会自动为应用程序创建主题。这是因为绑定器无法推断出实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,应用程序有责任确保这些主题事先已创建。

1.13.2. 处理死信主题中的记录

由于框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。如果死信的原因是暂时的,您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,那可能会导致无限循环。本主题中的示例 Spring Boot 应用程序是一个示例,说明如何将这些消息路由回原始主题,但在三次尝试后将其移动到“停放”主题。该应用程序是另一个从死信主题读取的 spring-cloud-stream 应用程序。当 5 秒内没有收到消息时,它会退出。

示例假设原始目标是 so8400out,消费者组是 so8400

有几种策略需要考虑

  • 考虑仅在主应用程序未运行时运行重定向。否则,瞬态错误的重试将很快耗尽。

  • 或者,采用两阶段方法:使用此应用程序路由到第三个主题,并使用另一个应用程序从该主题路由回主主题。

以下代码清单显示了示例应用程序

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
应用程序
@SpringBootApplication
@EnableBinding(TwoOutputProcessor.class)
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private MessageChannel parkingLot;

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> reRoute(Message<?> failed) {
        processed.incrementAndGet();
        Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
        if (retries == null) {
            System.out.println("First retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else if (retries.intValue() < 3) {
            System.out.println("Another retry for " + failed);
            return MessageBuilder.fromMessage(failed)
                    .setHeader(X_RETRIES_HEADER, new Integer(retries.intValue() + 1))
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build();
        }
        else {
            System.out.println("Retries exhausted for " + failed);
            parkingLot.send(MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
        }
        return null;
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }

    public interface TwoOutputProcessor extends Processor {

        @Output("parkingLot")
        MessageChannel parkingLot();

    }

}

1.14. 使用 Kafka Binder 进行分区

Apache Kafka 原生支持主题分区。

有时将数据发送到特定分区是有利的——例如,当您希望严格排序消息处理时(特定客户的所有消息都应发送到同一分区)。

以下示例演示了如何配置生产者和消费者端

@SpringBootApplication
@EnableBinding(Source.class)
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "5000"))
    public Message<?> generate() {
        String value = data[RANDOM.nextInt(data.length)];
        System.out.println("Sending: " + value);
        return MessageBuilder.withPayload(value)
                .setHeader("partitionKey", value)
                .build();
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
主题必须配置足够的分区,以实现所有消费者组所需的并发性。上述配置支持多达 12 个消费者实例(如果其 concurrency 为 2,则为 6 个;如果其并发为 3,则为 4 个,依此类推)。通常最好“过度配置”分区,以允许未来增加消费者或并发性。
上述配置使用默认分区 (key.hashCode() % partitionCount)。这可能提供也可能不提供适当平衡的算法,具体取决于键值。您可以通过使用 partitionSelectorExpressionpartitionSelectorClass 属性来覆盖此默认值。

由于分区由 Kafka 原生处理,因此消费者端不需要特殊配置。Kafka 在实例之间分配分区。

以下 Spring Boot 应用程序监听 Kafka 流并将(到控制台)每条消息进入的分区 ID 打印出来

@SpringBootApplication
@EnableBinding(Sink.class)
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(@Payload String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        System.out.println(in + " received from partition " + partition);
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: partitioned.topic
          group: myGroup

您可以根据需要添加实例。Kafka 会重新平衡分区分配。如果实例计数(或 实例计数 * 并发数)超过分区数,则某些消费者将处于空闲状态。

2. Kafka Streams Binder

2.1. 用法

要使用 Kafka Streams binder,您只需将其添加到 Spring Cloud Stream 应用程序中,使用以下 maven 坐标

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>

快速启动 Kafka Streams binder 新项目的方法是使用 Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下图所示

spring initializr kafka streams

2.2. 概览

Spring Cloud Stream 包含一个专门为 Apache Kafka Streams 绑定设计的 binder 实现。通过此原生集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。

Kafka Streams binder 实现建立在 Spring for Apache Kafka 项目提供的基础上。

Kafka Streams binder 为 Kafka Streams 中的三种主要类型提供绑定功能 - KStreamKTableGlobalKTable

Kafka Streams 应用程序通常遵循一种模型,其中从入站主题读取记录,应用业务逻辑,然后将转换后的记录写入出站主题。或者,也可以定义一个没有出站目标的处理器应用程序。

在以下部分中,我们将深入了解 Spring Cloud Stream 与 Kafka Streams 集成的详细信息。

2.3. 编程模型

当使用 Kafka Streams binder 提供的编程模型时,可以使用高级 Streams DSL,也可以混合使用高级和低级 Processor-API。当混合使用高级和低级 API 时,通常通过调用 KStream 上的 transformprocess API 方法来实现。

2.3.1. 函数式风格

从 Spring Cloud Stream 3.0.0 开始,Kafka Streams binder 允许应用程序使用 Java 8 中可用的函数式编程风格进行设计和开发。这意味着应用程序可以简洁地表示为 java.util.function.Functionjava.util.function.Consumer 类型的 lambda 表达式。

让我们看一个非常基本的例子。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

尽管简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个没有出站绑定且只有一个入站绑定的消费者应用程序。该应用程序消费数据,并简单地将 KStream 键和值的信息记录到标准输出。该应用程序包含 SpringBootApplication 注解和一个标记为 Bean 的方法。bean 方法的类型是 java.util.function.Consumer,它被 KStream 参数化。然后,在实现中,我们返回一个 Consumer 对象,它本质上是一个 lambda 表达式。在 lambda 表达式内部,提供了处理数据的代码。

在此应用程序中,有一个类型为 KStream 的单一输入绑定。binder 为应用程序创建此绑定,名称为 process-in-0,即函数 bean 名称后跟一个短划线 (-) 和字面量 in,然后是另一个短划线,最后是参数的序数位置。您可以使用此绑定名称来设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic

如果未在绑定上设置目标属性,则会创建一个与绑定同名的主题(如果应用程序具有足够的权限),或者该主题预期已可用。

一旦构建为 uber-jar(例如,kstream-consumer-app.jar),您可以像下面这样运行上述示例。

如果应用程序选择使用 Spring 的 Component 注解定义函数式 bean,则 binder 也支持该模型。上述函数式 bean 可以改写如下。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

这里是另一个示例,它是一个完整的处理器,既有输入绑定又有输出绑定。这是一个经典的词频统计示例,其中应用程序从主题接收数据,然后计算每个单词在滚动时间窗口中的出现次数。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

这里又是一个完整的 Spring Boot 应用程序。与第一个应用程序的不同之处在于,bean 方法的类型是 java.util.function.FunctionFunction 的第一个参数化类型用于输入 KStream,第二个参数化类型用于输出。在方法体中,提供了一个类型为 Function 的 lambda 表达式,并作为实现,给出了实际的业务逻辑。类似于前面讨论的基于 Consumer 的应用程序,此处的输入绑定默认命名为 process-in-0。对于输出,绑定名称也自动设置为 process-out-0

一旦构建为 uber-jar(例如,wordcount-processor.jar),您可以像下面这样运行上述示例。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

此应用程序将从 Kafka 主题 words 消费消息,并将计算结果发布到输出主题 counts

Spring Cloud Stream 将确保传入和传出主题的消息都自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。设置 Kafka Streams 基础设施所需的 Kafka Streams 特定配置由框架自动处理。

我们上面看到的两个示例都有一个单一的 KStream 输入绑定。在这两种情况下,绑定都从一个主题接收记录。如果您想将多个主题复用到单个 KStream 绑定中,您可以在下面提供逗号分隔的 Kafka 主题作为目标。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

此外,如果您希望将主题与正则表达式匹配,您还可以提供主题模式作为目标。

spring.cloud.stream.bindings.process-in-0.destination=input.*

多个输入绑定

许多非平凡的 Kafka Streams 应用程序通常通过多个绑定从多个主题消费数据。例如,一个主题作为 Kstream 消费,另一个主题作为 KTableGlobalKTable 消费。应用程序可能希望将数据作为表类型接收的原因有很多。考虑一种用例,其中底层主题通过数据库的变更数据捕获 (CDC) 机制填充,或者应用程序只关心用于下游处理的最新更新。如果应用程序指定需要将数据绑定为 KTableGlobalKTable,则 Kafka Streams binder 将正确地将目标绑定到 KTableGlobalKTable,并使其可用于应用程序操作。我们将看看 Kafka Streams binder 中处理多个输入绑定的几种不同场景。

Kafka Streams Binder 中的 BiFunction

这是一个我们有两个输入和一个输出的示例。在这种情况下,应用程序可以利用 java.util.function.BiFunction

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

这里,基本主题与前面的示例相同,但我们有两个输入。Java 的 BiFunction 支持用于将输入绑定到所需的目标。binder 为输入生成的默认绑定名称分别是 process-in-0process-in-1。默认输出绑定是 process-out-0。在此示例中,BiFunction 的第一个参数绑定为第一个输入的 KStream,第二个参数绑定为第二个输入的 KTable

Kafka Streams Binder 中的 BiConsumer

如果存在两个输入但没有输出,在这种情况下,我们可以使用 java.util.function.BiConsumer,如下所示。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入

如果您的输入超过两个怎么办?在某些情况下,您需要两个以上的输入。在这种情况下,binder 允许您链接部分函数。在函数式编程术语中,这种技术通常称为柯里化。随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里化函数。Spring Cloud Stream Kafka Streams binder 可以利用此功能来启用多个输入绑定。

让我们看一个例子。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

让我们看看上面介绍的绑定模型的细节。在这个模型中,我们在入站有 3 个部分应用的函数。我们称它们为 f(x)f(y)f(z)。如果我们在真实数学函数的意义上展开这些函数,它将看起来像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>x 变量代表 KStream<Long, Order>y 变量代表 GlobalKTable<Long, Customer>z 变量代表 GlobalKTable<Long, Product>。第一个函数 f(x) 具有应用程序的第一个输入绑定 (KStream<Long, Order>),其输出是函数 f(y)。函数 f(y) 具有应用程序的第二个输入绑定 (GlobalKTable<Long, Customer>),其输出是另一个函数 f(z)。函数 f(z) 的输入是应用程序的第三个输入 (GlobalKTable<Long, Product>),其输出是 KStream<Long, EnrichedOrder>,这是应用程序的最终输出绑定。来自三个部分函数的输入,它们分别是 KStreamGlobalKTableGlobalKTable,在方法体中可供您用于实现作为 lambda 表达式一部分的业务逻辑。

输入绑定分别命名为 enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2。输出绑定命名为 enrichOrder-out-0

使用柯里化函数,您实际上可以拥有任意数量的输入。但是,请记住,如果输入数量过多,并且像上面 Java 中那样部分应用的函数过多,可能会导致代码难以阅读。因此,如果您的 Kafka Streams 应用程序需要超过合理数量的输入绑定,并且您想使用此函数模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。

输出绑定

Kafka Streams binder 允许 KStreamKTable 类型作为输出绑定。在幕后,binder 使用 KStream 上的 to 方法将结果记录发送到输出主题。如果应用程序在函数中提供 KTable 作为输出,binder 仍然通过委托给 KStreamto 方法来使用此技术。

例如,以下两个函数都将起作用

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}
多个输出绑定

Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。当使用多个输出绑定时,您需要提供一个 KStream 数组 (KStream[]) 作为出站返回类型。

这是一个例子

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

编程模型保持不变,但出站参数化类型为 KStream[]。对于上述函数,默认的输出绑定名称分别是 process-out-0process-out-1process-out-2。binder 生成三个输出绑定的原因是它检测到返回的 KStream 数组长度为三。请注意,在此示例中,我们提供了 noDefaultBranch();如果我们改为使用 defaultBranch(),那将需要一个额外的输出绑定,本质上返回一个长度为四的 KStream 数组。

Kafka Streams 函数式编程风格总结

总而言之,下表显示了函数式范式中可以使用的各种选项。

输入数量 输出数量 要使用的组件

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

>= 3

0..n

使用柯里化函数

  • 在此表中,如果输出超过一个,则类型简单地变为 KStream[]

Kafka Streams binder 中的函数组合

Kafka Streams binder 支持线性拓扑的最小形式的函数组合。使用 Java 函数式 API 支持,您可以编写多个函数,然后使用 andThen 方法自行组合它们。例如,假设您有以下两个函数。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

即使没有 binder 中的函数组合支持,您也可以如下组合这两个函数。

@Bean
pubic Funcion<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

然后您可以提供 spring.cloud.stream.function.definition=foo;bar;composed 形式的定义。有了 binder 中的函数组合支持,您就不需要编写第三个进行显式函数组合的函数。

您可以简单地这样做

spring.cloud.stream.function.definition=foo|bar

您甚至可以这样做

spring.cloud.stream.function.definition=foo|bar;foo;bar

在此示例中,组合函数的默认绑定名称变为 foobar-in-0foobar-out-0

Kafka Streams bincer 中函数组合的限制

当您拥有 java.util.function.Function bean 时,它可以与另一个函数或多个函数组合。相同的函数 bean 也可以与 java.util.function.Consumer 组合。在这种情况下,消费者是最后一个组合的组件。一个函数可以与多个函数组合,然后以 java.util.function.Consumer bean 结尾。

当组合类型为 java.util.function.BiFunction 的 bean 时,BiFunction 必须是定义中的第一个函数。组合实体必须是 java.util.function.Functionjava.util.function.Consumer 类型。换句话说,您不能接受一个 BiFunction bean,然后与另一个 BiFunction 组合。

您不能与 BiConsumer 类型或 Consumer 是第一个组件的定义进行组合。您也不能与输出为数组(用于分支的 KStream[])的函数进行组合,除非这是定义中的最后一个组件。

函数定义中的第一个 FunctionBiFunction 也可以使用柯里化形式。例如,以下是可能的。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

函数定义可以是 curriedFoo|bar。在幕后,binder 将为柯里化函数创建两个输入绑定,并根据定义中的最终函数创建一个输出绑定。在这种情况下,默认输入绑定将是 curriedFoobar-in-0curriedFoobar-in-1。此示例的默认输出绑定将是 curriedFoobar-out-0

函数组合中使用 KTable 作为输出的特别说明

假设您有两个函数。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

您可以将它们组合为 foo|bar,但请记住,第二个函数(本例中的 bar)必须具有 KTable 作为输入,因为第一个函数(foo)具有 KTable 作为输出。

2.3.2. 命令式编程模型。

从 binder 的 3.1.0 版本开始,我们建议对基于 Kafka Streams binder 的应用程序使用上面描述的函数式编程模型。从 Spring Cloud Stream 的 3.1.0 版本开始,对 StreamListener 的支持已弃用。下面,我们将提供一些关于基于 StreamListener 的 Kafka Streams 处理器的详细信息作为参考。

以下是使用 StreamListener 的词频统计示例的等效代码。

@SpringBootApplication
@EnableBinding(KafkaStreamsProcessor.class)
public class WordCountProcessorApplication {

    @StreamListener("input")
    @SendTo("output")
    public KStream<?, WordCount> process(KStream<?, String> input) {
        return input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("WordCounts-multi"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))));
    }

    public static void main(String[] args) {
        SpringApplication.run(WordCountProcessorApplication.class, args);
    }

如您所见,这有点冗长,因为您需要提供 EnableBinding 和其他额外的注解,如 StreamListenerSendTo 才能使其成为一个完整的应用程序。EnableBinding 是您指定包含绑定的绑定接口的地方。在此示例中,我们使用包含以下契约的库存 KafkaStreamsProcessor 绑定接口。

public interface KafkaStreamsProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Output("output")
	KStream<?, ?> output();

}

Binder 将为输入的 KStream 和输出的 KStream 创建绑定,因为您正在使用包含这些声明的绑定接口。

除了函数式编程模型中提供的明显差异外,这里需要特别提到的一点是,绑定名称是您在绑定接口中指定的名称。例如,在上述应用程序中,由于我们使用的是 KafkaStreamsProcessor,因此绑定名称是 inputoutput。绑定属性需要使用这些名称。例如 spring.cloud.stream.bindings.input.destinationspring.cloud.stream.bindings.output.destination 等。请记住,这与函数式编程模型有着根本区别,因为在函数式编程模型中,binder 为应用程序生成绑定名称。这是因为应用程序在不使用 EnableBinding 的函数式模型中不提供任何绑定接口。

这是另一个有两个输入的 sink 示例。

@EnableBinding(KStreamKTableBinding.class)
.....
.....
@StreamListener
public void process(@Input("inputStream") KStream<String, PlayEvent> playEvents,
                    @Input("inputTable") KTable<Long, Song> songTable) {
                    ....
                    ....
}

interface KStreamKTableBinding {

    @Input("inputStream")
    KStream<?, ?> inputStream();

    @Input("inputTable")
    KTable<?, ?> inputTable();
}

以下是上面看到的基于 BiFunction 的处理器的 StreamListener 等效项。

@EnableBinding(KStreamKTableBinding.class)
....
....

@StreamListener
@SendTo("output")
public KStream<String, Long> process(@Input("input") KStream<String, Long> userClicksStream,
                                     @Input("inputTable") KTable<String, String> userRegionsTable) {
....
....
}

interface KStreamKTableBinding extends KafkaStreamsProcessor {

    @Input("inputX")
    KTable<?, ?> inputTable();
}

最后,这是具有三个输入和柯里化函数的应用程序的 StreamListener 等效项。

@EnableBinding(CustomGlobalKTableProcessor.class)
...
...
    @StreamListener
    @SendTo("output")
    public KStream<Long, EnrichedOrder> process(
            @Input("input-1") KStream<Long, Order> ordersStream,
            @Input("input-2") GlobalKTable<Long, Customer> customers,
            @Input("input-3") GlobalKTable<Long, Product> products) {

        KStream<Long, CustomerOrder> customerOrdersStream = ordersStream.join(
                customers, (orderId, order) -> order.getCustomerId(),
                (order, customer) -> new CustomerOrder(customer, order));

        return customerOrdersStream.join(products,
                (orderId, customerOrder) -> customerOrder.productId(),
                (customerOrder, product) -> {
                    EnrichedOrder enrichedOrder = new EnrichedOrder();
                    enrichedOrder.setProduct(product);
                    enrichedOrder.setCustomer(customerOrder.customer);
                    enrichedOrder.setOrder(customerOrder.order);
                    return enrichedOrder;
                });
        }

    interface CustomGlobalKTableProcessor {

            @Input("input-1")
            KStream<?, ?> input1();

            @Input("input-2")
            GlobalKTable<?, ?> input2();

            @Input("input-3")
            GlobalKTable<?, ?> input3();

            @Output("output")
            KStream<?, ?> output();
    }

您可能会注意到,上面两个示例更加冗长,因为除了提供 EnableBinding 之外,您还需要编写自己的自定义绑定接口。使用函数式模型,您可以避免所有这些繁琐的细节。

在继续研究 Kafka Streams binder 提供的通用编程模型之前,这里是多个输出绑定的 StreamListener 版本。

EnableBinding(KStreamProcessorWithBranches.class)
public static class WordCountProcessorApplication {

    @Autowired
    private TimeWindows timeWindows;

    @StreamListener("input")
    @SendTo({"output1","output2","output3"})
    public KStream<?, WordCount>[] process(KStream<Object, String> input) {

			Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
			Predicate<Object, WordCount> isFrench =  (k, v) -> v.word.equals("french");
			Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

			return input
					.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
					.groupBy((key, value) -> value)
					.windowedBy(timeWindows)
					.count(Materialized.as("WordCounts-1"))
					.toStream()
					.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value, new Date(key.window().start()), new Date(key.window().end()))))
					.branch(isEnglish, isFrench, isSpanish);
    }

    interface KStreamProcessorWithBranches {

    		@Input("input")
    		KStream<?, ?> input();

    		@Output("output1")
    		KStream<?, ?> output1();

    		@Output("output2")
    		KStream<?, ?> output2();

    		@Output("output3")
    		KStream<?, ?> output3();
    	}
}

总结一下,我们回顾了使用 Kafka Streams binder 时的各种编程模型选择。

binder 为输入的 KStreamKTableGlobalKTable 提供绑定功能。KTableGlobalKTable 绑定仅在输入端可用。Binder 支持 KStream 的输入和输出绑定。

Kafka Streams binder 编程模型的优点是,binder 为您提供了选择完全函数式编程模型或使用基于 StreamListener 的命令式方法的灵活性。

2.4. 编程模型的辅助功能

2.4.1. 单个应用程序中的多个 Kafka Streams 处理器

Binder 允许在一个 Spring Cloud Stream 应用程序中包含多个 Kafka Streams 处理器。您可以拥有如下应用程序。

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,binder 将创建 3 个独立的 Kafka Streams 对象,它们具有不同的应用程序 ID(下面将详细介绍)。但是,如果应用程序中有多个处理器,您必须告诉 Spring Cloud Stream 需要激活哪些函数。以下是激活函数的方法。

spring.cloud.stream.function.definition: process;anotherProcess;yetAnotherProcess

如果您希望某些功能不立即激活,可以将其从列表中删除。

当您在同一个应用程序中拥有一个 Kafka Streams 处理器和通过不同 binder 处理的其他类型的 Function bean 时,这也是如此(例如,一个基于常规 Kafka 消息通道 binder 的函数 bean)。

2.4.2. Kafka Streams 应用程序 ID

应用程序 ID 是 Kafka Streams 应用程序必须提供的属性。Spring Cloud Stream Kafka Streams binder 允许您以多种方式配置此应用程序 ID。

如果应用程序中只有一个处理器或 StreamListener,则可以使用以下属性在 binder 级别设置此项

spring.cloud.stream.kafka.streams.binder.applicationId.

为了方便起见,如果您只有一个处理器,您还可以使用 spring.application.name 作为属性来委托应用程序 ID。

如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。在函数式模型中,您可以将其作为属性附加到每个函数。

例如,假设您有以下函数。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后,您可以使用以下 binder 级别属性为每个函数设置应用程序 ID。

spring.cloud.stream.kafka.streams.binder.functions.process.applicationId

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId

对于 StreamListener,您需要在处理器的第一个输入绑定上设置此项。

例如,假设您有以下两个基于 StreamListener 的处理器。

@StreamListener
@SendTo("output")
public KStream<String, String> process(@Input("input") <KStream<Object, String>> input) {
   ...
}

@StreamListener
@SendTo("anotherOutput")
public KStream<String, String> anotherProcess(@Input("anotherInput") <KStream<Object, String>> input) {
   ...
}

然后,您必须使用以下绑定属性设置此应用程序 ID。

spring.cloud.stream.kafka.streams.bindings.input.consumer.applicationId

spring.cloud.stream.kafka.streams.bindings.anotherInput.consumer.applicationId

对于基于函数式模型,这种在绑定级别设置应用程序 ID 的方法也适用。但是,如果您使用函数式模型,如上所述在 binder 级别为每个函数设置应用程序 ID 会更容易。

对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您正在自动扩展应用程序,则这一点尤为关键,在这种情况下,您需要确保部署的每个实例都具有相同的应用程序 ID。

如果应用程序未提供应用程序 ID,则 binder 将为您自动生成一个静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重启后将是静态的。在函数式模型中,生成的应用程序 ID 将是函数 bean 名称后跟字面量 applicationID,例如,如果 process 是函数 bean 名称,则为 process-applicationID。对于 StreamListener,生成的应用程序 ID 将使用包含类名称后跟方法名称,然后是字面量 applicationId,而不是使用函数 bean 名称。

设置应用程序 ID 总结
  • 默认情况下,binder 将为每个函数或 StreamListener 方法自动生成应用程序 ID。

  • 如果您有一个处理器,则可以使用 spring.kafka.streams.applicationIdspring.application.namespring.cloud.stream.kafka.streams.binder.applicationId

  • 如果您有多个处理器,则可以使用属性 spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId 为每个函数设置应用程序 ID。对于 StreamListener,可以使用 spring.cloud.stream.kafka.streams.bindings.input.applicationId 完成此操作,假设输入绑定名称为 input

2.4.3. 使用函数式风格覆盖 binder 生成的默认绑定名称

默认情况下,当使用函数式样式时,binder 使用上述策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in-0、process-out-0 等。如果您想覆盖这些绑定名称,可以通过指定以下属性来完成。

spring.cloud.stream.function.bindings.<default binding name>。默认绑定名称是 binder 生成的原始绑定名称。

例如,假设您有这个函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 将生成名称为 process-in-0process-in-1process-out-0 的绑定。现在,如果您想将它们完全更改为其他名称,可能是更具领域特定性的绑定名称,那么您可以如下操作。

spring.cloud.stream.function.bindings.process-in-0=users

spring.cloud.stream.function.bindings.process-in-0=regions

spring.cloud.stream.function.bindings.process-out-0=clicks

之后,您必须将所有绑定级别属性设置到这些新的绑定名称上。

请记住,对于上面描述的函数式编程模型,在大多数情况下,遵守默认绑定名称是有意义的。您可能仍然想要进行此覆盖的唯一原因是当您有大量配置属性并且希望将绑定映射到更领域友好的名称时。

2.4.4. 设置引导服务器配置

运行 Kafka Streams 应用程序时,必须提供 Kafka broker 服务器信息。如果您不提供此信息,binder 期望您在默认的 localhost:9092 上运行 broker。如果不是这种情况,则需要覆盖它。有几种方法可以做到这一点。

  • 使用 boot 属性 - spring.kafka.bootstrapServers

  • Binder 级别属性 - spring.cloud.stream.kafka.streams.binder.brokers

对于 binder 级别属性,无论您是使用通过常规 Kafka binder 提供的 broker 属性 - spring.cloud.stream.kafka.binder.brokers 都没有关系。Kafka Streams binder 将首先检查是否设置了 Kafka Streams binder 特定 broker 属性(spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则查找 spring.cloud.stream.kafka.binder.brokers

2.5. 记录序列化和反序列化

Kafka Streams binder 允许您以两种方式序列化和反序列化记录。一种是 Kafka 提供的原生序列化和反序列化功能,另一种是 Spring Cloud Stream 框架的消息转换功能。让我们看一些细节。

2.5.1. 入站反序列化

键始终使用原生 Serdes 反序列化。

对于值,默认情况下,入站反序列化由 Kafka 原生执行。请注意,这是 Kafka Streams binder 以前版本默认行为的重大更改,以前的反序列化是由框架完成的。

Kafka Streams binder 将尝试通过查看 java.util.function.Function|ConsumerStreamListener 的类型签名来推断匹配的 Serde 类型。以下是它匹配 Serdes 的顺序。

  • 如果应用程序提供了 Serde 类型的 bean,并且返回类型使用传入键或值的实际类型参数化,则它将使用该 Serde 进行入站反序列化。例如,如果应用程序中存在以下内容,则 binder 会检测到 KStream 的传入值类型与 Serde bean 参数化的类型匹配。它将使用该类型进行入站反序列化。

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下来,它查看类型并查看它们是否是 Kafka Streams 公开的类型之一。如果是,则使用它们。以下是 binder 将尝试从 Kafka Streams 匹配的 Serde 类型。

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的 Serde 都不匹配这些类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,binder 假定这些类型是 JSON 友好的。这在您有多个值对象作为输入时很有用,因为 binder 将在内部将它们推断为正确的 Java 类型。但在回退到 JsonSerde 之前,binder 会检查 Kafka Streams 配置中设置的默认 Serde`s,以查看它是否是可以与传入 KStream 的类型匹配的 `Serde

如果上述策略均未奏效,则应用程序必须通过配置提供 Serde。这可以通过两种方式配置 - 绑定或默认。

首先,binder 将检查是否在绑定级别提供了 Serde。例如,如果您有以下处理器,

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然后,您可以使用以下方式提供绑定级别 Serde

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您如上所述为每个输入绑定提供 Serde,那么这将具有更高的优先级,并且 binder 将避免任何 Serde 推理。

如果您希望将默认的键/值 Serdes 用于入站反序列化,则可以在 binder 级别进行设置。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您不希望使用 Kafka 提供的原生解码,则可以依赖 Spring Cloud Stream 提供的消息转换功能。由于原生解码是默认设置,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用原生解码。

例如,如果您拥有与上面相同的 BiFunction 处理器,那么 spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false。您需要单独禁用所有输入的原生解码。否则,对于那些您未禁用的输入,原生解码仍将适用。

默认情况下,Spring Cloud Stream 将使用 application/json 作为内容类型并使用适当的 json 消息转换器。您可以通过使用以下属性和适当的 MessageConverter bean 来使用自定义消息转换器。

spring.cloud.stream.bindings.process-in-0.contentType

2.5.2. 出站序列化

出站序列化与上述入站反序列化遵循相同的规则。与入站反序列化一样,Spring Cloud Stream 以前版本的一个主要变化是出站序列化由 Kafka 原生处理。在 binder 的 3.0 版本之前,这是由框架本身完成的。

出站键始终由 Kafka 使用 binder 推断的匹配 Serde 进行序列化。如果它无法推断键的类型,则需要通过配置指定。

值 Serdes 使用与入站反序列化相同的规则进行推断。首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。如果不是,它会检查是否与 Kafka 公开的 Serde 匹配,例如 - IntegerLongShortDoubleFloatbyte[]UUIDString。如果不起作用,它会回退到 Spring Kafka 项目提供的 JsonSerde,但首先会查看默认的 Serde 配置以查看是否有匹配项。请记住,所有这些对应用程序都是透明的。如果这些都不起作用,则用户必须通过配置提供要使用的 Serde

假设您正在使用与上面相同的 BiFunction 处理器。然后,您可以按如下方式配置出站键/值 Serdes。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果 Serde 推理失败,并且没有提供绑定级别的 Serdes,那么 binder 将回退到 JsonSerde,但会查看默认的 Serdes 以进行匹配。

默认 Serdes 的配置方式与上面在反序列化部分描述的方式相同。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果您的应用程序使用分支功能并具有多个输出绑定,则需要为每个绑定配置这些功能。同样,如果 binder 能够推断 Serde 类型,则无需进行此配置。

如果您不希望使用 Kafka 提供的原生编码,而是希望使用框架提供的消息转换,那么您需要显式禁用原生编码,因为原生编码是默认设置。例如,如果您有与上面相同的 BiFunction 处理器,那么 spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false。在分支的情况下,您需要单独禁用所有输出的原生编码。否则,对于那些您未禁用的输出,原生编码仍将适用。

当由 Spring Cloud Stream 完成转换时,默认情况下,它将使用 application/json 作为内容类型并使用适当的 json 消息转换器。您可以通过使用以下属性和相应的 MessageConverter bean 来使用自定义消息转换器。

spring.cloud.stream.bindings.process-out-0.contentType

当禁用原生编码/解码时,binder 不会像原生 Serdes 那样进行任何推断。应用程序需要显式提供所有配置选项。因此,建议在编写 Spring Cloud Stream Kafka Streams 应用程序时,通常坚持使用默认的反序列化选项并坚持使用 Kafka Streams 提供的原生反序列化。您必须使用框架提供的消息转换能力的唯一场景是当您的上游生产者使用特定的序列化策略时。在这种情况下,您希望使用匹配的反序列化策略,因为原生机制可能会失败。当依赖默认的 Serde 机制时,应用程序必须确保 binder 能够正确地将入站和出站映射到适当的 Serde,否则可能会失败。

值得一提的是,上面概述的数据解/序列化方法仅适用于处理器的边缘,即入站和出站。您的业务逻辑可能仍需要调用需要显式 Serde 对象的 Kafka Streams API。这些仍然是应用程序的责任,必须由开发人员相应地处理。

2.6. 错误处理

Apache Kafka Streams 提供了原生处理反序列化错误的异常处理能力。有关此支持的详细信息,请参阅 文档。Apache Kafka Streams 开箱即用地提供了两种反序列化异常处理器 - LogAndContinueExceptionHandlerLogAndFailExceptionHandler。顾名思义,前者将记录错误并继续处理下一个记录,后者将记录错误并失败。LogAndFailExceptionHandler 是默认的反序列化异常处理器。

2.6.1. 处理 Binder 中的反序列化异常

Kafka Streams binder 允许使用以下属性指定上述反序列化异常处理器。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

除了上述两种反序列化异常处理器外,binder 还提供了第三种,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理器的方法。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

设置上述属性后,所有反序列化错误的记录将自动发送到 DLQ 主题。

您可以如下设置发布 DLQ 消息的主题名称。

您可以提供 DlqDestinationResolver 的实现,它是一个函数式接口。DlqDestinationResolverConsumerRecord 和异常作为输入,然后允许指定主题名称作为输出。通过访问 Kafka ConsumerRecord,可以在 BiFunction 的实现中检查头部记录。

以下是提供 DlqDestinationResolver 实现的示例。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

在提供 DlqDestinationResolver 的实现时,需要记住一件重要的事情,那就是绑定器中的 provisioner 不会自动为应用程序创建主题。这是因为绑定器无法推断出实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,应用程序有责任确保这些主题事先已创建。

如果应用程序中存在 DlqDestinationResolver bean,则它具有更高的优先级。如果您不想遵循此方法,而是希望通过配置提供静态 DLQ 名称,则可以设置以下属性。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

如果设置了此项,则错误记录将发送到主题 custom-dlq。如果应用程序未使用上述任何一种策略,则它将创建一个名为 error.<input-topic-name>.<application-id> 的 DLQ 主题。例如,如果您的绑定的目标主题是 inputTopic,应用程序 ID 是 process-applicationId,则默认 DLQ 主题是 error.inputTopic.process-applicationId。如果您打算启用 DLQ,始终建议为每个输入绑定显式创建一个 DLQ 主题。

2.6.2. 每个输入消费者绑定的 DLQ

属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着如果同一应用程序中有多个函数或 StreamListener 方法,此属性将应用于所有这些方法。但是,如果一个处理器中有多个处理器或多个输入绑定,则可以使用 binder 为每个输入消费者绑定提供的更细粒度的 DLQ 控制。

如果您有以下处理器,

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

并且您只想在第一个输入绑定上启用 DLQ,在第二个绑定上跳过并继续,那么您可以在消费者上按如下操作。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

以这种方式设置反序列化异常处理器比在 binder 级别设置具有更高的优先级。

2.6.3. DLQ 分区

默认情况下,记录发布到死信主题时使用与原始记录相同的分区。这意味着死信主题必须至少有与原始记录相同数量的分区。

要更改此行为,请将 DlqPartitionFunction 实现添加为应用程序上下文中的 @Bean。只能存在一个这样的 bean。该函数提供消费者组(在大多数情况下与应用程序 ID 相同)、失败的 ConsumerRecord 和异常。例如,如果您总是希望路由到分区 0,您可以使用

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
如果您将消费者绑定的 dlqPartitions 属性设置为 1(并且绑定的 minPartitionCount 等于 1),则无需提供 DlqPartitionFunction;框架将始终使用分区 0。如果您将消费者绑定的 dlqPartitions 属性设置为大于 1 的值(或绑定的 minPartitionCount 大于 1),则您必须提供一个 DlqPartitionFunction bean,即使分区计数与原始主题相同。

在使用 Kafka Streams binder 中的异常处理功能时,需要注意几点。

  • 属性 spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler 适用于整个应用程序。这意味着如果同一应用程序中有多个函数或 StreamListener 方法,此属性将应用于所有这些方法。

  • 反序列化的异常处理与原生反序列化和框架提供的消息转换一致工作。

2.6.4. 处理 Binder 中的生产异常

与上述反序列化异常处理程序的支持不同,binder 不提供用于处理生产异常的此类一流机制。但是,您仍然可以使用 StreamsBuilderFactoryBean 定制器来配置生产异常处理程序,您可以在下面的后续部分中找到更多详细信息。

2.7. 重试关键业务逻辑

在某些情况下,您可能希望重试应用程序中对业务逻辑至关重要的部分。这可能是对关系数据库的外部调用,或从 Kafka Streams 处理器调用 REST 端点。这些调用可能由于各种原因(例如网络问题或远程服务不可用)而失败。通常,如果可以再次尝试,这些故障可能会自行解决。默认情况下,Kafka Streams binder 会为所有输入绑定创建 RetryTemplate bean。

如果函数具有以下签名,

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

并且使用默认绑定名称,RetryTemplate 将注册为 process-in-0-RetryTemplate。这遵循绑定名称(process-in-0)后跟字面量 -RetryTemplate 的约定。在多个输入绑定的情况下,每个绑定将有一个单独的 RetryTemplate bean 可用。如果应用程序中有一个自定义 RetryTemplate bean,并通过 spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName 提供,那么它将优先于任何输入绑定级别的重试模板配置属性。

一旦将绑定中的 RetryTemplate 注入到应用程序中,它就可以用于重试应用程序的任何关键部分。这是一个示例

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

或者您可以使用自定义 RetryTemplate 如下。

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

请注意,当重试次数耗尽时,默认情况下会抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,可以将 RecoveryCallback 添加到 execute 方法中:这是一个示例。

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

有关 RetryTemplate、重试策略、回退策略等的更多信息,请参阅 Spring Retry 项目。

2.8. 状态存储

当使用高级 DSL 并进行触发状态存储的适当调用时,Kafka Streams 会自动创建状态存储。

如果您想将传入的 KTable 绑定具体化为命名状态存储,那么您可以使用以下策略。

假设您有以下函数。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

然后通过设置以下属性,传入的 KTable 数据将被具体化到命名状态存储中。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

您可以在应用程序中将自定义状态存储定义为 bean,这些状态存储将被 binder 检测并添加到 Kafka Streams 构建器中。特别是当使用处理器 API 时,您需要手动注册一个状态存储。为此,您可以在应用程序中将 StateStore 创建为一个 bean。以下是定义此类 bean 的示例。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

这些状态存储可以由应用程序直接访问。

在引导期间,上述 bean 将由 binder 处理并传递给 Streams 构建器对象。

访问状态存储

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

在注册全局状态存储时,这将不起作用。为了注册全局状态存储,请参阅下面关于自定义 StreamsBuilderFactoryBean 的部分。

2.9. 交互式查询

Kafka Streams binder API 公开了一个名为 InteractiveQueryService 的类,用于交互式查询状态存储。您可以在应用程序中将其作为 Spring bean 访问。从应用程序访问此 bean 的简单方法是自动注入此 bean。

@Autowired
private InteractiveQueryService interactiveQueryService;

一旦您访问到这个 bean,就可以查询您感兴趣的特定状态存储。请看下面。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

在启动期间,上述检索存储的方法调用可能会失败。例如,它可能仍在初始化状态存储。在这种情况下,重试此操作将很有用。Kafka Streams binder 提供了一个简单的重试机制来适应这种情况。

以下是可用于控制此重试的两个属性。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认值为 1

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认值为 1000 毫秒。

如果有多个 Kafka Streams 应用程序实例正在运行,那么在交互式查询它们之前,您需要识别哪个应用程序实例托管您正在查询的特定键。InteractiveQueryService API 提供了识别主机信息的方法。

为了使此功能正常工作,您必须按如下方式配置属性 application.server

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

以下是一些代码片段

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

有关这些主机查找方法的更多信息,请参阅这些方法的 Javadoc。对于这些方法,在启动期间,如果底层 KafkaStreams 对象尚未准备就绪,它们可能会抛出异常。上述重试属性也适用于这些方法。

2.9.1. InteractiveQueryService 提供的其他 API 方法

使用以下 API 方法检索与给定存储和键组合关联的 KeyQueryMetadata 对象。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

使用以下 API 方法检索与给定存储和键组合关联的 KakfaStreams 对象。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

2.10. 健康指示器

健康指示器需要依赖 spring-boot-starter-actuator。对于 Maven,请使用

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层 streams 线程的状态。Spring Cloud Stream 定义了一个属性 management.health.binders.enabled 来启用健康指示器。请参阅 Spring Cloud Stream 文档

健康指示器为每个流线程的元数据提供以下详细信息

  • 线程名称

  • 线程状态:CREATEDRUNNINGPARTITIONS_REVOKEDPARTITIONS_ASSIGNEDPENDING_SHUTDOWNDEAD

  • 活动任务:任务 ID 和分区

  • 备用任务:任务 ID 和分区

默认情况下,只显示全局状态(UPDOWN)。要显示详细信息,属性 management.endpoint.health.show-details 必须设置为 ALWAYSWHEN_AUTHORIZED。有关健康信息的更多详细信息,请参阅 Spring Boot Actuator 文档

如果所有已注册的 Kafka 线程都处于 RUNNING 状态,则健康指示器的状态为 UP

由于 Kafka Streams binder 中有三个独立的 binder(KStreamKTableGlobalKTable),它们都将报告健康状态。启用 show-details 时,报告的一些信息可能冗余。

当同一个应用程序中存在多个 Kafka Streams 处理器时,将为所有处理器报告健康检查,并按 Kafka Streams 的应用程序 ID 进行分类。

2.11. 访问 Kafka Streams 指标

Spring Cloud Stream Kafka Streams binder 提供了 Kafka Streams 指标,可以通过 Micrometer MeterRegistry 导出。

对于 Spring Boot 2.2.x 版本,指标支持通过 binder 的自定义 Micrometer 指标实现提供。对于 Spring Boot 2.3.x 版本,Kafka Streams 指标支持通过 Micrometer 本地提供。

通过 Boot actuator 端点访问指标时,请确保将 metrics 添加到属性 management.endpoints.web.exposure.include 中。然后,您可以访问 /acutator/metrics 以获取所有可用指标的列表,然后可以通过相同的 URI(/actuator/metrics/<metric-name>)单独访问这些指标。

2.12. 混合使用高级 DSL 和低级 Processor API

Kafka Streams 提供了两种 API 变体。它有一个更高级的 DSL 样式的 API,您可以链式调用各种操作,这对于许多函数式程序员来说可能很熟悉。Kafka Streams 还提供了低级处理器 API。处理器 API 虽然功能强大,并且能够以更低的级别控制事物,但其本质是命令式的。Spring Cloud Stream 的 Kafka Streams binder 允许您使用高级 DSL 或混合使用 DSL 和处理器 API。混合使用这两种变体为您提供了许多选项来控制应用程序中的各种用例。应用程序可以使用 transformprocess 方法 API 调用来访问处理器 API。

以下是如何在 Spring Cloud Stream 应用程序中使用 process API 结合 DSL 和处理器 API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

以下是使用 transform API 的示例。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process API 方法调用是终端操作,而 transform API 是非终端操作,它为您提供一个潜在转换的 KStream,您可以使用它继续使用 DSL 或处理器 API 进行进一步处理。

2.13. 出站分区支持

Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题。如果出站主题已分区并且处理器需要将出站数据发送到特定分区,则应用程序需要提供一个 StreamPartitioner 类型的 bean。有关详细信息,请参阅 StreamPartitioner。让我们看一些示例。

这与我们之前多次看到的处理器相同,

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

这是输出绑定目的地

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

如果主题 outputTopic 有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认分区策略,这可能不是您想要的输出,具体取决于特定用例。假设您想将任何匹配 spring 的键发送到分区 0,将 cloud 发送到分区 1,将 stream 发送到分区 2,将所有其他键发送到分区 3。以下是您需要在应用程序中执行的操作。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

这是一个粗略的实现,但是,您可以访问记录的键/值、主题名称和分区总数。因此,如有需要,您可以实现复杂的分区策略。

您还需要将此 bean 名称与应用程序配置一起提供。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

应用程序中的每个输出主题都需要像这样单独配置。

2.14. StreamsBuilderFactoryBean 定制器

通常需要自定义创建 KafkaStreams 对象的 StreamsBuilderFactoryBean。基于 Spring Kafka 提供的底层支持,binder 允许您自定义 StreamsBuilderFactoryBean。您可以使用 StreamsBuilderFactoryBeanCustomizer 来自定义 StreamsBuilderFactoryBean 本身。然后,一旦您通过此定制器访问 StreamsBuilderFactoryBean,您就可以使用 KafkaStreamsCustomzier 自定义相应的 KafkaStreams。这两个定制器都是 Spring for Apache Kafka 项目的一部分。

以下是使用 StreamsBuilderFactoryBeanCustomizer 的示例。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上面所示的只是一个示例,说明了您可以如何自定义 StreamsBuilderFactoryBean。您可以调用 StreamsBuilderFactoryBean 中的任何可用修改操作来自定义它。此定制器将在工厂 bean 启动之前由 binder 调用。

一旦您获得了 StreamsBuilderFactoryBean 的访问权限,您还可以自定义底层的 KafkaStreams 对象。以下是实现此目的的蓝图。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer 将在底层 KafkaStreams 启动之前由 StreamsBuilderFactoryBeabn 调用。

整个应用程序中只能有一个 StreamsBuilderFactoryBeanCustomizer。那么,我们如何处理多个 Kafka Streams 处理器呢,因为它们每个都由独立的 StreamsBuilderFactoryBean 对象支持?在这种情况下,如果这些处理器的自定义需要不同,那么应用程序需要根据应用程序 ID 应用一些过滤器。

例如,

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {

    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

2.14.1. 使用 Customizer 注册全局状态存储

如上所述,binder 不提供一种一流的方式来注册全局状态存储。为此,您需要使用定制器。以下是如何实现此目的。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        try {
            final StreamsBuilder streamsBuilder = fb.getObject();
            streamsBuilder.addGlobalStore(...);
        }
        catch (Exception e) {

        }
    };
}

同样,如果您有多个处理器,您可能希望通过如上所述使用应用程序 ID 过滤掉其他 StreamsBuilderFactoryBean 对象,将全局状态存储附加到正确的 StreamsBuilder

2.14.2. 使用 Customizer 注册生产异常处理器

在错误处理部分,我们指出 binder 没有提供一种一流的方式来处理生产异常。尽管如此,您仍然可以使用 StreamsBuilderFacotryBean 定制器来注册生产异常处理程序。请参阅下文。

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

再次,如果您有多个处理器,您可能需要针对正确的 StreamsBuilderFactoryBean 适当设置它。您也可以使用配置属性添加此类生产异常处理程序(有关更多信息,请参见下文),但如果您选择编程方法,这是一个选项。

2.15. 时间戳提取器

Kafka Streams 允许您根据各种时间戳概念控制消费者记录的处理。默认情况下,Kafka Streams 提取嵌入在消费者记录中的时间戳元数据。您可以通过为每个输入绑定提供不同的 TimestampExtractor 实现来更改此默认行为。以下是有关如何实现此目的的一些详细信息。

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

然后为每个消费者绑定设置上述 TimestampExtractor bean 名称。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

如果您跳过设置自定义时间戳提取器的输入消费者绑定,则该消费者将使用默认设置。

2.16. 结合 Kafka Streams binder 和常规 Kafka Binder 的多 binder

您可以有一个应用程序,其中既有基于常规 Kafka binder 的函数/消费者/供应商,也有基于 Kafka Streams 的处理器。但是,您不能在一个函数或消费者中混合使用它们。

这是一个示例,其中您在同一个应用程序中同时拥有基于 binder 的组件。

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

这是配置中的相关部分

spring.cloud.stream.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果您的应用程序与上述相同,但处理两个不同的 Kafka 集群,例如,常规的 process 作用于 Kafka 集群 1 和集群 2(从集群 1 接收数据并发送到集群 2),而 Kafka Streams 处理器作用于 Kafka 集群 2。那么事情就会变得更加复杂。您必须使用 Spring Cloud Stream 提供的 多绑定器 功能。

以下是您的配置在这种情况下可能如何变化。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.stream.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

请注意上面的配置。我们有两种绑定器,但总共有 3 个绑定器,第一个是基于集群 1 (kafka1) 的常规 Kafka 绑定器,然后是基于集群 2 (kafka2) 的另一个 Kafka 绑定器,最后是 kstream (kafka3)。应用程序中的第一个处理器从 kafka1 接收数据并发布到 kafka2,其中两个绑定器都基于常规 Kafka 绑定器,但集群不同。第二个处理器是 Kafka Streams 处理器,它从 kafka3 消费数据,该集群与 kafka2 相同,但绑定器类型不同。

由于 Kafka Streams 绑定器系列中有三种不同的绑定器类型 - kstreamktableglobalktable - 如果您的应用程序有基于这些绑定器中的任何一个的多个绑定,则需要明确提供绑定器类型。

例如,如果您有以下处理器,

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

那么,这必须在多绑定器场景中配置如下。请注意,这仅在您有一个真正的多绑定器场景时才需要,即单个应用程序中有多个处理器处理多个集群。在这种情况下,需要显式为绑定提供绑定器,以区分其他处理器的绑定器类型和集群。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.

2.17. 状态清理

默认情况下,当绑定停止时,不会清理任何本地状态。这与 Spring Kafka 2.7 版本以来的行为相同。有关更多详细信息,请参阅 Spring Kafka 文档。要修改此行为,只需将单个 CleanupConfig @Bean(配置为在启动、停止或两者都不进行清理)添加到应用程序上下文;该 bean 将被检测并连接到工厂 bean 中。

2.18. Kafka Streams 拓扑可视化

Kafka Streams binder 提供了以下执行器端点,用于检索拓扑描述,您可以使用外部工具可视化该拓扑。

/actuator/kafkastreamstopology

/actuator/kafkastreamstopology/<application-id of the processor>

您需要包含 Spring Boot 的 actuator 和 web 依赖项才能访问这些端点。此外,您还需要将 kafkastreamstopology 添加到 management.endpoints.web.exposure.include 属性中。默认情况下,kafkastreamstopology 端点是禁用的。

2.19. Kafka Streams 应用程序中基于事件类型的路由

Kafka Streams binder 不支持常规消息通道绑定器中可用的路由功能。但是,Kafka Streams binder 仍然通过入站记录上的事件类型记录头提供路由功能。

要启用基于事件类型的路由,应用程序必须提供以下属性。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.

这可以是逗号分隔的值。

例如,假设我们有这个函数

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

让我们还假设我们只希望在此函数中执行业务逻辑,如果传入记录的事件类型为 foobar。这可以使用绑定上的 eventTypes 属性表示如下。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

现在,当应用程序运行时,binder 会检查每个传入记录的 event_type 头部,查看其值是否设置为 foobar。如果未找到其中任何一个,则将跳过函数执行。

默认情况下,binder 期望记录头键为 event_type,但可以为每个绑定更改。例如,如果我们要将此绑定上的头键更改为 my_event 而不是默认值,则可以如下更改。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

2.20. Kafka Streams binder 中的绑定可视化和控制

从 3.1.2 版本开始,Kafka Streams binder 支持绑定可视化和控制。只支持 STOPPEDSTARTED 两种生命周期阶段。Kafka Streams binder 中不提供 PAUSEDRESUMED 生命周期阶段。

为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

如果您喜欢使用 webflux,则可以包含 spring-boot-starter-webflux 而不是标准 web 依赖项。

此外,您还需要设置以下属性

management.endpoints.web.exposure.include=bindings

为了进一步说明此功能,让我们使用以下应用程序作为指南

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

如我们所见,该应用程序有两个 Kafka Streams 函数 - 一个是消费者,另一个是函数。消费者绑定默认命名为 consumer-in-0。类似地,对于函数,输入绑定是 function-in-0,输出绑定是 function-out-0

应用程序启动后,我们可以使用以下绑定端点找到有关绑定的详细信息。

 curl https://:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

上面可以找到所有三个绑定的详细信息。

现在让我们停止 consumer-in-0 绑定。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/consumer-in-0

此时,将不会通过此绑定接收任何记录。

再次启动绑定。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/consumer-in-0

当单个函数上存在多个绑定时,在其中任何一个绑定上调用这些操作都将起作用。这是因为单个函数上的所有绑定都由同一个 StreamsBuilderFactoryBean 支持。因此,对于上述函数,function-in-0function-out-0 都将起作用。

2.21. 手动启动 Kafka Streams 处理器

Spring Cloud Stream Kafka Streams binder 在 Spring for Apache Kafka 的 StreamsBuilderFactoryBean 之上提供了一个名为 StreamsBuilderFactoryManager 的抽象。此管理器 API 用于控制基于 binder 的应用程序中每个处理器的多个 StreamsBuilderFactoryBean。因此,当使用 binder 时,如果您想手动控制应用程序中各种 StreamsBuilderFactoryBean 对象的自动启动,则需要使用 StreamsBuilderFactoryManager。您可以使用属性 spring.kafka.streams.auto-startup 并将其设置为 false 以关闭处理器的自动启动。然后,在应用程序中,您可以使用以下内容通过 StreamsBuilderFactoryManager 启动处理器。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

当您希望应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常有用。例如,当您有一个需要恢复的大型状态存储时,如果处理器像默认情况下那样正常启动,这可能会阻止您的应用程序启动。如果您正在使用某种活跃度探测机制(例如在 Kubernetes 上),它可能会认为应用程序已关闭并尝试重新启动。为了纠正此问题,您可以将 spring.kafka.streams.auto-startup 设置为 false 并遵循上述方法。

请记住,当使用 Spring Cloud Stream binder 时,您不是直接处理 Spring for Apache Kafka 的 StreamsBuilderFactoryBean,而是 StreamsBuilderFactoryManager,因为 StreamsBuilderFactoryBean 对象由 binder 内部管理。

2.22. 选择性手动启动 Kafka Streams 处理器

虽然上述方法将通过 StreamsBuilderFactoryManager 无条件地对应用程序中的所有 Kafka Streams 处理器应用 auto start false,但通常希望只有个别选定的 Kafka Streams 处理器不自动启动。例如,假设您的应用程序中有三个不同的函数(处理器),并且对于其中一个处理器,您不希望它作为应用程序启动的一部分自动启动。以下是这种情况的一个示例。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

在上述场景中,如果您将 spring.kafka.streams.auto-startup 设置为 false,则所有处理器都不会在应用程序启动期间自动启动。在这种情况下,您必须通过调用底层 StreamsBuilderFactoryManager 上的 start() 来以编程方式启动它们,如上所述。但是,如果我们要选择性地禁用一个处理器,则必须在该处理器的单个绑定上设置 auto-startup。假设我们不希望 process3 函数自动启动。这是一个具有两个输入绑定(process3-in-0process3-in-1)的 BiFunction。为了避免此处理器自动启动,您可以选择任何一个输入绑定并在其上设置 auto-startup。选择哪个绑定无关紧要;如果您愿意,可以在两个绑定上都将 auto-startup 设置为 false,但一个就足够了。因为它们共享同一个工厂 bean,所以您不必在两个绑定上都将 autoStartup 设置为 false,但为了清晰起见,这样做可能更有意义。

以下是您可以用于禁用此处理器自动启动的 Spring Cloud Stream 属性。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

然后,您可以手动启动处理器,无论是使用 REST 端点还是使用 BindingsEndpoint API,如下所示。为此,您需要确保类路径上存在 Spring Boot actuator 依赖项。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST https://:8080/actuator/bindings/process3-in-0

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

有关此机制的更多详细信息,请参阅参考文档中的 此部分

当通过禁用 auto-startup 控制绑定(如本节所述)时,请注意这仅适用于消费者绑定。换句话说,如果您使用生产者绑定 process3-out-0,它在禁用处理器自动启动方面没有任何效果,尽管此生产者绑定与消费者绑定使用相同的 StreamsBuilderFactoryBean

2.23. 使用 Spring Cloud Sleuth 进行追踪

当 Spring Cloud Sleuth 在基于 Spring Cloud Stream Kafka Streams binder 的应用程序的类路径中时,其消费者和生产者都会自动通过跟踪信息进行检测。但是,为了跟踪任何特定于应用程序的操作,这些操作需要由用户代码显式检测。这可以通过在应用程序中注入 Spring Cloud Sleuth 的 KafkaStreamsTracing bean,然后通过此注入的 bean 调用各种 Kafka Streams 操作来完成。以下是使用它的一些示例。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

在上面的示例中,有两个地方添加了显式跟踪检测。首先,我们正在记录来自传入 KStream 的键/值信息。当记录此信息时,相关的 span 和 trace ID 也将被记录,以便监控系统可以跟踪它们并与相同的 span id 相关联。其次,当我们调用 map 操作时,我们不是直接在 KStream 类上调用它,而是将其包装在 transform 操作中,然后从 KafkaStreamsTracing 调用 map。在这种情况下,记录的消息也将包含 span ID 和 trace ID。

这是另一个示例,我们使用低级 transformer API 访问各种 Kafka Streams 头部。当 spring-cloud-sleuth 在类路径中时,所有跟踪头部也可以这样访问。

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}

2.24. 配置选项

本节包含 Kafka Streams binder 使用的配置选项。

有关绑定器的常见配置选项和属性,请参阅核心文档

2.24.1. Kafka Streams Binder 属性

以下属性在绑定器级别可用,并且必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。Kafka Streams 绑定器中重复使用的任何 Kafka 绑定器提供的属性必须以 spring.cloud.stream.kafka.streams.binder 为前缀,而不是 spring.cloud.stream.kafka.binder。此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下,任何前缀都可以。

configuration

包含 Apache Kafka Streams API 相关属性的键/值对映射。此属性必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。以下是一些使用此属性的示例。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

有关 Streams 配置中可能包含的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。您可以通过此属性设置 StreamsConfig 中的所有配置。当使用此属性时,它适用于整个应用程序,因为这是一个 binder 级别属性。如果应用程序中有多个处理器,所有处理器都将获取这些属性。在 application.id 等属性的情况下,这可能会出现问题,因此您必须仔细检查 StreamsConfig 中的属性如何使用此 binder 级别的 configuration 属性进行映射。

functions.<function-bean-name>.applicationId

仅适用于函数式处理器。这可用于在应用程序中为每个函数设置应用程序 ID。在多个函数的情况下,这是一种设置应用程序 ID 的便捷方式。

functions.<function-bean-name>.configuration

仅适用于函数式处理器。包含 Apache Kafka Streams API 相关属性的键/值对映射。这类似于上面描述的 binder 级别的 configuration 属性,但此级别的 configuration 属性仅限于命名函数。当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能希望使用此属性。此处可以使用所有 StreamsConfig 属性。

brokers

Broker URL

默认值:localhost

zkNodes

Zookeeper URL

默认值:localhost

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序应用于绑定器级别,因此应用于应用程序中的所有输入绑定。有一种更细粒度的方式可以在消费者绑定级别控制它。可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

applicationId

在绑定器级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方式。如果应用程序包含多个函数或 StreamListener 方法,则应以不同方式设置应用程序 ID。详细讨论应用程序 ID 的设置请参见上文。

默认值:应用程序将生成一个静态应用程序 ID。有关详细信息,请参阅应用程序 ID 部分。

stateStoreRetry.maxAttempts

尝试连接到状态存储的最大尝试次数。

默认值:1

stateStoreRetry.backoffPeriod

重试时连接到状态存储的退避周期。

默认值:1000 毫秒

consumerProperties

绑定器级别的任意消费者属性。

producerProperties

绑定器级别的任意生产者属性。

includeStoppedProcessorsForHealthCheck

当通过 actuator 停止处理器的绑定时,默认情况下此处理器将不参与健康检查。将此属性设置为 true 以启用所有处理器的健康检查,包括那些当前通过绑定 actuator 端点停止的处理器。

默认值:false

2.24.2. Kafka Streams 生产者属性

以下属性适用于 Kafka Streams 生产者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 为前缀。为方便起见,如果存在多个输出绑定并且它们都需要一个公共值,则可以通过使用前缀 spring.cloud.stream.kafka.streams.default.producer. 进行配置。

keySerde

要使用的键序列化器

默认值:参见上面关于消息解/序列化的讨论

valueSerde

要使用的值序列化器

默认值:参见上面关于消息解/序列化的讨论

useNativeEncoding

启用/禁用原生编码的标志

默认值:true

streamPartitionerBeanName

用于消费者的自定义出站分区器 bean 名称。应用程序可以提供自定义的 StreamPartitioner 作为 Spring bean,并且可以将此 bean 的名称提供给生产者以使用,而不是默认的。

默认值:参见上面关于出站分区支持的讨论。

producedAs

处理器正在生产到的 sink 组件的自定义名称。

默认值:none (由 Kafka Streams 生成)

2.24.3. Kafka Streams 消费者属性

以下属性适用于 Kafka Streams 消费者,并且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 为前缀。为方便起见,如果存在多个输入绑定并且它们都需要一个公共值,则可以通过使用前缀 spring.cloud.stream.kafka.streams.default.consumer. 进行配置。

applicationId

为每个输入绑定设置 application.id。这仅适用于基于 StreamListener 的处理器,对于基于函数的处理器,请参阅上面概述的其他方法。

默认值:参见上文。

keySerde

要使用的键序列化器

默认值:参见上面关于消息解/序列化的讨论

valueSerde

要使用的值序列化器

默认值:参见上面关于消息解/序列化的讨论

materializedAs

使用传入 KTable 类型时要具体化的状态存储

默认值:none

useNativeDecoding

启用/禁用原生解码的标志

默认值:true

dlqName

DLQ 主题名称。

默认值:参见上面关于错误处理和 DLQ 的讨论。

startOffset

如果没有已提交的偏移量可供消费,则从哪个偏移量开始。这主要用于消费者首次从主题消费时。Kafka Streams 使用 earliest 作为默认策略,binder 使用相同的默认值。这可以通过此属性覆盖为 latest

默认值:earliest

注意:在消费者上使用 resetOffsets 对 Kafka Streams binder 没有影响。与基于消息通道的 binder 不同,Kafka Streams binder 不会按需查找开头或结尾。

deserializationExceptionHandler

反序列化错误处理程序类型。此处理程序应用于每个消费者绑定,而不是之前描述的绑定器级别属性。可能的值为 - logAndContinuelogAndFailskipAndContinuesendToDlq

默认值:logAndFail

timestampExtractorBeanName

特定时间戳提取器 bean 名称,用于消费者。应用程序可以提供 TimestampExtractor 作为 Spring bean,并且可以将此 bean 的名称提供给消费者以使用,而不是默认的。

默认值:参见上面关于时间戳提取器的讨论。

eventTypes

此绑定支持的事件类型列表,逗号分隔。

默认值:none

eventTypeHeaderKey

通过此绑定传入的每条记录上的事件类型头键。

默认值:event_type

consumedAs

处理器从中消费的源组件的自定义名称。

默认值:none (由 Kafka Streams 生成)

2.24.4. 关于并发的特别说明

在 Kafka Streams 中,您可以使用 num.stream.threads 属性控制处理器可以创建的线程数。您可以通过上述绑定器、函数、生产者或消费者级别的各种 configuration 选项来完成此操作。您也可以为此目的使用 Spring Cloud Stream 核心提供的 concurrency 属性。使用此属性时,您需要将其应用于消费者。当函数或 StreamListener 中有多个输入绑定时,请将其设置在第一个输入绑定上。例如,当设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,它将被绑定器转换为 num.stream.threads。如果您有多个处理器,并且一个处理器定义了绑定级别并发,而其他处理器没有,则那些没有绑定级别并发的处理器将回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的绑定器范围属性。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。

3. 提示、技巧和秘籍

3.1. Kafka 简单 DLQ

3.1.1. 问题陈述

作为开发人员,我希望编写一个从 Kafka 主题处理记录的消费者应用程序。但是,如果在处理过程中发生一些错误,我不想让应用程序完全停止。相反,我想将错误记录发送到 DLT(死信主题),然后继续处理新记录。

3.1.2. 解决方案

解决此问题的方法是使用 Spring Cloud Stream 中的 DLQ 功能。为了便于讨论,我们假设以下是我们的处理器函数。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };

这是一个非常简单的函数,它对处理的所有记录都抛出异常,但您可以将此函数扩展到任何其他类似情况。

为了将错误记录发送到 DLT,我们需要提供以下配置。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

为了激活 DLQ,应用程序必须提供一个组名。匿名消费者无法使用 DLQ 功能。我们还需要通过将 Kafka 消费者绑定上的 enableDLQ 属性设置为 true 来启用 DLQ。最后,我们可以选择性地通过在 Kafka 消费者绑定上提供 dlqName 来提供 DLT 名称,否则在此情况下默认为 input-topic-dlq.my-group.error

请注意,在上面提供的示例消费者中,有效载荷的类型是 byte[]。默认情况下,Kafka binder 中的 DLQ 生产者期望有效载荷类型为 byte[]。如果不是这种情况,那么我们需要提供正确序列化器的配置。例如,让我们将消费者函数重新编写如下

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

现在,我们需要告诉 Spring Cloud Stream,我们希望在写入 DLT 时如何序列化数据。以下是此场景的修改配置

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

3.2. 带高级重试选项的 DLQ

3.2.1. 问题陈述

这与上面的方法类似,但作为开发人员,我希望配置重试的处理方式。

3.2.2. 解决方案

如果您遵循了上述方法,那么当处理遇到错误时,您将获得 Kafka binder 中内置的默认重试选项。

默认情况下,binder 最多重试 3 次,初始延迟 1 秒,每次退避乘数为 2.0,最大延迟为 10 秒。您可以如下更改所有这些配置

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

如果需要,您还可以通过提供布尔值映射来提供可重试异常列表。例如,

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

默认情况下,未在上述映射中列出的任何异常都将被重试。如果不需要,可以通过提供以下内容禁用:

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

您还可以提供自己的 RetryTemplate 并将其标记为 @StreamRetryTemplate,它将被绑定器扫描并使用。这在您需要更复杂的重试策略和策略时很有用。

如果您有多个 @StreamRetryTemplate bean,那么您可以使用以下属性指定您的绑定想要使用哪一个:

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

3.3. 使用 DLQ 处理反序列化错误

3.3.1. 问题陈述

我有一个处理器,在 Kafka 消费者中遇到反序列化异常。我期望 Spring Cloud Stream DLQ 机制能捕获这种情况,但它没有。我该如何处理?

3.3.2. 解决方案

当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的正常 DLQ 机制将无济于事。这是因为此异常甚至在消费者 poll() 方法返回之前就发生了。Spring for Apache Kafka 项目提供了一些很好的方法来帮助 binder 处理这种情况。让我们探讨一下。

假设这是我们的函数

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

这是一个简单的函数,它接受一个 String 参数。

我们希望绕过 Spring Cloud Stream 提供的消息转换器,而使用原生反序列化器。对于 String 类型,这没什么意义,但对于更复杂的类型,如 AVRO 等,您必须依赖外部反序列化器,因此希望将转换委托给 Kafka。

现在当消费者接收到数据时,假设有一个错误的记录导致了反序列化错误,例如有人传递了一个 Integer 而不是一个 String。在这种情况下,如果您在应用程序中不采取任何措施,异常将通过链传播,并且您的应用程序最终将退出。

为了处理此问题,您可以添加一个 ListenerContainerCustomizer @Bean,它配置一个 SeekToCurrentErrorHandler。此 SeekToCurrentErrorHandler 配置有一个 DeadLetterPublishingRecoverer。我们还需要为消费者配置一个 ErrorHandlingDeserializer。这听起来很复杂,但实际上,在这种情况下,它归结为这 3 个 bean。

@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setErrorHandler(errorHandler);
		};
	}
	@Bean
	public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

让我们分析一下它们。第一个是 ListenerContainerCustomizer bean,它接受一个 SeekToCurrentErrorHandler。容器现在使用该特定错误处理程序进行自定义。您可以在此处了解有关容器自定义的更多信息。

第二个 bean 是 SeekToCurrentErrorHandler,它配置为发布到 DLT。有关 SeekToCurrentErrorHandler 的更多详细信息,请参见此处

第三个 bean 是 DeadLetterPublishingRecoverer,它最终负责发送到 DLT。默认情况下,DLT 主题的命名格式为 ORIGINAL_TOPIC_NAME.DLT。但是您可以更改它。有关详细信息,请参见文档

我们还需要通过应用程序配置配置一个 ErrorHandlingDeserializer

ErrorHandlingDeserializer 委托给实际的反序列化器。在发生错误时,它将记录的键/值设置为 null,并包含消息的原始字节。然后它在头部设置异常并将此记录传递给监听器,监听器然后调用已注册的错误处理程序。

以下是所需的配置

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

我们通过绑定上的 configuration 属性提供了 ErrorHandlingDeserializer。我们还指出了要委托的实际反序列化器是 StringDeserializer

请记住,上述任何 DLQ 属性都与本方案中的讨论无关。它们纯粹是为了解决任何应用程序级别的错误。

3.4. Kafka binder 中的基本偏移量管理

3.4.1. 问题陈述

我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不确定它如何管理 Kafka 消费者偏移量。你能解释一下吗?

3.4.2. 解决方案

我们鼓励您阅读 文档 中的相关部分,以获得全面的理解。

简而言之

Kafka 默认支持两种类型的起始偏移量 - earliestlatest。它们的语义从名称中不言自明。

假设您是第一次运行消费者。如果您的 Spring Cloud Stream 应用程序中缺少 group.id,那么它就变成了匿名消费者。每当您有匿名消费者时,Spring Cloud Stream 应用程序默认将从主题分区中可用的 latest 偏移量开始。另一方面,如果您显式指定了 group.id,那么 Spring Cloud Stream 应用程序默认将从主题分区中可用的 earliest 偏移量开始。

在上述两种情况(具有显式组和匿名组的消费者)下,可以通过使用属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset 并将其设置为 earliestlatest 来切换起始偏移量。

现在,假设您之前已经运行过消费者,现在又重新启动它。在这种情况下,上述情况中的起始偏移量语义不再适用,因为消费者为消费者组找到了一个已提交的偏移量(对于匿名消费者,尽管应用程序没有提供 group.id,但 binder 会为您自动生成一个)。它只是从上次提交的偏移量开始。即使您提供了 startOffset 值,这也是如此。

但是,您可以通过使用 resetOffsets 属性来覆盖消费者从上次提交偏移量开始的默认行为。为此,请将属性 spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets 设置为 true(默认值为 false)。然后确保提供 startOffset 值(earliestlatest)。当您这样做并启动消费者应用程序时,每次启动时,它都会像第一次启动一样开始,并忽略分区的所有已提交偏移量。

3.5. 在 Kafka 中定位任意偏移量

3.5.1. 问题陈述

使用 Kafka binder,我知道它可以将偏移量设置为 earliestlatest,但我有一个需求,需要将偏移量查找至中间的某个任意偏移量。Spring Cloud Stream Kafka binder 有没有办法实现这一点?

3.5.2. 解决方案

之前我们看到了 Kafka binder 如何帮助您解决基本的偏移量管理。默认情况下,binder 不允许您回溯到任意偏移量,至少通过我们在该方案中看到的那种机制。但是,binder 提供了一些低级策略来实现此用例。让我们探讨一下。

首先,当您想重置到 earliestlatest 之外的任意偏移量时,请确保将 resetOffsets 配置保留为默认值,即 false。然后您必须提供一个 KafkaBindingRebalanceListener 类型的自定义 bean,它将被注入到所有消费者绑定中。它是一个带有几个默认方法的接口,但我们感兴趣的方法是:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

让我们看看细节。

本质上,此方法将在主题分区的初始分配期间或再平衡之后每次调用。为了更好地说明,让我们假设我们的主题是 foo,它有 4 个分区。最初,我们只在组中启动一个消费者,此消费者将从所有分区消费。当消费者第一次启动时,所有 4 个分区都将进行初始分配。但是,我们不希望分区从默认值开始消费(因为我们定义了一个组,所以是 earliest),而是希望每个分区在查找任意偏移量后开始消费。想象一下您有一个业务用例,需要从如下所示的某些偏移量开始消费。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

这可以通过如下实现上述方法来完成。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle excpetions carefully.
                }
            }
        });
    }
}

这只是一个粗略的实现。实际用例比这复杂得多,您需要相应地进行调整,但这确实为您提供了一个基本草图。当消费者 seek 失败时,它可能会抛出一些运行时异常,您需要决定在这种情况下该怎么做。

3.5.3. 如果我们启动一个具有相同 group id 的第二个消费者会怎样?

当我们添加第二个消费者时,会发生重新平衡,并且一些分区将被移动。假设新的消费者获得了分区 23。当这个新的 Spring Cloud Stream 消费者调用这个 onPartitionsAssigned 方法时,它将看到这是该消费者分区 23 的初始分配。因此,它将由于 initial 参数上的条件检查而执行查找操作。对于第一个消费者,它现在只有分区 01。但是,对于此消费者,它只是一个重新平衡事件,不被视为初始分配。因此,它不会由于 initial 参数上的条件检查而重新查找给定的偏移量。

3.6. 如何使用 Kafka binder 手动确认?

3.6.1. 问题陈述

使用 Kafka binder,我知道它可以在我的消费者中手动确认消息。我该怎么做?

3.6.2. 解决方案

默认情况下,Kafka binder 委托给 Spring for Apache Kafka 项目中的默认提交设置。Spring Kafka 中的默认 ackModebatch。有关详细信息,请参阅 此处

在某些情况下,您希望禁用此默认提交行为并依赖手动提交。以下步骤允许您这样做。

将属性spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode设置为MANUALMANUAL_IMMEDIATE。这样设置后,消费者方法收到的消息中将包含一个名为kafka_acknowledgment(来自KafkaHeaders.ACKNOWLEDGMENT)的标头。

例如,假设这是您的消费者方法。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

然后,将属性spring.cloud.stream.bindings.myConsumer-in-0.consumer.ackMode设置为MANUALMANUAL_IMMEDIATE

3.7. 如何在 Spring Cloud Stream 中覆盖默认绑定名称?

3.7.1. 问题陈述

Spring Cloud Stream 根据函数定义和签名创建默认绑定,但我如何将这些绑定覆盖为更符合领域友好的名称?

3.7.2. 解决方案

假设以下是您的函数签名。

@Bean
public Function<String, String> uppercase(){
...
}

默认情况下,Spring Cloud Stream 将创建如下绑定。

  1. uppercase-in-0

  2. uppercase-out-0

您可以使用以下属性覆盖这些绑定。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

此后,所有绑定属性都必须在新名称my-transformer-inmy-transformer-out上进行设置。

以下是 Kafka Streams 和多个输入的另一个示例。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

默认情况下,Spring Cloud Stream 将为该函数创建三个不同的绑定名称。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

每次您想对这些绑定进行一些配置时,都必须使用这些绑定名称。您不喜欢这样,并且希望使用更符合领域友好且可读的绑定名称,例如:

  1. orders

  2. accounts

  3. enrichedOrders

您只需设置以下三个属性即可轻松做到这一点:

  1. spring.cloud.stream.function.bindings.processOrder-in-0=orders

  2. spring.cloud.stream.function.bindings.processOrder-in-1=accounts

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

一旦您这样做了,它将覆盖默认绑定名称,并且您想要设置的任何属性都必须在新绑定名称上。

3.8. 如何将消息键作为记录的一部分发送?

3.8.1. 问题陈述

我需要将一个键与记录的有效载荷一起发送,Spring Cloud Stream 中有什么方法可以做到吗?

3.8.2. 解决方案

通常需要将关联数据结构(如映射)作为带有键和值的记录发送。Spring Cloud Stream 允许您以直接的方式执行此操作。以下是实现此目的的基本蓝图,但您可能需要根据自己的特定用例进行调整。

这是一个示例生产者方法(又称Supplier)。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

这是一个发送带有String有效载荷和键的消息的简单函数。请注意,我们使用KafkaHeaders.MESSAGE_KEY将键设置为消息头。

如果您想更改默认的kafka_messageKey键,那么在配置中,我们需要指定此属性:

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

请注意,我们使用绑定名称supplier-out-0,因为这是我们的函数名称,请相应更新。

然后,我们在生成消息时使用这个新键。

3.9. 如何使用原生序列化器和反序列化器,而不是 Spring Cloud Stream 进行的消息转换?

3.9.1. 问题陈述

我不想使用 Spring Cloud Stream 中的消息转换器,而是想使用 Kafka 中的原生序列化器和反序列化器。默认情况下,Spring Cloud Stream 使用其内部内置的消息转换器来处理此转换。我如何绕过此操作并将责任委托给 Kafka?

3.9.2. 解决方案

这真的很容易做到。

您只需提供以下属性即可启用原生序列化。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

然后,您还需要设置序列化器。有两种方法可以做到这一点。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.value.serializer: org.apache.kafka.common.serialization.StringSerializer

或者使用绑定器配置。

spring.cloud.stream.kafka.binder.configurarion.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configurarion.value.serializer: org.apache.kafka.common.serialization.StringSerializer

当使用绑定器方式时,它适用于所有绑定,而将它们设置在绑定上则是针对每个绑定。

在反序列化方面,您只需将反序列化器作为配置提供即可。

例如,

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configurarion.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configurarion.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

您也可以在绑定器级别设置它们。

有一个可选属性,您可以设置它来强制进行原生解码。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

但是,对于 Kafka 绑定器来说,这是不必要的,因为当它到达绑定器时,Kafka 已经使用配置的反序列化器对它们进行了反序列化。

3.10. 解释 Kafka Streams binder 中偏移量重置的工作原理

3.10.1. 问题陈述

默认情况下,Kafka Streams 绑定器对于新消费者总是从最早的偏移量开始。有时,应用程序需要或有益于从最新的偏移量开始。Kafka Streams 绑定器允许您这样做。

3.10.2. 解决方案

在我们查看解决方案之前,让我们先看以下场景。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

我们有一个需要两个输入绑定的BiConsumer Bean。在这种情况下,第一个绑定用于KStream,第二个绑定用于KTable。首次运行此应用程序时,默认情况下,两个绑定都从earliest偏移量开始。如果由于某些要求,我想从latest偏移量开始怎么办?您可以通过启用以下属性来做到这一点。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

如果您只想让一个绑定从latest偏移量开始,而另一个绑定从默认的earliest消费,则将后者绑定排除在配置之外。

请记住,一旦存在已提交的偏移量,这些设置将生效,并且已提交的偏移量将优先。

3.11. 在 Kafka 中跟踪成功发送记录(生产)

3.11.1. 问题陈述

我有一个 Kafka 生产者应用程序,我想跟踪所有成功的发送。

3.11.2. 解决方案

让我们假设应用程序中有以下供应商。

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

然后,我们需要定义一个新的MessageChannel bean 来捕获所有成功的发送信息。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

接下来,在应用程序配置中定义此属性,以提供recordMetadataChannel的 bean 名称。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

此时,成功的发送信息将发送到fooRecordChannel

您可以编写一个如下所示的IntegrationFlow来查看信息。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle方法中,有效载荷是发送到 Kafka 的内容,消息头包含一个名为kafka_recordMetadata的特殊键。它的值是RecordMetadata,其中包含有关主题分区、当前偏移量等信息。

3.12. 在 Kafka 中添加自定义头映射器

3.12.1. 问题陈述

我有一个 Kafka 生产者应用程序设置了一些头,但它们在消费者应用程序中丢失了。这是为什么?

3.12.2. 解决方案

在正常情况下,这应该没问题。

想象一下,您有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

在消费者端,您应该仍然看到头“foo”,以下内容不应给您带来任何问题。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

如果您在应用程序中提供了自定义头映射器,那么这将不起作用。假设您的应用程序中有一个空的KafkaHeaderMapper

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

如果这是您的实现,那么您将错过消费者端的foo头。您可能在那些KafkaHeaderMapper方法中有一些逻辑。您需要以下内容来填充foo头。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

这将正确地将foo头从生产者填充到消费者。

3.12.3. 关于 ID 头的特别说明

在 Spring Cloud Stream 中,id头是一个特殊的头,但有些应用程序可能希望拥有特殊的自定义 id 头,例如custom-idIDId。第一个(custom-id)无需任何自定义头映射器即可从生产者传播到消费者。但是,如果您使用框架保留的id头的变体(例如IDIdiD等)进行生产,那么您将遇到框架内部问题。有关此用例的更多上下文,请参阅此StackOverflow 帖子。在这种情况下,您必须使用自定义KafkaHeaderMapper来映射大小写敏感的 id 头。例如,假设您有以下生产者。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上面的头Id将从消费端消失,因为它与框架的id头冲突。您可以提供自定义KafkaHeaderMapper来解决此问题。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

通过这样做,idId头都将从生产者端到消费者端可用。

3.13. 在事务中生产到多个主题

3.13.1. 问题陈述

我如何向多个 Kafka 主题生产事务性消息?

有关更多上下文,请参阅此StackOverflow 问题

3.13.2. 解决方案

在 Kafka 绑定器中使用事务支持进行事务处理,然后提供一个AfterRollbackProcessor。为了生产到多个主题,请使用StreamBridge API。

以下是相关的代码片段。

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

3.13.3. 所需配置

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

为了测试,您可以使用以下内容。

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

一些重要注意事项

请确保您的应用程序配置中没有任何 DLQ 设置,因为我们手动配置了 DLT(默认情况下,它将基于初始消费者函数发布到名为input.DLT的主题)。此外,将消费者绑定上的maxAttempts重置为1,以避免绑定器重试。在上面的示例中,总共将尝试 3 次(初始尝试 + FixedBackoff中的 2 次尝试)。

有关如何测试此代码的更多详细信息,请参阅StackOverflow 帖子。如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将消费者绑定上的isolation-level设置为read-committed

StackOverflow 帖子也与此讨论相关。

3.14. 运行多个可轮询消费者时要避免的陷阱

3.14.1. 问题陈述

如何运行可轮询消费者的多个实例并为每个实例生成唯一的client.id

3.14.2. 解决方案

假设我有以下定义。

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

当运行应用程序时,Kafka 消费者会生成一个 client.id(类似于consumer-my-group-1)。对于应用程序的每个运行实例,此client.id将相同,从而导致意外问题。

为了解决这个问题,您可以在应用程序的每个实例上添加以下属性:

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

有关更多详细信息,请参阅此GitHub 问题

附录

附录 A:构建

A.1. 基本编译和测试

要构建源代码,您需要安装 JDK 1.7。

构建使用 Maven wrapper,因此您无需安装特定版本的 Maven。要启用测试,您应该在构建之前运行 Kafka 服务器 0.9 或更高版本。有关运行服务器的更多信息,请参阅下文。

主要的构建命令是

$ ./mvnw clean install

如果您愿意,也可以添加“-DskipTests”以避免运行测试。

您也可以自行安装 Maven(>=3.3.3),并在下面的示例中用 mvn 命令代替 ./mvnw。如果您这样做,并且您的本地 Maven 设置不包含 Spring 预发布构件的仓库声明,您可能还需要添加 -P spring
请注意,您可能需要通过设置 MAVEN_OPTS 环境变量,并将其值设置为 -Xmx512m -XX:MaxPermSize=128m 来增加 Maven 可用的内存量。我们尝试在 .mvn 配置中涵盖这一点,因此如果您发现必须这样做才能使构建成功,请提出一个问题,将这些设置添加到源代码控制中。

需要中间件的项目通常包含docker-compose.yml,因此请考虑使用Docker Compose在 Docker 容器中运行中间件服务器。

A.2. 文档

有一个“完整”配置文件将生成文档。

A.3. 使用代码

如果您没有 IDE 偏好,我们建议您在使用代码时使用Spring Tools SuiteEclipse。我们使用m2eclipse Eclipse 插件提供 Maven 支持。其他 IDE 和工具也应能正常工作。

A.3.1. 使用 m2eclipse 导入到 eclipse

我们建议在使用 Eclipse 时使用m2eclipse Eclipse 插件。如果您尚未安装 m2eclipse,可以从“Eclipse Marketplace”获取。

不幸的是,m2e 尚不支持 Maven 3.3,因此一旦项目导入 Eclipse,您还需要告诉 m2eclipse 对项目使用.settings.xml文件。如果您不这样做,您可能会看到许多与项目中 POM 相关错误。打开您的 Eclipse 首选项,展开 Maven 首选项,然后选择用户设置。在用户设置字段中单击浏览并导航到您导入的 Spring Cloud 项目,选择该项目中的.settings.xml文件。单击应用,然后单击确定以保存首选项更改。

或者,您可以将.settings.xml中的存储库设置复制到您自己的~/.m2/settings.xml中。

A.3.2. 不使用 m2eclipse 导入到 eclipse

如果您不想使用 m2eclipse,可以使用以下命令生成 Eclipse 项目元数据:

$ ./mvnw eclipse:eclipse

生成的 Eclipse 项目可以通过从“文件”菜单中选择“导入现有项目”来导入。

[[contributing] == 贡献

Spring Cloud 根据非限制性 Apache 2.0 许可证发布,并遵循非常标准的 Github 开发流程,使用 Github 跟踪器处理问题并将拉取请求合并到 master 分支。如果您想贡献哪怕是微不足道的东西,请不要犹豫,但请遵循以下准则。

A.4. 签署贡献者许可协议

在我们接受非平凡的补丁或拉取请求之前,我们需要您签署贡献者协议。签署贡献者协议不会授予任何人主仓库的提交权限,但这确实意味着我们可以接受您的贡献,如果接受,您将获得作者署名。积极的贡献者可能会被邀请加入核心团队,并被授予合并拉取请求的能力。

A.5. 代码约定和内务管理

这些对于拉取请求都不是必不可少的,但它们都会有所帮助。它们也可以在原始拉取请求之后但在合并之前添加。

  • 使用 Spring Framework 代码格式约定。如果您使用 Eclipse,可以使用Spring Cloud Build项目中的eclipse-code-formatter.xml文件导入格式化程序设置。如果使用 IntelliJ,可以使用Eclipse Code Formatter Plugin导入相同的文件。

  • 确保所有新的 .java 文件都包含一个简单的 Javadoc 类注释,至少包含一个标识您的 @author 标签,并且最好至少有一个段落说明该类的用途。

  • 将 ASF 许可头注释添加到所有新的 .java 文件中(从项目中的现有文件复制)

  • 如果您对 .java 文件进行了大量修改(不仅仅是表面上的更改),请将自己添加为 @author

  • 添加一些 Javadoc,如果您更改了命名空间,还要添加一些 XSD 文档元素。

  • 一些单元测试也会有很大帮助——总得有人去做。

  • 如果您的分支没有其他人使用,请将其与当前 master(或主项目中的其他目标分支)进行 rebase。

  • 编写提交消息时请遵循 这些约定,如果您正在修复现有问题,请在提交消息末尾添加 Fixes gh-XXXX(其中 XXXX 是问题编号)。

© . This site is unofficial and not affiliated with VMware.