配置选项
本节包含 Kafka Streams binder 使用的配置选项。
有关 Binder 的通用配置选项和属性,请参阅核心文档。
Kafka Streams Binder 属性
以下属性可在 Binder 级别使用,并且必须以 spring.cloud.stream.kafka.streams.binder. 为前缀。在 Kafka Streams binder 中重复使用的任何 Kafka binder 提供的属性必须以 spring.cloud.stream.kafka.streams.binder 为前缀,而不是 spring.cloud.stream.kafka.binder。此规则的唯一例外是定义 Kafka 引导服务器属性时,此时任何一个前缀都可以使用。
- configuration
-
包含与 Apache Kafka Streams API 相关属性的键/值对 Map。此属性必须以
spring.cloud.stream.kafka.streams.binder.为前缀。以下是一些使用此属性的示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能包含在 Streams 配置中的所有属性的更多信息,请参阅 Apache Kafka Streams 文档中的 StreamsConfig JavaDocs。您可以从 StreamsConfig 设置的所有配置都可以通过此属性进行设置。使用此属性时,它适用于整个应用程序,因为它是一个 Binder 级别的属性。如果应用程序中有多个处理器,所有这些处理器都将获取这些属性。对于像 application.id 这样的属性,这将成为问题,因此您必须仔细检查如何使用此 Binder 级别的 configuration 属性来映射来自 StreamsConfig 的属性。
- functions.<function-bean-name>.applicationId
-
仅适用于函数式处理器。此属性可用于在应用程序中为每个函数设置 application ID。在多个函数的情况下,这是设置 application ID 的便捷方式。
- functions.<function-bean-name>.configuration
-
仅适用于函数式处理器。包含与 Apache Kafka Streams API 相关属性的键/值对 Map。这类似于上面描述的 Binder 级别的
configuration属性,但此级别的configuration属性仅针对指定函数进行限制。当您有多个处理器并且希望根据特定函数限制对配置的访问时,您可能希望使用此属性。所有StreamsConfig属性都可以在这里使用。 - brokers
-
Broker URL
默认值:
localhost - zkNodes
-
Zookeeper URL
默认值:
localhost - deserializationExceptionHandler
-
反序列化错误处理器类型。此处理器应用于 Binder 级别,因此应用于应用程序中的所有输入绑定。可以在消费者绑定级别以更精细的方式对其进行控制。可能的值包括 -
logAndContinue,logAndFail,skipAndContinue或sendToDlq默认值:
logAndFail - applicationId
-
一种方便的方式,可在 Binder 级别全局设置 Kafka Streams 应用的 application.id。如果应用程序包含多个函数,则 application id 应设置不同。详细信息请参见上文关于设置 application id 的讨论。
默认值: 应用程序将生成一个静态 application ID。有关更多详细信息,请参阅 application ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接状态存储的最大次数。
默认值: 1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接状态存储的回退周期。
默认值: 1000 ms
- consumerProperties
-
Binder 级别的任意消费者属性。
- producerProperties
-
Binder 级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当通过 actuator 停止处理器绑定时,默认情况下该处理器将不参与健康检查。将此属性设置为
true以启用所有处理器的健康检查,包括当前通过 bindings actuator endpoint 停止的处理器。默认值: false
Kafka Streams 生产者属性
以下属性*仅*适用于 Kafka Streams 生产者,且必须以 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. 为前缀。为方便起见,如果存在多个输出绑定且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.producer. 进行配置。
- keySerde
-
要使用的 key serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的 value serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- useNativeEncoding
-
启用/禁用原生编码的标志
默认值:
true。 - streamPartitionerBeanName
-
要在消费者端使用的自定义出站分区器 Bean 名称。应用程序可以将自定义
StreamPartitioner作为 Spring Bean 提供,并将此 Bean 的名称提供给生产者使用,而不是使用默认分区器。默认值: 请参阅上面关于出站分区支持的讨论。
- producedAs
-
处理器生成到的 Sink 组件的自定义名称。
默认值:
none(由 Kafka Streams 生成)
Kafka Streams 消费者属性
以下属性适用于 Kafka Streams 消费者,且必须以 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. 为前缀。为方便起见,如果存在多个输入绑定且它们都需要一个公共值,则可以使用前缀 spring.cloud.stream.kafka.streams.default.consumer. 进行配置。
- applicationId
-
为每个输入绑定设置 application.id。
默认值: 请参阅上文。
- keySerde
-
要使用的 key serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- valueSerde
-
要使用的 value serde
默认值: 请参阅上面关于消息序列化/反序列化的讨论
- materializedAs
-
使用传入 KTable 类型时要物化(materialize)的状态存储
默认值:
none。 - useNativeDecoding
-
启用/禁用原生解码的标志
默认值:
true。 - dlqName
-
DLQ 主题名称。
默认值: 请参阅上文关于错误处理和 DLQ 的讨论。
- startOffset
-
如果没有已提交的 offset 可供消费,则从此 offset 开始消费。这主要用于消费者首次消费主题时。Kafka Streams 使用
earliest作为默认策略,Binder 也使用相同的默认值。可以使用此属性将其覆盖为latest。默认值:
earliest。
注意: 在消费者上使用 resetOffsets 对 Kafka Streams binder 没有影响。与基于消息通道的 binder 不同,Kafka Streams binder 不会按需寻址到开头或结尾。
- deserializationExceptionHandler
-
反序列化错误处理器类型。此处理器应用于每个消费者绑定,而不是应用于之前描述的 Binder 级别属性。可能的值包括 -
logAndContinue,logAndFail,skipAndContinue或sendToDlq默认值:
logAndFail - timestampExtractorBeanName
-
要在消费者端使用的特定时间戳提取器 Bean 名称。应用程序可以将
TimestampExtractor作为 Spring Bean 提供,并将此 Bean 的名称提供给消费者使用,而不是使用默认提取器。默认值: 请参阅上文关于时间戳提取器的讨论。
- eventTypes
-
此绑定支持的事件类型列表,以逗号分隔。
默认值:
none - eventTypeHeaderKey
-
通过此绑定的每个传入记录上的事件类型头键。
默认值:
event_type - consumedAs
-
处理器消费来源的 Source 组件的自定义名称。
默认值:
none(由 Kafka Streams 生成)
关于并发性的特别说明
在 Kafka Streams 中,您可以使用 num.stream.threads 属性控制处理器可以创建的线程数。您可以使用上述在 Binder、函数、生产者或消费者级别描述的各种 configuration 选项来实现这一点。您也可以使用核心 Spring Cloud Stream 为此目的提供的 concurrency 属性。使用此属性时,需要在消费者上进行设置。当您有多个输入绑定时,将其设置在第一个输入绑定上。例如,设置 spring.cloud.stream.bindings.process-in-0.consumer.concurrency 时,它将被 Binder 转换为 num.stream.threads。如果您有多个处理器,并且一个处理器定义了绑定级别的并发性,而其他处理器没有,那么那些没有绑定级别并发性的处理器将回退到通过 spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads 指定的 Binder 范围属性。如果此 Binder 配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。