消息监听器容器

提供了两种 MessageListenerContainer 实现

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer 从所有主题或分区接收所有消息,并在单个线程上进行处理。ConcurrentMessageListenerContainer 将委托给一个或多个 KafkaMessageListenerContainer 实例,以提供多线程消费。

从 2.2.7 版本开始,您可以在监听器容器中添加一个 RecordInterceptor;它将在调用监听器之前被调用,允许您检查或修改记录。如果拦截器返回 null,则不会调用监听器。从 2.7 版本开始,它还具有在监听器退出后(正常退出或抛出异常)被调用的其他方法。此外,从 2.7 版本开始,现在有一个 BatchInterceptor,为 批处理监听器 提供类似的功能。此外,ConsumerAwareRecordInterceptor(和 BatchInterceptor)提供了对 Consumer<?, ?> 的访问。例如,这可以用于在拦截器中访问消费者指标。

您不应该在这些拦截器中执行任何影响消费者位置或已提交偏移量的方法;容器需要管理此类信息。
如果拦截器修改了记录(通过创建一个新的记录),则 topicpartitionoffset 必须保持不变,以避免意外的副作用,例如记录丢失。

CompositeRecordInterceptorCompositeBatchInterceptor 可用于调用多个拦截器。

默认情况下,从 2.8 版本开始,在使用事务时,拦截器将在事务开始之前被调用。您可以将监听器容器的 interceptBeforeTx 属性设置为 false,以便在事务开始后调用拦截器。从 2.9 版本开始,这将适用于任何事务管理器,而不仅仅是 KafkaAwareTransactionManager。这允许拦截器参与容器启动的 JDBC 事务,例如。

从 2.3.8、2.4.6 版本开始,ConcurrentMessageListenerContainer 现在支持 静态成员资格,前提是并发度大于 1。group.instance.id 后缀为 -n,其中 n1 开始。这与增加的 session.timeout.ms 一起使用,可以减少重新平衡事件,例如,当应用程序实例重新启动时。

使用 KafkaMessageListenerContainer

以下构造函数可用

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

它接收一个 ConsumerFactory 和有关主题和分区的信息,以及其他配置,这些信息包含在 ContainerProperties 对象中。ContainerProperties 具有以下构造函数

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

第一个构造函数采用 TopicPartitionOffset 数组作为参数,以明确指示容器使用哪些分区(使用消费者 assign() 方法),并使用可选的初始偏移量。默认情况下,正值是绝对偏移量。默认情况下,负值相对于分区内的当前最后一个偏移量。提供了一个接受额外 boolean 参数的 TopicPartitionOffset 构造函数。如果为 true,则初始偏移量(正或负)相对于此消费者的当前位置。偏移量在容器启动时应用。第二个构造函数采用主题数组,Kafka 根据 group.id 属性分配分区,将分区分布在组中。第三个构造函数使用正则表达式 Pattern 来选择主题。

要为容器分配一个MessageListener,您可以在创建容器时使用ContainerProps.setMessageListener方法。以下示例展示了如何操作。

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

请注意,在创建DefaultKafkaConsumerFactory时,使用仅接受上述属性的构造函数意味着键和值Deserializer类是从配置中获取的。或者,可以将Deserializer实例传递给DefaultKafkaConsumerFactory构造函数以用于键和/或值,在这种情况下,所有消费者共享相同的实例。另一种选择是提供Supplier<Deserializer>(从版本 2.3 开始),这些将用于为每个Consumer获取单独的Deserializer实例。

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

有关您可以设置的各种属性的更多信息,请参阅ContainerPropertiesJavadoc

从版本 2.1.1 开始,提供了一个名为logContainerConfig的新属性。当设置为true且启用了INFO日志记录时,每个监听器容器都会写入一条日志消息,总结其配置属性。

默认情况下,主题偏移量提交的日志记录在DEBUG日志级别执行。从版本 2.1.2 开始,ContainerProperties中的一个名为commitLogLevel的属性允许您指定这些消息的日志级别。例如,要将日志级别更改为INFO,您可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);

