消息监听器容器
提供了两个 MessageListenerContainer
实现
-
KafkaMessageListenerContainer
-
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer
将委托给一个或多个 KafkaMessageListenerContainer
实例以提供多线程消费。
从 2.2.7 版本开始,您可以向监听器容器添加 RecordInterceptor
;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从 2.7 版本开始,它具有在监听器退出(正常或通过抛出异常)后调用的其他方法。此外,从 2.7 版本开始,现在有一个 BatchInterceptor
,为 批量监听器 提供类似的功能。此外,ConsumerAwareRecordInterceptor
(和 BatchInterceptor
)提供对 Consumer<?, ?>
的访问。例如,这可以用于在拦截器中访问消费者指标。
您不应该在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器通过(创建新的)修改记录,则 topic 、partition 和 offset 必须保持不变,以避免意外的副作用,例如记录丢失。 |
CompositeRecordInterceptor
和 CompositeBatchInterceptor
可用于调用多个拦截器。
默认情况下,从 2.8 版本开始,在使用事务时,拦截器将在事务开始之前调用。您可以将监听器容器的 interceptBeforeTx
属性设置为 false
,以便改为在事务开始后调用拦截器。从 2.9 版本开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager
。例如,这允许拦截器参与容器启动的 JDBC 事务。
从 2.3.8、2.4.6 版本开始,当并发数大于 1 时,ConcurrentMessageListenerContainer
现在支持 静态成员资格。group.instance.id
后缀为 -n
,其中 n
从 1
开始。这与增加的 session.timeout.ms
一起,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。
使用 KafkaMessageListenerContainer
以下构造函数可用
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一个 ConsumerFactory
和有关主题和分区的信息,以及 ContainerProperties
对象中的其他配置。ContainerProperties
具有以下构造函数
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数采用 TopicPartitionOffset
参数数组,以明确指示容器使用哪些分区(使用消费者 assign()
方法)以及可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区内当前的最后一个偏移量。提供了采用附加 boolean
参数的 TopicPartitionOffset
的构造函数。如果为 true
,则初始偏移量(正或负)相对于此消费者的当前位置。偏移量在容器启动时应用。第二个采用主题数组,Kafka 根据 group.id
属性分配分区——在组之间分配分区。第三个使用正则表达式 Pattern
来选择主题。
要为容器分配一个MessageListener
,可以在创建容器时使用ContainerProps.setMessageListener
方法。以下示例演示了如何执行此操作。
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,在创建DefaultKafkaConsumerFactory
时,使用仅接收上述属性的构造函数意味着键和值Deserializer
类是从配置中获取的。或者,可以将Deserializer
实例传递给DefaultKafkaConsumerFactory
构造函数的键和/或值,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>
(从版本2.3开始),这些Supplier
将用于为每个Consumer
获取单独的Deserializer
实例。
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
有关可以设置的各种属性的更多信息,请参阅ContainerProperties
的Javadoc。
从版本2.1.1开始,提供了一个名为logContainerConfig
的新属性。当设置为true
且启用了INFO
日志记录时,每个监听器容器都会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移量提交的日志记录是在DEBUG
日志级别执行的。从版本2.1.2开始,ContainerProperties
中的一个属性commitLogLevel
允许您指定这些消息的日志级别。例如,要将日志级别更改为INFO
,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
。
从版本2.2开始,添加了一个名为missingTopicsFatal
的新容器属性(从2.3.4开始默认为false
)。如果配置的任何主题在代理上不存在,则会阻止容器启动。如果容器配置为侦听主题模式(正则表达式),则此属性不适用。以前,容器线程在consumer.poll()
方法中循环等待主题出现,同时记录许多消息。除了日志之外,没有迹象表明存在问题。
从版本2.8开始,引入了新的容器属性authExceptionRetryInterval
。这会导致容器在从KafkaConsumer
获取任何AuthenticationException
或AuthorizationException
后重试获取消息。例如,当配置的用户被拒绝访问读取某个主题或凭据不正确时,可能会发生这种情况。定义authExceptionRetryInterval
允许容器在授予适当权限时恢复。
默认情况下,未配置任何间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。 |
从版本2.8开始,在创建消费者工厂时,如果以对象的形式提供反序列化器(在构造函数中或通过setter),则工厂将调用configure()
方法使用配置属性对其进行配置。
使用ConcurrentMessageListenerContainer
单个构造函数类似于KafkaListenerContainer
构造函数。以下列表显示了构造函数的签名。
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个concurrency
属性。例如,container.setConcurrency(3)
创建三个KafkaMessageListenerContainer
实例。
对于第一个构造函数,Kafka使用其组管理功能在消费者之间分配分区。
在侦听多个主题时,默认分区分配可能不是您期望的。例如,如果您有三个主题,每个主题有五个分区,并且您想要使用 在使用Spring Boot时,您可以按如下方式分配策略。
|
当容器属性配置有TopicPartitionOffset
时,ConcurrentMessageListenerContainer
会在委托的KafkaMessageListenerContainer
实例之间分配TopicPartitionOffset
实例。
例如,如果提供了六个TopicPartitionOffset
实例,并且concurrency
为3
,则每个容器将获得两个分区。对于五个TopicPartitionOffset
实例,两个容器将获得两个分区,而第三个容器将获得一个分区。如果concurrency
大于TopicPartitions
的数量,则会向下调整concurrency
,以便每个容器获得一个分区。
client.id 属性(如果设置)将附加-n ,其中n 是对应于并发性的消费者实例。当启用JMX时,这需要为MBean提供唯一名称。 |
从版本1.3开始,MessageListenerContainer
提供了对底层KafkaConsumer
指标的访问。在ConcurrentMessageListenerContainer
的情况下,metrics()
方法返回所有目标KafkaMessageListenerContainer
实例的指标。指标按底层KafkaConsumer
提供的client-id
分组到Map<MetricName, ? extends Metric>
中。
从版本2.3开始,ContainerProperties
提供了一个idleBetweenPolls
选项,允许监听器容器中的主循环在KafkaConsumer.poll()
调用之间休眠。实际的休眠间隔被选择为提供的选项和max.poll.interval.ms
消费者配置与当前记录批处理时间之间的差值的最小值。
提交偏移量
提供了几个用于提交偏移量的选项。如果enable.auto.commit
消费者属性为true
,则Kafka会根据其配置自动提交偏移量。如果为false
,则容器支持几种AckMode
设置(在下一个列表中描述)。默认的AckMode
为BATCH
。从版本2.3开始,框架将enable.auto.commit
设置为false
,除非在配置中显式设置。以前,如果未设置该属性,则使用Kafka默认值(true
)。
消费者poll()
方法返回一个或多个ConsumerRecords
。MessageListener
将为每个记录调用。以下列表描述了容器针对每个AckMode
采取的操作(当未使用事务时)。
-
RECORD
:在监听器处理完记录后返回时提交偏移量。 -
BATCH
:在处理完poll()
返回的所有记录后提交偏移量。 -
TIME
:在处理完poll()
返回的所有记录后提交偏移量,只要自上次提交以来的ackTime
已超过。 -
COUNT
:在处理完poll()
返回的所有记录后提交偏移量,只要自上次提交以来已收到ackCount
条记录。 -
COUNT_TIME
:类似于TIME
和COUNT
,但如果任一条件为true
,则执行提交。 -
MANUAL
:消息监听器负责acknowledge()
Acknowledgment
。之后,将应用与BATCH
相同的语义。 -
MANUAL_IMMEDIATE
:当监听器调用Acknowledgment.acknowledge()
方法时立即提交偏移量。
在使用事务时,偏移量将发送到事务,并且语义等效于RECORD
或BATCH
,具体取决于监听器类型(记录或批处理)。
MANUAL 和MANUAL_IMMEDIATE 要求监听器为AcknowledgingMessageListener 或BatchAcknowledgingMessageListener 。请参阅消息监听器。 |
根据syncCommits
容器属性,将使用消费者上的commitSync()
或commitAsync()
方法。syncCommits
默认为true
;另请参阅setSyncCommitTimeout
。请参阅setCommitCallback
以获取异步提交的结果;默认回调是LoggingCommitCallback
,它记录错误(以及调试级别的成功)。
由于监听器容器有自己的提交偏移量机制,因此它更喜欢Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
为false
。从版本2.3开始,它无条件地将其设置为false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。
Acknowledgment
具有以下方法。
public interface Acknowledgment {
void acknowledge();
}
此方法使监听器可以控制何时提交偏移量。
从版本2.3开始,Acknowledgment
接口有两个额外的方法nack(long sleep)
和nack(int index, long sleep)
。第一个用于记录监听器,第二个用于批处理监听器。为您的监听器类型调用错误的方法将抛出IllegalStateException
。
如果要提交部分批处理,请使用nack() ,在使用事务时,将AckMode 设置为MANUAL ;调用nack() 会将已成功处理的记录的偏移量发送到事务。 |
只能在调用监听器的消费者线程上调用nack() 。 |
在使用乱序提交时,不允许使用nack() 。 |
对于记录监听器,当调用nack()
时,将提交任何挂起的偏移量,丢弃上次轮询的其余记录,并在其分区上执行查找,以便在下次poll()
中重新传递失败的记录和未处理的记录。通过设置sleep
参数,可以在重新传递之前暂停消费者。这类似于在容器配置有DefaultErrorHandler
时抛出异常的功能。
nack() 会暂停整个监听器指定的休眠持续时间,包括所有分配的分区。 |
在使用批处理监听器时,您可以指定批处理中发生故障的索引。当调用nack()
时,将为索引之前的记录提交偏移量,并在分区的失败和丢弃的记录上执行查找,以便它们将在下次poll()
中重新传递。
有关更多信息,请参阅容器错误处理程序。
消费者在休眠期间暂停,以便我们继续轮询代理以保持消费者处于活动状态。实际的休眠时间及其分辨率取决于容器的pollTimeout ,默认为5秒。最短休眠时间等于pollTimeout ,所有休眠时间都将是它的倍数。对于较短的休眠时间或为了提高其准确性,请考虑减少容器的pollTimeout 。 |
从版本3.0.10开始,批处理监听器可以使用Acknowledgment
参数上的acknowledge(index)
提交批处理部分的偏移量。当调用此方法时,将提交索引处的记录(以及所有之前的记录)的偏移量。在执行部分批处理提交后调用acknowledge()
将提交批处理其余部分的偏移量。以下限制适用。
-
需要
AckMode.MANUAL_IMMEDIATE
。 -
必须在监听器线程上调用此方法。
-
监听器必须使用
List
而不是原始的ConsumerRecords
。 -
索引必须在列表元素的范围内。
-
索引必须大于先前调用中使用的索引。
这些限制是强制执行的,并且该方法将抛出IllegalArgumentException
或IllegalStateException
,具体取决于违规情况。