监控

监控监听器性能

从 2.3 版本开始,如果类路径中检测到 `Micrometer` 且应用程序上下文中存在单个 `MeterRegistry`,监听器容器将自动为监听器创建并更新 Micrometer `Timer`。可以通过将 `ContainerProperty` 的 `micrometerEnabled` 设置为 `false` 来禁用计时器。

维护两个计时器 - 一个用于成功调用监听器,一个用于失败。

计时器名为 `spring.kafka.listener`,并具有以下标签:

  • name : (容器 Bean 名称)

  • result : successfailure

  • exception : noneListenerExecutionFailedException

您可以使用 `ContainerProperties` 的 `micrometerTags` 属性添加额外标签。

从 2.9.8、3.0.6 版本开始,您可以在 `ContainerProperties` 的 `micrometerTagsProvider` 中提供一个函数;该函数接收 `ConsumerRecord<?, ?>` 并返回可以基于该记录的标签,并与 `micrometerTags` 中的任何静态标签合并。

对于并发容器,会为每个线程创建计时器,并且 `name` 标签后缀为 `-n`,其中 n 为 `0` 到 `concurrency-1`。

监控 KafkaTemplate 性能

从 2.5 版本开始,如果类路径中检测到 `Micrometer` 且应用程序上下文中存在单个 `MeterRegistry`,模板将自动为发送操作创建并更新 Micrometer `Timer`。可以通过将模板的 `micrometerEnabled` 属性设置为 `false` 来禁用计时器。

维护两个计时器 - 一个用于成功调用监听器,一个用于失败。

计时器名为 `spring.kafka.template`,并具有以下标签:

  • name : (模板 Bean 名称)

  • result : successfailure

  • exception : none 或失败的异常类名

您可以使用模板的 `micrometerTags` 属性添加额外标签。

从 2.9.8、3.0.6 版本开始,您可以提供一个 `KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)` 属性;该函数接收 `ProducerRecord<?, ?>` 并返回可以基于该记录的标签,并与 `micrometerTags` 中的任何静态标签合并。

Micrometer 原生指标

从 2.5 版本开始,该框架提供了 工厂监听器,用于在生产者和消费者创建和关闭时管理 Micrometer `KafkaClientMetrics` 实例。

要启用此功能,只需将监听器添加到您的生产者和消费者工厂中。

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

传递给监听器的消费者/生产者 `id` 会添加到指标的标签中,标签名为 `spring.id`。

获取其中一个 Kafka 指标的示例:
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

为 `StreamsBuilderFactoryBean` 提供了类似的监听器 - 请参阅 KafkaStreams Micrometer 支持

从 3.3 版本开始,引入了一个 `KafkaMetricsSupport` 抽象类来管理 `io.micrometer.core.instrument.binder.kafka.KafkaMetrics` 绑定到所提供的 Kafka 客户端的 `MeterRegistry` 中。这个类是上述 `MicrometerConsumerListener`、`MicrometerProducerListener` 和 `KafkaStreamsMicrometerListener` 的父类。但是,它可以用于任何 Kafka 客户端用例。该类需要被扩展,并且必须调用其 `bindClient()` 和 `unbindClient()` API 来将 Kafka 客户端指标与 Micrometer 收集器连接起来。

Micrometer 观测

自 3.0 版本起,`KafkaTemplate` 和监听器容器现在支持使用 Micrometer 进行观测。

在 `KafkaTemplate` 和 `ContainerProperties` 上将 `observationEnabled` 设置为 `true` 以启用观测;这将禁用 Micrometer 计时器,因为计时器现在将随每个观测进行管理。

Micrometer Observation 不支持批处理监听器;这将启用 Micrometer Timers。

有关更多信息,请参阅 Micrometer Tracing

要向计时器/跟踪添加标签,请分别配置自定义的 `KafkaTemplateObservationConvention` 或 `KafkaListenerObservationConvention` 到模板或监听器容器。

默认实现为模板观测添加 `bean.name` 标签,为容器添加 `listener.id` 标签。

您可以继承 `DefaultKafkaTemplateObservationConvention` 或 `DefaultKafkaListenerObservationConvention`,或提供全新的实现。

有关记录的默认观测的详细信息,请参阅 Micrometer 观测文档

从 3.0.6 版本开始,您可以根据消费者或生产者记录中的信息,为计时器和跟踪添加动态标签。为此,请分别将自定义的 `KafkaListenerObservationConvention` 和/或 `KafkaTemplateObservationConvention` 添加到监听器容器属性或 `KafkaTemplate` 中。两个观测上下文中的 `record` 属性分别包含 `ConsumerRecord` 或 `ProducerRecord`。

发送方和接收方上下文的 `remoteServiceName` 属性设置为 Kafka `clusterId` 属性;此属性由 `KafkaAdmin` 检索。如果由于某种原因(例如缺少管理权限)您无法检索集群 ID,从 3.1 版本开始,您可以在 `KafkaAdmin` 上手动设置 `clusterId`,并将其注入到 `KafkaTemplate` 和监听器容器中。当其为 `null`(默认值)时,admin 将调用 `describeCluster` 管理操作从 Broker 检索它。

批量监听器观测

使用批处理监听器时,即使存在 `ObservationRegistry`,默认情况下也不会创建观测。这是因为观测的范围与线程绑定,而对于批处理监听器,观测与记录之间没有一对一的映射。

要在批处理监听器中启用每条记录的观测,请将容器工厂属性 `recordObservationsInBatch` 设置为 `true`。

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {

    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    factory.getContainerProperties().setRecordObservationsInBatch(true);
    return factory;
}

当此属性为 `true` 时,将为批处理中的每条记录创建一个观测,但该观测不会传播到监听器方法。然后,应用程序可以使用观测上下文来跟踪批处理中每条记录的处理。这使您可以在批处理上下文中,也能查看每条记录的处理情况。

© . This site is unofficial and not affiliated with VMware.