从版本 2.2 开始,添加了一个名为missingTopicsFatal的新容器属性(默认情况下:从 2.3.4 开始为false)。如果配置的任何主题在代理上不存在,这将阻止容器启动。如果容器配置为监听主题模式(正则表达式),则不适用。以前,容器线程在consumer.poll()方法中循环等待主题出现,同时记录许多消息。除了日志之外,没有迹象表明存在问题。

从 2.8 版本开始,引入了新的容器属性 authExceptionRetryInterval。这会导致容器在从 KafkaConsumer 获取任何 AuthenticationExceptionAuthorizationException 后重试获取消息。例如,当配置的用户被拒绝访问特定主题或凭据不正确时,就会发生这种情况。定义 authExceptionRetryInterval 允许容器在授予适当权限时恢复。

默认情况下,没有配置任何间隔 - 身份验证和授权错误被视为致命错误,这会导致容器停止。

从 2.8 版本开始,在创建消费者工厂时,如果您以对象形式提供反序列化器(在构造函数中或通过 setter),工厂将调用 configure() 方法以使用配置属性对其进行配置。

使用 ConcurrentMessageListenerContainer

单个构造函数类似于 KafkaListenerContainer 构造函数。以下清单显示了构造函数的签名

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

它还有一个 concurrency 属性。例如,container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。

对于第一个构造函数,Kafka 使用其组管理功能将分区分配到消费者之间。

在监听多个主题时,默认的分区分配可能与您的预期不符。例如,如果您有三个主题,每个主题有五个分区,并且您想使用 concurrency=15,您将只看到五个活动的消费者,每个消费者分配了来自每个主题的一个分区,而其他 10 个消费者处于空闲状态。这是因为默认的 Kafka PartitionAssignorRangeAssignor(请参阅其 Javadoc)。对于这种情况,您可能希望考虑使用 RoundRobinAssignor,它将分区分配到所有消费者之间。然后,每个消费者分配一个主题或分区。要更改 PartitionAssignor,您可以在提供给 DefaultKafkaConsumerFactory 的属性中设置 partition.assignment.strategy 消费者属性(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)。

在使用 Spring Boot 时,您可以按如下方式分配策略

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

当容器属性配置了TopicPartitionOffset时,ConcurrentMessageListenerContainer会将TopicPartitionOffset实例分配到委托的KafkaMessageListenerContainer实例中。

例如,如果提供了六个TopicPartitionOffset实例,并且concurrency3,则每个容器将获得两个分区。对于五个TopicPartitionOffset实例,两个容器将获得两个分区,而第三个容器将获得一个分区。如果concurrency大于TopicPartitions的数量,则会向下调整concurrency,以便每个容器获得一个分区。

client.id属性(如果设置)将附加-n,其中n是与并发性相对应的消费者实例。当启用 JMX 时,这需要为 MBean 提供唯一的名称。

从 1.3 版本开始,MessageListenerContainer提供对底层KafkaConsumer指标的访问。对于ConcurrentMessageListenerContainermetrics()方法返回所有目标KafkaMessageListenerContainer实例的指标。指标按底层KafkaConsumer提供的client-id分组到Map<MetricName, ? extends Metric>中。

从 2.3 版本开始,ContainerProperties提供了一个idleBetweenPolls选项,允许监听器容器中的主循环在KafkaConsumer.poll()调用之间休眠。实际的休眠间隔将选择为提供的选项和max.poll.interval.ms消费者配置与当前记录批处理时间之间的差值中的最小值。

提交偏移量

提供了几个选项来提交偏移量。如果enable.auto.commit消费者属性为true,则 Kafka 会根据其配置自动提交偏移量。如果为false,则容器支持几种AckMode设置(在下一列表中描述)。默认的AckModeBATCH。从 2.3 版本开始,框架将enable.auto.commit设置为false,除非在配置中显式设置。以前,如果未设置该属性,则使用 Kafka 默认值(true)。

