配置选项

本部分包含 Apache Kafka Binder 使用的配置选项。

有关 Binder 的通用配置选项和属性,请参阅核心文档中的 绑定属性

Kafka Binder 属性

spring.cloud.stream.kafka.binder.brokers

Kafka Binder 连接到的 Broker 列表。

默认值:localhost

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers 允许指定带有或不带有端口信息的主机(例如,host1,host2:port2)。当 Broker 列表中未配置端口时,此设置将默认端口。

默认值:9092

spring.cloud.stream.kafka.binder.configuration

Binder 创建的所有客户端传递给客户端属性(生产者和使用者)的键/值映射。由于这些属性由生产者和使用者同时使用,因此使用应仅限于通用属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或使用者属性将被过滤掉,并且不允许传播。此处的属性将替代 boot 中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.consumerProperties

任意 Kafka 客户端使用者属性的键/值映射。除了支持已知的 Kafka 使用者属性外,此处还允许未知使用者属性。此处的属性将替代 boot 中和上述 configuration 属性中设置的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.headers

传输器传输的自定义标头的列表。仅在与较旧应用程序(⇐ 1.3.x)通信且 kafka-clients 版本 < 0.11.0.0 时需要。较新版本本机支持标头。

默认值:空。

spring.cloud.stream.kafka.binder.healthTimeout

获取分区信息的时间(秒)。如果此计时器到期,则将运行状况报告为关闭。

默认值:10。

spring.cloud.stream.kafka.binder.requiredAcks

代理上所需的确认数。有关生产者 acks 属性,请参阅 Kafka 文档。

默认值:1

spring.cloud.stream.kafka.binder.minPartitionCount

仅当设置了 autoCreateTopicsautoAddPartitions 时才有效。传输器在生产或使用数据时配置主题上的分区全局最小数。它可以被生产者的 partitionCount 设置或生产者的 instanceCount * concurrency 设置的值(如果任一值更大)所取代。

默认值:1

spring.cloud.stream.kafka.binder.producerProperties

任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性外,此处还允许未知的生产者属性。此处的属性会取代引导中设置的任何属性和上述 configuration 属性中的任何属性。

默认值:空映射。

spring.cloud.stream.kafka.binder.replicationFactor

如果 autoCreateTopics 处于活动状态,则为自动创建的主题的复制因子。可以在每个绑定上覆盖。

如果您使用的是 2.4 之前的 Kafka 代理版本,则应将此值至少设置为 1。从版本 3.0.8 开始,传输器使用 -1 作为默认值,这表示将使用代理“default.replication.factor”属性来确定副本数。请咨询您的 Kafka 代理管理员,了解是否有要求最小复制因子的策略,如果是这种情况,则通常 default.replication.factor 将匹配该值,并且应使用 -1,除非您需要大于最小值的复制因子。

默认值:-1

spring.cloud.stream.kafka.binder.autoCreateTopics

如果设置为 true,则传输器会自动创建新主题。如果设置为 false,则传输器依赖于已配置的主题。在后一种情况下,如果主题不存在,则传输器将无法启动。

此设置独立于代理的 auto.create.topics.enable 设置,并且不会影响它。如果服务器设置为自动创建主题,则它们可能会作为元数据检索请求的一部分创建,并带有默认代理设置。

默认值:true

spring.cloud.stream.kafka.binder.autoAddPartitions

如果设置为 true,则绑定器在需要时创建新分区。如果设置为 false,则绑定器依赖于主题的分区大小已配置。如果目标主题的分区计数小于预期值,则绑定器将无法启动。

默认值:false

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

在绑定器中启用事务。请参阅 Kafka 文档中的 transaction.idspring-kafka 文档中的 Transactions。启用事务后,将忽略各个 producer 属性,所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

默认值 null(无事务)

spring.cloud.stream.kafka.binder.transaction.producer.*

事务绑定器中生产者的全局生产者属性。请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka Producer Properties 以及所有绑定器支持的常规生产者属性。

默认值:请参阅各个生产者属性。

spring.cloud.stream.kafka.binder.headerMapperBeanName

