监听器容器属性

表 1. ContainerProperties 属性
属性 默认值 描述

ackCount

1

ackModeCOUNTCOUNT_TIME 时,提交待处理偏移量之前的记录数。

adviceChain

null

围绕消息监听器的 Advice 对象链(例如 MethodInterceptor 围绕 advice),按顺序调用。

ackMode

BATCH

控制偏移量提交的频率 - 请参阅 提交偏移量

ackTime

5000

ackModeTIMECOUNT_TIME 时,提交待处理偏移量的时间(以毫秒为单位)。

assignmentCommitOption

LATEST_ONLY _NO_TX

是否在分配时提交初始位置;默认情况下,仅当 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGlatest 时才会提交初始偏移量,即使存在事务管理器,它也不会在事务中运行。有关可用选项的更多信息,请参阅 ContainerProperties.AssignmentCommitOption 的 JavaDocs。

asyncAcks

false

启用乱序提交(请参阅 手动提交偏移量);消费者被暂停,提交被推迟,直到填补空白。

authExceptionRetryInterval

null

当不为空时,表示在 Kafka 客户端抛出 AuthenticationExceptionAuthorizationException 时,轮询之间休眠的 Duration。当为空时,此类异常被视为致命错误,容器将停止。

batchRecoverAfterRollback

false

设置为 true 以启用批处理恢复,请参阅 回滚后处理器

clientId

(空字符串)

client.id 消费者属性的前缀。覆盖消费者工厂 client.id 属性;在并发容器中,-n 作为每个消费者实例的后缀添加。

checkDeserExWhenKeyNull

false

设置为 true 以在收到 null key 时始终检查 DeserializationException 标头。当消费者代码无法确定是否已配置 ErrorHandlingDeserializer 时很有用,例如使用委托反序列化器时。

checkDeserExWhenValueNull

false

设置为true,以便在接收到null value 时始终检查DeserializationException 标头。当消费者代码无法确定是否已配置ErrorHandlingDeserializer 时很有用,例如在使用委托反序列化器时。

commitCallback

null

如果存在且syncCommitsfalse,则在提交完成后调用回调。

commitLogLevel

DEBUG

与提交偏移量相关的日志的日志级别。

consumerRebalanceListener

null

重新平衡监听器;请参阅重新平衡监听器

commitRetries

3

设置使用syncCommits 设置为 true 时RetriableCommitFailedException 的重试次数。默认值为 3(总共 4 次尝试)。

consumerStartTimeout

30s

等待消费者启动之前记录错误的时间;例如,如果您使用线程不足的任务执行器,则可能会发生这种情况。

deliveryAttemptHeader

false

请参阅传递尝试标头

eosMode

V2

恰好一次语义模式;请参阅恰好一次语义

fixTxOffsets

false

在使用事务性生产者生产的记录时,如果消费者位于分区的末尾,则由于用于指示事务提交/回滚的伪记录以及可能存在的回滚记录,滞后可能会错误地报告为大于零。这不会在功能上影响消费者,但一些用户表示对“滞后”非零表示担忧。将此属性设置为true,容器将更正此类错误报告的偏移量。检查在下次轮询之前执行,以避免在提交处理中添加重大复杂性。在撰写本文时,仅当消费者配置为isolation.level=read_committedmax.poll.records 大于 1 时,才会更正滞后。有关更多信息,请参阅KAFKA-10683

groupId

null

覆盖消费者group.id 属性;由@KafkaListener idgroupId 属性自动设置。

idleBeforeDataMultiplier

5.0

在接收到任何记录之前应用的 idleEventInterval 乘数。接收到记录后,不再应用乘数。从 2.8 版本开始可用。

idleBetweenPolls

0

用于通过在轮询之间休眠线程来减慢传递速度。处理一批记录的时间加上此值必须小于 max.poll.interval.ms 消费者属性。

idleEventInterval

null

设置后,启用 ListenerContainerIdleEvent 的发布,请参阅 应用程序事件检测空闲和无响应的消费者。另请参阅 idleBeforeDataMultiplier

idlePartitionEventInterval

null

设置后,启用 ListenerContainerIdlePartitionEvent 的发布,请参阅 应用程序事件检测空闲和无响应的消费者

kafkaConsumerProperties

用于覆盖在消费者工厂上配置的任何任意消费者属性。

kafkaAwareTransactionManager

null

请参阅 事务

listenerTaskExecutor

SimpleAsyncTaskExecutor

用于运行消费者线程的任务执行器。默认执行器创建名为 <name>-C-n 的线程;对于 KafkaMessageListenerContainer,名称是 bean 名称;对于 ConcurrentMessageListenerContainer,名称是 bean 名称后缀为 -n,其中 n 对于每个子容器递增。

logContainerConfig

false

设置为 true 以在 INFO 级别记录所有容器属性。

messageListener

null

消息监听器。

micrometerEnabled

true

是否为消费者线程维护 Micrometer 计时器。

micrometerTags

要添加到 micrometer 指标的静态标签映射。

micrometerTagsProvider

null

一个根据消费者记录提供动态标签的函数。

missingTopicsFatal

false

如果配置的主题在代理上不存在,则设置为 true 会阻止容器启动。

monitorInterval

30s

检查消费者线程状态以获取 NonResponsiveConsumerEvent 的频率。请参阅 noPollThresholdpollTimeout

noPollThreshold

3.0

乘以 pollTimeOut 以确定是否发布 NonResponsiveConsumerEvent。请参阅 monitorInterval