消费者poll()方法返回一个或多个ConsumerRecords。对于每个记录,都会调用MessageListener。以下列表描述了容器对每个AckMode采取的操作(当未使用事务时)。

  • RECORD:在监听器处理完记录后提交偏移量。

  • BATCH:在处理完poll()返回的所有记录后提交偏移量。

  • TIME:在处理完poll()返回的所有记录后提交偏移量,只要自上次提交以来的ackTime已超过。

  • COUNT: 当所有由 poll() 返回的记录都被处理后,只要自上次提交以来已收到 ackCount 条记录,就提交偏移量。

  • COUNT_TIME: 与 TIMECOUNT 类似,但如果任一条件为 true,则执行提交。

  • MANUAL: 消息监听器负责 acknowledge() Acknowledgment。之后,将应用与 BATCH 相同的语义。

  • MANUAL_IMMEDIATE: 当监听器调用 Acknowledgment.acknowledge() 方法时,立即提交偏移量。

当使用 事务 时,偏移量将发送到事务,语义等同于 RECORDBATCH,具体取决于监听器类型(记录或批次)。

MANUALMANUAL_IMMEDIATE 要求监听器为 AcknowledgingMessageListenerBatchAcknowledgingMessageListener。请参阅 消息监听器

根据 syncCommits 容器属性,将使用消费者上的 commitSync()commitAsync() 方法。syncCommits 默认情况下为 true;另请参阅 setSyncCommitTimeout。请参阅 setCommitCallback 以获取异步提交的结果;默认回调是 LoggingCommitCallback,它记录错误(以及调试级别的成功)。

由于监听器容器有自己的提交偏移量的机制,因此它更倾向于将 Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG 设置为 false。从版本 2.3 开始,它会无条件地将其设置为 false,除非在消费者工厂中专门设置或容器的消费者属性覆盖。

Acknowledgment 具有以下方法

public interface Acknowledgment {

    void acknowledge();

}

此方法使监听器能够控制何时提交偏移量。

从版本 2.3 开始,Acknowledgment 接口有两个额外的 nack(long sleep)nack(int index, long sleep) 方法。第一个用于记录监听器,第二个用于批次监听器。为监听器类型调用错误的方法将抛出 IllegalStateException

如果您想提交部分批次,使用 nack(),当使用事务时,将 AckMode 设置为 MANUAL;调用 nack() 将将已成功处理的记录的偏移量发送到事务。
nack() 只能在调用监听器的消费者线程上调用。
当使用 乱序提交 时,不允许使用 nack()

使用记录监听器时,调用nack()方法会提交所有挂起的偏移量,丢弃上次轮询的剩余记录,并在其分区上执行查找,以便在下次poll()时重新传递失败的记录和未处理的记录。通过设置sleep参数,可以在重新传递之前暂停消费者。这与在容器配置为DefaultErrorHandler时抛出异常的功能类似。

nack()会暂停所有分配的分区,包括整个监听器,持续指定的休眠时间。

使用批处理监听器时,可以指定批处理中发生故障的索引。调用nack()方法时,将提交索引之前的记录的偏移量,并在分区上执行失败和丢弃记录的查找,以便在下次poll()时重新传递它们。

有关更多信息,请参见容器错误处理程序

消费者在休眠期间处于暂停状态,以便我们继续轮询代理以保持消费者处于活动状态。实际的休眠时间及其分辨率取决于容器的pollTimeout,默认值为 5 秒。最短休眠时间等于pollTimeout,所有休眠时间都将是它的倍数。对于较短的休眠时间或为了提高其准确性,请考虑减少容器的pollTimeout

从 3.0.10 版本开始,批处理监听器可以使用Acknowledgment参数上的acknowledge(index)方法提交批处理部分的偏移量。调用此方法时,将提交索引处的记录(以及所有之前的记录)的偏移量。在执行部分批处理提交后调用acknowledge()将提交批处理剩余部分的偏移量。以下限制适用

  • 需要AckMode.MANUAL_IMMEDIATE

  • 必须在监听器线程上调用该方法

  • 监听器必须使用List而不是原始的ConsumerRecords进行消费

  • 索引必须在列表元素的范围内

  • 索引必须大于先前调用中使用的索引

这些限制将被强制执行,该方法将抛出IllegalArgumentExceptionIllegalStateException,具体取决于违规情况。

监听器容器自动启动

监听器容器实现SmartLifecycle,并且autoStartup默认情况下为true。容器在后期阶段(Integer.MAX-VALUE - 100)启动。实现SmartLifecycle的其他组件(用于处理来自监听器的数据)应该在更早的阶段启动。- 100留出空间供后期阶段启用组件,以便在容器之后自动启动。