用于将 spring-messaging 头映射到 Kafka 头以及从 Kafka 头映射回来的 KafkaHeaderMapper 的 Bean 名称。例如,如果您希望自定义使用 JSON 反序列化头信息的 BinderHeaderMapper Bean 中的受信任包,请使用此属性。如果此自定义 BinderHeaderMapper Bean 未使用此属性提供给绑定器,则绑定器将在回退到绑定器创建的默认 BinderHeaderMapper 之前,查找类型为 BinderHeaderMapper、名称为 kafkaBinderHeaderMapper 的头映射器 Bean。

默认值:无。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

一个标志,用于将绑定器运行状况设置为 down,当主题上的任何分区(无论从中接收数据的消费者如何)未找到领导者时。

默认值:true

spring.cloud.stream.kafka.binder.certificateStoreDirectory

当信托库或密钥库证书位置被指定为非本地文件系统资源(由 org.springframework.core.io.Resource 支持的资源,例如 CLASSPATH、HTTP 等)时,绑定程序会将资源从路径(可转换为 org.springframework.core.io.Resource)复制到文件系统上的某个位置。这适用于代理级别证书(ssl.truststore.locationssl.keystore.location)和用于架构注册表的证书(schema.registry.ssl.truststore.locationschema.registry.ssl.keystore.location)。请记住,信托库和密钥库位置路径必须在 spring.cloud.stream.kafka.binder.configuration…​ 下提供。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location 等。该文件将被复制到此属性的值所指定的某个位置,该位置必须是文件系统上一个现有的目录,并且应用程序运行进程具有该目录的写入权限。如果未设置此值且证书文件是非本地文件系统资源,则会将其复制到系统临时目录,如 System.getProperty("java.io.tmpdir") 所返回的目录。如果此值存在,但文件系统上找不到该目录或该目录不可写,则也会发生这种情况。

默认值:无。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

当设置为 true 时,每当访问指标时,都会计算每个使用者主题的偏移量滞后指标。当设置为 false 时,仅使用定期计算的偏移量滞后。

默认值:true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

计算每个使用者主题的偏移量滞后的时间间隔。每当 metrics.defaultOffsetLagMetricsEnabled 被禁用或其计算时间过长时,都会使用此值。

默认值:60 秒

spring.cloud.stream.kafka.binder.enableObservation

在此绑定程序中的所有绑定上启用 Micrometer 观察注册表。

默认值:false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator 元数据使用者 group.id。此使用者由 HealthIndicator 用于查询正在使用的主题的元数据。

默认值:无。

Kafka 使用者属性

以下属性仅适用于 Kafka 使用者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.consumer. 为前缀。

为了避免重复,Spring Cloud Stream 支持使用 spring.cloud.stream.kafka.default.consumer.<property>=<value> 格式为所有通道设置值。
admin.configuration

自 2.1.1 版本起,此属性已弃用,推荐使用 topic.properties,并且在未来版本中将不再支持此属性。

admin.replicas-assignment

自 2.1.1 版本起,此属性已弃用,推荐使用 topic.replicas-assignment,并且在未来版本中将不再支持此属性。

admin.replication-factor

自 2.1.1 版本起,此属性已弃用,推荐使用 topic.replication-factor,并且在未来版本中将不再支持此属性。

autoRebalanceEnabled

true 时,主题分区在消费者组的成员之间自动重新平衡。当 false 时,每个消费者基于 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 分配一组固定的分区。这要求在每个启动的实例上适当地设置 spring.cloud.stream.instanceCountspring.cloud.stream.instanceIndex 属性。在这种情况下,spring.cloud.stream.instanceCount 属性的值通常必须大于 1。

默认值:true

ackEachRecord

autoCommitOffsettrue 时,此设置决定是否在处理完每条记录后提交偏移量。默认情况下,在处理完 consumer.poll() 返回的记录批中的所有记录后提交偏移量。可以通过消费者 configuration 属性设置的 Kafka 属性 max.poll.records 来控制轮询返回的记录数。将此属性设置为 true 可能会导致性能下降,但这样做可以降低在发生故障时重新传递记录的可能性。此外,请参阅绑定 requiredAcks 属性,它也会影响提交偏移量的性能。此属性已在 3.1 中弃用,推荐使用 ackMode。如果未设置 ackMode 且未启用批处理模式,则将使用 RECORD ackMode。