observationConvention

null

设置后,根据消费者记录中的信息,在计时器和跟踪中添加动态标签。

observationEnabled

false

设置为 true 以通过 Micrometer 启用观测。

offsetAndMetadataProvider

null

OffsetAndMetadata 的提供者;默认情况下,提供者会创建带有空元数据的偏移量和元数据。提供者提供了一种自定义元数据的方法。

onlyLogRecordMetadata

false

设置为 false 以记录完整的消费者记录(在错误、调试日志等中),而不是仅记录 topic-partition@offset

pauseImmediate

false

当容器暂停时,在处理完当前记录后停止处理,而不是在处理完之前轮询的所有记录后停止;剩余的记录将保留在内存中,并在容器恢复时传递给监听器。

pollTimeout

5000

传递给 Consumer.poll() 的超时时间(以毫秒为单位)。

pollTimeoutWhilePaused

100

当容器处于暂停状态时,传递给 Consumer.poll() 的超时时间(以毫秒为单位)。

restartAfterAuthExceptions

false

如果容器由于授权/身份验证异常而停止,则设置为 True 以重新启动容器。

scheduler

ThreadPoolTaskScheduler

在该调度器上运行消费者监控任务。

shutdownTimeout

10000

stop() 方法阻塞的最大时间(以毫秒为单位),直到所有消费者停止,并在发布容器停止事件之前。

stopContainerWhenFenced

false

如果抛出 ProducerFencedException,则停止监听器容器。有关更多信息,请参见 回滚后处理器

stopImmediate

false

当容器停止时,在处理完当前记录后停止处理,而不是在处理完之前轮询的所有记录后停止。

subBatchPerPartition

参见描述。

当使用批处理监听器时,如果此值为 true,则监听器将使用轮询结果拆分为子批次调用,每个分区一个。默认值为 false

syncCommitTimeout

null

syncCommitstrue 时使用的超时时间。未设置时,容器将尝试确定 default.api.timeout.ms 消费者属性并使用它;否则将使用 60 秒。

syncCommits

true

是否对偏移量使用同步或异步提交;请参阅 commitCallback

topics topicPattern topicPartitions

n/a

配置的主题、主题模式或显式分配的主题/分区。互斥;必须提供至少一个;由 ContainerProperties 构造函数强制执行。

transactionManager

null

自 3.2 版起已弃用,请参阅 [kafkaAwareTransactionManager]其他事务管理器

表 2. AbstractListenerContainer 属性
属性 默认值 描述

afterRollbackProcessor

DefaultAfterRollbackProcessor

在事务回滚后调用的 AfterRollbackProcessor

applicationEventPublisher

应用程序上下文

事件发布者。

batchErrorHandler

参见描述。

已弃用 - 请参阅 commonErrorHandler

batchInterceptor

null

设置 BatchInterceptor 以在调用批处理监听器之前调用;不适用于记录监听器。另请参阅 interceptBeforeTx

beanName

bean 名称

容器的 bean 名称;对于子容器,后缀为 -n

commonErrorHandler

参见描述。

当提供 transactionManager 时为 DefaultErrorHandlernull,当使用 DefaultAfterRollbackProcessor 时。请参阅 容器错误处理程序

containerProperties

ContainerProperties

容器属性实例。

groupId

参见描述。

如果存在,则为 containerProperties.groupId,否则为消费者工厂中的 group.id 属性。

interceptBeforeTx

true

确定 recordInterceptor 是在事务开始之前还是之后调用。

listenerId

参见描述。

用户配置的容器的 Bean 名称或 @KafkaListenerid 属性。

listenerInfo

null

用于填充 KafkaHeaders.LISTENER_INFO 标头的值。使用 @KafkaListener 时,此值从 info 属性获取。此标头可以在各种地方使用,例如 RecordInterceptorRecordFilterStrategy 以及监听器代码本身。

pauseRequested

(只读)

如果已请求消费者暂停,则为真。

recordInterceptor

null

设置 RecordInterceptor,在调用记录监听器之前调用;不适用于批处理监听器。另请参见 interceptBeforeTx

topicCheckTimeout

30s

missingTopicsFatal 容器属性为 true 时,等待 describeTopics 操作完成的秒数。

表 3. KafkaMessageListenerContainer 属性
属性 默认值 描述

assignedPartitions

(只读)

当前分配给此容器的(显式或非显式)分区。

assignedPartitionsByClientId

(只读)

当前分配给此容器的(显式或非显式)分区。

clientIdSuffix

null

并发容器使用它为每个子容器的消费者提供唯一的 client.id

containerPaused

n/a

如果已请求暂停并且消费者实际上已暂停,则为真。

表 4. ConcurrentMessageListenerContainer 属性
属性 默认值 描述

alwaysClientIdSuffix

true

设置为 false 以抑制在 concurrency 仅为 1 时向 client.id 消费者属性添加后缀。

assignedPartitions

(只读)

当前分配给此容器的子 KafkaMessageListenerContainer 的(显式或非显式)分区的聚合。

assignedPartitionsByClientId

(只读)

当前分配给此容器的子 KafkaMessageListenerContainer 的(显式或非显式)分区,以子容器的消费者的 client.id 属性为键。

concurrency

1

要管理的子 KafkaMessageListenerContainer 的数量。

containerPaused

n/a

如果已请求暂停并且所有子容器的使用者实际上已暂停,则为真。

containers

n/a

对所有子 KafkaMessageListenerContainer 的引用。