消息监听器容器

提供了两个 MessageListenerContainer 实现

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 在单个线程上接收来自所有主题或分区的所有消息。ConcurrentMessageListenerContainer 将委托给一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。

从 2.2.7 版本开始,您可以向监听器容器添加 RecordInterceptor;它将在调用监听器之前被调用,允许检查或修改记录。如果拦截器返回 null,则不会调用监听器。从 2.7 版本开始,它具有在监听器退出(正常或通过抛出异常)后调用的其他方法。此外,从 2.7 版本开始,现在有一个 BatchInterceptor,为 批量监听器 提供类似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供对 Consumer<?, ?> 的访问。例如,这可以用于在拦截器中访问消费者指标。

您不应该在这些拦截器中执行任何影响消费者位置和/或已提交偏移量的方法;容器需要管理此类信息。
如果拦截器通过(创建新的)修改记录,则 topicpartitionoffset 必须保持不变,以避免意外的副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用于调用多个拦截器。

默认情况下,从 2.8 版本开始,在使用事务时,拦截器将在事务开始之前调用。您可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便改为在事务开始后调用拦截器。从 2.9 版本开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。例如,这允许拦截器参与容器启动的 JDBC 事务。

从 2.3.8、2.4.6 版本开始,当并发数大于 1 时,ConcurrentMessageListenerContainer 现在支持 静态成员资格group.instance.id 后缀为 -n,其中 n1 开始。这与增加的 session.timeout.ms 一起,可用于减少重新平衡事件,例如,当应用程序实例重新启动时。

使用 KafkaMessageListenerContainer

以下构造函数可用

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它接收一个 ConsumerFactory 和有关主题和分区的信息,以及 ContainerProperties 对象中的其他配置。ContainerProperties 具有以下构造函数

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数采用 TopicPartitionOffset 参数数组,以明确指示容器使用哪些分区(使用消费者 assign() 方法)以及可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区内当前的最后一个偏移量。提供了采用附加 boolean 参数的 TopicPartitionOffset 的构造函数。如果为 true,则初始偏移量(正或负)相对于此消费者的当前位置。偏移量在容器启动时应用。第二个采用主题数组,Kafka 根据 group.id 属性分配分区——在组之间分配分区。第三个使用正则表达式 Pattern 来选择主题。

要为容器分配一个MessageListener,可以在创建容器时使用ContainerProps.setMessageListener方法。以下示例演示了如何执行此操作。

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,在创建DefaultKafkaConsumerFactory时,使用仅接收上述属性的构造函数意味着键和值Deserializer类是从配置中获取的。或者,可以将Deserializer实例传递给DefaultKafkaConsumerFactory构造函数的键和/或值,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>(从版本2.3开始),这些Supplier将用于为每个Consumer获取单独的Deserializer实例。

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关可以设置的各种属性的更多信息,请参阅ContainerPropertiesJavadoc

从版本2.1.1开始,提供了一个名为logContainerConfig的新属性。当设置为true且启用了INFO日志记录时,每个监听器容器都会写入一条日志消息,总结其配置属性。

默认情况下,主题偏移量提交的日志记录是在DEBUG日志级别执行的。从版本2.1.2开始,ContainerProperties中的一个属性commitLogLevel允许您指定这些消息的日志级别。例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从版本2.2开始,添加了一个名为missingTopicsFatal的新容器属性(从2.3.4开始默认为false)。如果配置的任何主题在代理上不存在,则会阻止容器启动。如果容器配置为侦听主题模式(正则表达式),则此属性不适用。以前,容器线程在consumer.poll()方法中循环等待主题出现,同时记录许多消息。除了日志之外,没有迹象表明存在问题。

从版本2.8开始,引入了新的容器属性authExceptionRetryInterval。这会导致容器在从KafkaConsumer获取任何AuthenticationExceptionAuthorizationException后重试获取消息。例如,当配置的用户被拒绝访问读取某个主题或凭据不正确时,可能会发生这种情况。定义authExceptionRetryInterval允许容器在授予适当权限时恢复。

默认情况下,未配置任何间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。

从版本2.8开始,在创建消费者工厂时,如果以对象的形式提供反序列化器(在构造函数中或通过setter),则工厂将调用configure()方法使用配置属性对其进行配置。

使用ConcurrentMessageListenerContainer

单个构造函数类似于KafkaListenerContainer构造函数。以下列表显示了构造函数的签名。

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个concurrency属性。例如,container.setConcurrency(3)创建三个KafkaMessageListenerContainer实例。

对于第一个构造函数,Kafka使用其组管理功能在消费者之间分配分区。