默认值:false

autoCommitOffset

从 3.1 版本开始,此属性已弃用。有关备用方案的更多详细信息,请参见 ackMode。在处理消息后是否自动提交偏移量。如果设置为 false,则入站消息中存在键为 kafka_acknowledgment 的类型为 org.springframework.kafka.support.Acknowledgment 标头的标头。应用程序可以使用此标头来确认消息。有关详细信息,请参见示例部分。当此属性设置为 false 时,Kafka 绑定程序将 ack 模式设置为 org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL,并且应用程序负责确认记录。另请参见 ackEachRecord

默认值:true

ackMode

指定容器 ack 模式。这基于 Spring Kafka 中定义的 AckMode 枚举。如果 ackEachRecord 属性设置为 true 并且消费者不在批处理模式中,那么这将使用 RECORD 的 ack 模式,否则使用此属性提供的 ack 模式。

autoCommitOnError

在可轮询消费者中,如果设置为 true,则它始终在出错时自动提交。如果没有设置(默认值)或为 false,则它不会在可轮询消费者中自动提交。请注意,此属性仅适用于可轮询消费者。

默认值:未设置。

resetOffsets

是否将消费者的偏移量重置为 startOffset 提供的值。如果提供了 KafkaBindingRebalanceListener,则必须为 false;请参见rebalance 侦听器。有关此属性的更多信息,请参见reset-offsets

默认值:false

startOffset

新组的起始偏移量。允许的值:earliestlatest。如果明确为消费者“绑定”设置了消费者组(通过 spring.cloud.stream.bindings.<channelName>.group),则将 'startOffset' 设置为 earliest。否则,对于 anonymous 消费者组,将其设置为 latest。有关此属性的更多信息,请参见reset-offsets

默认值:null(等效于 earliest)。

enableDlq

当设置为 true 时,它将为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为 error.<destination>.<group> 的主题。可以通过设置 dlqName 属性或定义类型为 DlqDestinationResolver@Bean 来配置 DLQ 主题名称。对于错误数量相对较少且重新播放整个原始主题可能过于繁琐的情况,这提供了一种替代方案,以替代更常见的 Kafka 重播场景。有关更多信息,请参见kafka dlq 处理。从 2.0 版本开始,发送到 DLQ 主题的消息将使用以下标头进行增强:x-original-topicx-exception-messagex-exception-stacktrace 作为 byte[]。默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改该行为,请参见dlq 分区选择destinationIsPatterntrue 时不允许。

默认值:false

dlqPartitions

enableDlq 为 true,且此属性未设置时,将创建一个分区数与主主题相同的死信主题。通常,死信记录将发送到死信主题中与原始记录相同的分区中。此行为可以更改;请参阅 dlq 分区选择。如果此属性设置为 1 且没有 DqlPartitionFunction bean,则所有死信记录都将写入分区 0。如果此属性大于 1,则必须提供 DlqPartitionFunction bean。请注意,实际分区计数受 binder 的 minPartitionCount 属性影响。

默认值:none

配置

包含通用 Kafka 消费者属性的键/值对的映射。除了具有 Kafka 消费者属性之外,还可以在此处传递其他配置属性。例如,应用程序所需的一些属性,如 spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=barbootstrap.servers 属性不能在此处设置;如果需要连接到多个集群,请使用多 binder 支持。

默认值:空映射。

dlqName

接收错误消息的 DLQ 主题的名称。

默认值:null(如果未指定,则导致错误的消息将转发到名为 error.<destination>.<group> 的主题)。

dlqProducerProperties

使用此属性,可以设置 DLQ 特定的生产者属性。可以通过此属性设置 Kafka 生产者属性中可用的所有属性。当在消费者上启用本机解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。这必须以 dlqProducerProperties.configuration.key.serializerdlqProducerProperties.configuration.value.serializer 的形式提供。

默认值:默认 Kafka 生产者属性。

standardHeaders

指示入站通道适配器填充哪些标准标头。允许的值:noneidtimestampboth。如果使用本机反序列化,并且接收消息的第一个组件需要 id(例如配置为使用 JDBC 消息存储的聚合器),则此属性很有用。

