应用程序事件

以下 Spring 应用程序事件由监听器容器及其消费者发布

  • ConsumerStartingEvent:当消费者线程首次启动时发布,在它开始轮询之前。

  • ConsumerStartedEvent:当消费者即将开始轮询时发布。

  • ConsumerFailedToStartEvent:如果在 consumerStartTimeout 容器属性内没有发布 ConsumerStartingEvent,则发布此事件。此事件可能表明配置的任务执行器没有足够的线程来支持它所使用的容器及其并发性。当发生这种情况时,还会记录错误消息。

  • ListenerContainerIdleEvent:当在 idleInterval(如果已配置)内没有收到任何消息时发布。

  • ListenerContainerNoLongerIdleEvent:当在之前发布 ListenerContainerIdleEvent 后消费到记录时发布。

  • ListenerContainerPartitionIdleEvent:当在 idlePartitionEventInterval(如果已配置)内没有从该分区收到任何消息时发布。

  • ListenerContainerPartitionNoLongerIdleEvent:当从之前发布 ListenerContainerPartitionIdleEvent 的分区消费到记录时发布。

  • NonResponsiveConsumerEvent: 当消费者在 poll 方法中被阻塞时发布。

  • ConsumerPartitionPausedEvent: 当分区被暂停时,由每个消费者发布。

  • ConsumerPartitionResumedEvent: 当分区被恢复时,由每个消费者发布。

  • ConsumerPausedEvent: 当容器被暂停时,由每个消费者发布。

  • ConsumerResumedEvent: 当容器被恢复时,由每个消费者发布。

  • ConsumerStoppingEvent: 在停止之前,由每个消费者发布。

  • ConsumerStoppedEvent: 消费者关闭后发布。参见 线程安全

  • ConsumerRetryAuthEvent: 当消费者的身份验证或授权失败并正在重试时发布。

  • ConsumerRetryAuthSuccessfulEvent: 当身份验证或授权已成功重试时发布。只有在之前发生过 ConsumerRetryAuthEvent 时才会发生。

  • ContainerStoppedEvent: 当所有消费者都停止时发布。

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您将多播器更改为使用异步执行器,则在事件包含对消费者的引用时,您不能调用任何 Consumer 方法。

ListenerContainerIdleEvent 具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

  • id: 监听器 ID(或容器 bean 名称)。

  • idleTime: 发布事件时容器处于空闲状态的时间。

  • topicPartitions: 生成事件时容器分配的主题和分区。

  • consumer: 对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则可以在收到事件时 resume()

  • paused: 容器当前是否处于暂停状态。有关更多信息,请参见 暂停和恢复监听器容器

ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

ListenerContainerPartitionIdleEvent 具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

  • id: 监听器 ID(或容器 bean 名称)。

  • idleTime: 发布事件时分区消费处于空闲状态的时间。

  • topicPartition: 触发事件的主题和分区。

  • consumer: 对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则可以在收到事件时 resume()

  • paused: 该消费者的分区消费当前是否处于暂停状态。有关更多信息,请参见 暂停和恢复监听器容器

ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTimepaused

NonResponsiveConsumerEvent 具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

  • id: 监听器 ID(或容器 bean 名称)。

  • timeSinceLastPoll: 容器上次调用 poll() 之前的时间。

  • topicPartitions: 生成事件时容器分配的主题和分区。

  • consumer: 对 Kafka Consumer 对象的引用。例如,如果之前调用了消费者的 pause() 方法,则可以在收到事件时 resume()

  • paused: 容器当前是否处于暂停状态。有关更多信息,请参见 暂停和恢复监听器容器

ConsumerPausedEventConsumerResumedEventConsumerStopping 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

  • partitions: 涉及的 TopicPartition 实例。

ConsumerPartitionPausedEventConsumerPartitionResumedEvent 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

  • partition: 涉及的 TopicPartition 实例。

ConsumerRetryAuthEvent 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

  • 原因:

    • AUTHENTICATION - 由于身份验证异常而发布了该事件。

    • AUTHORIZATION - 由于授权异常而发布了该事件。