在侦听多个主题时,默认分区分配可能不是您期望的。例如,如果您有三个主题,每个主题有五个分区,并且您想要使用concurrency=15,您将只看到五个活动的消费者,每个消费者分配每个主题的一个分区,而其他10个消费者处于空闲状态。这是因为默认的Kafka PartitionAssignorRangeAssignor(请参阅其Javadoc)。对于这种情况,您可能需要考虑使用RoundRobinAssignor,它在所有消费者之间分配分区。然后,每个消费者分配一个主题或分区。要更改PartitionAssignor,可以在提供给DefaultKafkaConsumerFactory的属性中设置partition.assignment.strategy消费者属性(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

在使用Spring Boot时,您可以按如下方式分配策略。

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置有TopicPartitionOffset时,ConcurrentMessageListenerContainer会在委托的KafkaMessageListenerContainer实例之间分配TopicPartitionOffset实例。

例如,如果提供了六个TopicPartitionOffset实例,并且concurrency3,则每个容器将获得两个分区。对于五个TopicPartitionOffset实例,两个容器将获得两个分区,而第三个容器将获得一个分区。如果concurrency大于TopicPartitions的数量,则会向下调整concurrency,以便每个容器获得一个分区。

client.id属性(如果设置)将附加-n,其中n是对应于并发性的消费者实例。当启用JMX时,这需要为MBean提供唯一名称。

从版本1.3开始,MessageListenerContainer提供了对底层KafkaConsumer指标的访问。在ConcurrentMessageListenerContainer的情况下,metrics()方法返回所有目标KafkaMessageListenerContainer实例的指标。指标按底层KafkaConsumer提供的client-id分组到Map<MetricName, ? extends Metric>中。

从版本2.3开始,ContainerProperties提供了一个idleBetweenPolls选项,允许监听器容器中的主循环在KafkaConsumer.poll()调用之间休眠。实际的休眠间隔被选择为提供的选项和max.poll.interval.ms消费者配置与当前记录批处理时间之间的差值的最小值。

提交偏移量

提供了几个用于提交偏移量的选项。如果enable.auto.commit消费者属性为true,则Kafka会根据其配置自动提交偏移量。如果为false,则容器支持几种AckMode设置(在下一个列表中描述)。默认的AckModeBATCH。从版本2.3开始,框架将enable.auto.commit设置为false,除非在配置中显式设置。以前,如果未设置该属性,则使用Kafka默认值(true)。

消费者poll()方法返回一个或多个ConsumerRecordsMessageListener将为每个记录调用。以下列表描述了容器针对每个AckMode采取的操作(当未使用事务时)。

  • RECORD:在监听器处理完记录后返回时提交偏移量。

  • BATCH:在处理完poll()返回的所有记录后提交偏移量。

  • TIME:在处理完poll()返回的所有记录后提交偏移量,只要自上次提交以来的ackTime已超过。

  • COUNT:在处理完poll()返回的所有记录后提交偏移量,只要自上次提交以来已收到ackCount条记录。

  • COUNT_TIME:类似于TIMECOUNT,但如果任一条件为true,则执行提交。

  • MANUAL:消息监听器负责acknowledge()Acknowledgment。之后,将应用与BATCH相同的语义。

  • MANUAL_IMMEDIATE:当监听器调用Acknowledgment.acknowledge()方法时立即提交偏移量。

在使用事务时,偏移量将发送到事务,并且语义等效于RECORDBATCH,具体取决于监听器类型(记录或批处理)。

MANUALMANUAL_IMMEDIATE要求监听器为AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参阅消息监听器

根据syncCommits容器属性,将使用消费者上的commitSync()commitAsync()方法。syncCommits默认为true;另请参阅setSyncCommitTimeout。请参阅setCommitCallback以获取异步提交的结果;默认回调是LoggingCommitCallback,它记录错误(以及调试级别的成功)。

由于监听器容器有自己的提交偏移量机制,因此它更喜欢Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIGfalse。从版本2.3开始,它无条件地将其设置为false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。

Acknowledgment具有以下方法。

public interface Acknowledgment {

    void acknowledge();

}

此方法使监听器可以控制何时提交偏移量。

从版本2.3开始,Acknowledgment接口有两个额外的​​方法nack(long sleep)nack(int index, long sleep)。第一个用于记录监听器,第二个用于批处理监听器。为您的监听器类型调用错误的方法将抛出IllegalStateException

如果要提交部分批处理,请使用nack(),在使用事务时,将AckMode设置为MANUAL;调用nack()会将已成功处理的记录的偏移量发送到事务。
只能在调用监听器的消费者线程上调用nack()
在使用乱序提交时,不允许使用nack()

对于记录监听器,当调用nack()时,将提交任何挂起的偏移量,丢弃上次轮询的其余记录,并在其分区上执行查找,以便在下次poll()中重新传递失败的记录和未处理的记录。通过设置sleep参数,可以在重新传递之前暂停消费者。这类似于在容器配置有DefaultErrorHandler时抛出异常的功能。

nack()会暂停整个监听器指定的休眠持续时间,包括所有分配的分区。

在使用批处理监听器时,您可以指定批处理中发生故障的索引。当调用nack()时,将为索引之前的记录提交偏移量,并在分区的失败和丢弃的记录上执行查找,以便它们将在下次poll()中重新传递。

有关更多信息,请参阅容器错误处理程序

消费者在休眠期间暂停,以便我们继续轮询代理以保持消费者处于活动状态。实际的休眠时间及其分辨率取决于容器的pollTimeout,默认为5秒。最短休眠时间等于pollTimeout,所有休眠时间都将是它的倍数。对于较短的休眠时间或为了提高其准确性,请考虑减少容器的pollTimeout

从版本3.0.10开始,批处理监听器可以使用Acknowledgment参数上的acknowledge(index)提交批处理部分的偏移量。当调用此方法时,将提交索引处的记录(以及所有之前的记录)的偏移量。在执行部分批处理提交后调用acknowledge()将提交批处理其余部分的偏移量。以下限制适用。

  • 需要AckMode.MANUAL_IMMEDIATE

  • 必须在监听器线程上调用此方法。

  • 监听器必须使用List而不是原始的ConsumerRecords

  • 索引必须在列表元素的范围内。

  • 索引必须大于先前调用中使用的索引。

这些限制是强制执行的,并且该方法将抛出IllegalArgumentExceptionIllegalStateException,具体取决于违规情况。

监听器容器自动启动

监听器容器实现了SmartLifecycle,并且autoStartup默认为true。容器在后期阶段(Integer.MAX-VALUE - 100)启动。实现SmartLifecycle的其他组件(用于处理来自监听器的数据)应该在较早的阶段启动。- 100为后期阶段留出空间,以便在容器之后启用组件自动启动。