默认值:none

converterBeanName

实现 RecordMessageConverter 的 bean 的名称。在入站通道适配器中使用,以替换默认的 MessagingMessageConverter

默认值:null

idleEventInterval

指示最近未收到消息的事件之间的毫秒间隔。使用 ApplicationListener<ListenerContainerIdleEvent> 接收这些事件。有关用法示例,请参阅 暂停-恢复

默认值:30000

destinationIsPattern

为 true 时,将目标视为正则表达式 Pattern,由代理用于匹配主题名称。为 true 时,不会预置主题,并且不允许 enableDlq,因为 binder 在预置阶段不知道主题名称。请注意,检测与模式匹配的新主题所花费的时间由消费者属性 metadata.max.age.ms 控制,该属性(在撰写本文时)默认为 300,000ms(5 分钟)。可以使用上面的 configuration 属性对其进行配置。

默认值:false

topic.properties

在配置新主题时使用的 Kafka 主题属性的 Map,例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

默认值:无。

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在配置新主题时使用。请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

在配置主题时使用的复制因子。覆盖绑定范围的设置。如果存在 replicas-assignments,则忽略。

默认值:无(使用绑定范围的默认值 -1)。

pollTimeout

在可轮询消费者中轮询时使用的超时时间。

默认值:5 秒。

transactionManager

用于覆盖此绑定的绑定程序事务管理器的 KafkaAwareTransactionManager 的 Bean 名称。通常,如果你想使用 ChainedKafkaTransactionManaager 同步另一个事务和 Kafka 事务,则需要这样做。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须使用相同的事务管理器进行配置。

默认值:无。

txCommitRecovered

默认情况下,在使用事务绑定程序时,已恢复记录的偏移量(例如,当重试用尽且记录发送到死信主题时)将通过新事务提交。将此属性设置为 false 会抑制提交已恢复记录的偏移量。

默认值:true。

commonErrorHandlerBeanName

每个消费者绑定要使用的 CommonErrorHandler Bean 名称。如果存在,此用户提供的 CommonErrorHandler 优先于绑定程序定义的任何其他错误处理程序。如果应用程序不想使用 ListenerContainerCustomizer,然后检查目标/组组合来设置错误处理程序,则这是一种表达错误处理程序的便捷方法。

默认值:无。

Kafka 生产者属性

以下属性仅适用于 Kafka 生产者,并且必须以 spring.cloud.stream.kafka.bindings.<channelName>.producer. 为前缀。

为了避免重复,Spring Cloud Stream 支持使用 spring.cloud.stream.kafka.default.producer.<property>=<value> 格式为所有通道设置值。
admin.configuration

自 2.1.1 版本起,此属性已弃用,推荐使用 topic.properties,并且在未来版本中将不再支持此属性。

admin.replicas-assignment

自 2.1.1 版本起,此属性已弃用,推荐使用 topic.replicas-assignment,并且在未来版本中将不再支持此属性。

admin.replication-factor

自 2.1.1 版本起,此属性已弃用,推荐使用 topic.replication-factor,并且在未来版本中将不再支持此属性。

bufferSize

Kafka 生产者在发送之前尝试批量处理多少数据的上限(以字节为单位)。

默认值:16384

sync

生产者是否同步。

默认值:false

sendTimeoutExpression

针对传出消息评估的 SpEL 表达式,用于评估在启用同步发布时等待确认的时间,例如,headers['mySendTimeout']。超时值以毫秒为单位。在 3.0 之前的版本中,除非使用本机编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经采用 byte[] 的形式。现在,在转换有效负载之前评估表达式。

默认值:none

batchTimeout

生产者等待更长的消息在发送消息之前累积在同一批次中的时间。(通常,生产者根本不等待,只是发送在前一次发送进行中时累积的所有消息。)非零值可能会以牺牲延迟为代价来提高吞吐量。

默认值:0

messageKeyExpression