ConsumerStartingEventConsumerStartedEventConsumerFailedToStartEventConsumerStoppedEventConsumerRetryAuthSuccessfulEventContainerStoppedEvent 事件具有以下属性

  • source: 发布事件的监听器容器实例。

  • container: 监听器容器或父监听器容器(如果源容器是子容器)。

所有容器(无论是子容器还是父容器)都会发布 ContainerStoppedEvent。对于父容器,源和容器属性相同。

此外,ConsumerStoppedEvent 具有以下附加属性

  • 原因:

    • NORMAL - 消费者正常停止(容器已停止)。

    • ERROR - 抛出了 java.lang.Error

    • FENCED - 事务性生产者被隔离,并且 stopContainerWhenFenced 容器属性为 true

    • AUTH - 抛出了 AuthenticationExceptionAuthorizationException,并且未配置 authExceptionRetryInterval

    • NO_OFFSET - 分区没有偏移量,并且 auto.offset.reset 策略为 none

您可以使用此事件在发生此类情况后重新启动容器

if (event.getReason.equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

检测空闲和无响应的消费者

虽然异步消费者效率很高,但检测它们何时处于空闲状态是一个问题。如果在一段时间内没有收到任何消息,您可能希望采取一些措施。

您可以配置监听器容器,以便在一段时间内没有消息传递时发布 ListenerContainerIdleEvent。当容器处于空闲状态时,每 idleEventInterval 毫秒发布一次事件。

要配置此功能,请在容器上设置 idleEventInterval。以下示例演示了如何操作

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例演示了如何为 @KafkaListener 设置 idleEventInterval

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在所有这些情况下,当容器处于空闲状态时,每分钟发布一次事件。

如果由于某种原因,消费者 poll() 方法没有退出,没有收到任何消息,并且无法生成空闲事件(这是早期版本的 kafka-clients 在代理不可达时出现的问题)。在这种情况下,如果在 3x pollTimeout 属性内 poll 没有返回,容器将发布 NonResponsiveConsumerEvent。默认情况下,此检查每 30 秒在每个容器中执行一次。您可以通过在配置监听器容器时在 ContainerProperties 中设置 monitorInterval(默认值为 30 秒)和 noPollThreshold(默认值为 3.0)属性来修改此行为。noPollThreshold 应大于 1.0,以避免由于竞争条件而导致出现虚假事件。收到此类事件后,您可以停止容器,从而唤醒消费者,以便它可以停止。

从 2.6.2 版本开始,如果容器发布了 ListenerContainerIdleEvent,则在随后收到记录时,它将发布 ListenerContainerNoLongerIdleEvent

事件消费

您可以通过实现 ApplicationListener 来捕获这些事件,可以是通用监听器,也可以是仅接收特定事件的监听器。您也可以使用 Spring Framework 4.2 中引入的 @EventListener

以下示例将 @KafkaListener@EventListener 结合到一个类中。您应该了解,应用程序监听器会接收所有容器的事件,因此您可能需要检查监听器 ID,以便根据哪个容器处于空闲状态来采取特定操作。您也可以为此目的使用 @EventListenercondition

有关事件属性的信息,请参见 应用程序事件

事件通常在消费者线程上发布,因此与 Consumer 对象交互是安全的。

以下示例同时使用 @KafkaListener@EventListener

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
事件监听器会看到所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 缩小接收到的事件范围。由于为 @KafkaListener 创建的容器支持并发,因此实际容器的名称为 id-n,其中 n 是每个实例的唯一值,用于支持并发。这就是我们在条件中使用 startsWith 的原因。
如果您希望使用空闲事件来停止监听器容器,则不应在调用监听器的线程上调用 container.stop()。这样做会导致延迟和不必要的日志消息。相反,您应该将事件传递给另一个线程,然后该线程可以停止容器。此外,如果容器实例是子容器,则不应 stop() 它。您应该停止并发容器。

空闲时的当前位置

请注意,您可以通过在侦听器中实现 ConsumerSeekAware 来获取检测到空闲状态时的当前位置。请参阅 seek 中的 onIdleContainer()