针对用于填充产生的 Kafka 消息的键的传出消息评估的 SpEL 表达式,例如,headers['myKey']。对于 3.0 之前的版本,除非使用本机编码,否则无法使用有效负载,因为在评估此表达式时,有效负载已经采用 byte[] 的形式。现在,在转换有效负载之前评估表达式。对于常规处理器(Function<String, String>Function<Message<?>, Message<?>),如果产生的键需要与来自主题的传入键相同,则可以将此属性设置为如下。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] 对于响应式函数,需要注意一个重要的警告。在这种情况下,应用程序需要手动将标头从传入消息复制到传出消息。您可以设置标头,例如 myKey 并按照上面建议的那样使用 headers['myKey'],或者为了方便,只需设置 KafkaHeaders.MESSAGE_KEY 标头,而根本不需要设置此属性。

默认值:none

headerPatterns

要映射到 ProducerRecord 中的 Kafka Headers 的 Spring 消息标头的简单模式的逗号分隔列表。模式可以以通配符(星号)开头或结尾。模式可以通过前缀 ! 来否定。匹配在第一次匹配(正或负)后停止。例如 !ask,as* 将通过 ash 但不会通过 askidtimestamp 永远不会被映射。

默认值:*(所有标头 - 除了 idtimestamp

配置

包含通用 Kafka 生产者属性的键/值对映射。bootstrap.servers 属性不能在此处设置;如果您需要连接到多个集群,请使用多绑定程序支持。

默认值:空映射。

topic.properties

用于在配置新主题时使用的 Kafka 主题属性的 Map — 例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas-assignment

副本分配的 Map<Integer, List<Integer>>,其中键是分区,值是分配。在配置新主题时使用。请参阅 kafka-clients jar 中的 NewTopic Javadoc。

默认值:无。

topic.replication-factor

在配置主题时使用的复制因子。覆盖绑定范围的设置。如果存在 replicas-assignments,则忽略。

默认值:无(使用绑定范围的默认值 -1)。

useTopicHeader

设置为 true 以使用出站消息中的 KafkaHeaders.TOPIC 消息头的值覆盖默认绑定目标(主题名称)。如果标头不存在,则使用默认绑定目标。

默认值:false

recordMetadataChannel

应向其发送成功发送结果的 MessageChannel 的 bean 名称;该 bean 必须存在于应用程序上下文中。发送到该通道的消息是已发送的消息(如果进行了转换),并带有附加标头 KafkaHeaders.RECORD_METADATA。该标头包含 Kafka 客户端提供的 RecordMetadata 对象;它包括记录在主题中写入的分区和偏移量。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

发送失败会进入生产者错误通道(如果已配置);请参阅 Kafka 错误通道

默认值:null。

Kafka 绑定程序使用生产者的 partitionCount 设置作为提示,以创建具有给定分区计数的主题(结合 minPartitionCount,使用这两个值中的较大值)。为绑定程序配置 minPartitionCount 和应用程序的 partitionCount 时要小心,因为使用较大的值。如果已存在分区计数较小的主题并且禁用了 autoAddPartitions(默认值),则绑定程序将无法启动。如果已存在分区计数较小的主题并且启用了 autoAddPartitions,则会添加新分区。如果已存在分区计数大于(minPartitionCountpartitionCount)的最大值,则使用现有分区计数。
compression

设置 compression.type 生产者属性。支持的值为 nonegzipsnappylz4zstd。如果您已将 kafka-clients jar 覆盖到 2.1.0(或更高版本),如 Spring for Apache Kafka 文档 中所述,并且希望使用 zstd 压缩,请使用 spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd

默认值:none

transactionManager

用于覆盖此绑定的绑定程序事务管理器的 KafkaAwareTransactionManager 的 Bean 名称。通常,如果你想使用 ChainedKafkaTransactionManaager 同步另一个事务和 Kafka 事务,则需要这样做。为了实现记录的精确一次消费和生产,消费者和生产者绑定都必须使用相同的事务管理器进行配置。

默认值:无。

closeTimeout

关闭生产者时等待的秒数超时。

默认值:30

allowNonTransactional

通常,如果尚未进行处理,与事务性绑定程序关联的所有输出绑定都将在新事务中发布。此属性允许您覆盖该行为。如果设置为 true,则发布到此输出绑定的记录将不会在事务中运行,除非已经进行处理。

默认值:false