监控

监控侦听器性能

从版本 2.3 开始,如果在类路径中检测到 Micrometer 并且应用程序上下文中存在单个 MeterRegistry,侦听器容器将自动为侦听器创建和更新 Micrometer Timer。可以通过将 ContainerPropertymicrometerEnabled 设置为 false 来禁用计时器。

维护两个计时器 - 一个用于对侦听器的成功调用,另一个用于失败。

计时器名为 spring.kafka.listener,并带有以下标记

  • name : (容器 bean 名称)

  • result : successfailure

  • exception : noneListenerExecutionFailedException

你可以使用 ContainerPropertiesmicrometerTags 属性添加其他标记。

从版本 2.9.8、3.0.6 开始,你可以在 ContainerPropertiesmicrometerTagsProvider 中提供一个函数;该函数接收 ConsumerRecord<?, ?> 并返回可以基于该记录的标记,并与 micrometerTags 中的任何静态标记合并。

对于并发容器,将为每个线程创建计时器,并且name 标记会后缀为 -n,其中 n 为 0concurrency-1

监控 KafkaTemplate 性能

从版本 2.5 开始,如果在类路径中检测到 Micrometer,并且应用程序上下文中存在一个 MeterRegistry,则该模板将自动创建和更新发送操作的 Micrometer Timer。可以通过将模板的 micrometerEnabled 属性设置为 false 来禁用计时器。

维护两个计时器 - 一个用于对侦听器的成功调用,另一个用于失败。

计时器的名称为 spring.kafka.template,并且具有以下标记

  • name : (模板 bean 名称)

  • result : successfailure

  • exception : none 或失败的异常类名称

可以使用模板的 micrometerTags 属性添加其他标记。

从版本 2.9.8、3.0.6 开始,你可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 属性;该函数接收 ProducerRecord<?, ?>,并返回可以基于该记录的标记,并与 micrometerTags 中的任何静态标记合并。

Micrometer 原生指标

从版本 2.5 开始,该框架提供了 工厂侦听器,用于在创建和关闭生产者和消费者时管理 Micrometer KafkaClientMetrics 实例。

要启用此功能,只需将侦听器添加到你的生产者和消费者工厂即可

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

传递给侦听器的消费者/生产者 id 会使用标记名称 spring.id 添加到指标的标记中。

获取 Kafka 指标之一的示例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean 提供了类似的侦听器 - 请参阅 KafkaStreams Micrometer 支持

Micrometer 观察

从版本 3.0 开始,现在支持对 KafkaTemplate 和侦听器容器使用 Micrometer 进行观察。

KafkaTemplateContainerProperties 上将 observationEnabled 设置为 true 以启用观察;这将禁用 Micrometer Timers,因为计时器现在将通过每次观察进行管理。

Micrometer 观察不支持批处理侦听器;这将启用 Micrometer Timers

有关更多信息,请参阅 Micrometer Tracing

要向计时器/跟踪添加标记,请分别为模板或侦听器容器配置自定义 KafkaTemplateObservationConventionKafkaListenerObservationConvention

默认实现为模板观察添加 bean.name 标记,为容器添加 listener.id 标记。

你可以子类化 DefaultKafkaTemplateObservationConventionDefaultKafkaListenerObservationConvention,或提供全新的实现。

有关记录的默认观察的详细信息,请参阅 Micrometer 观察文档

从 3.0.6 版本开始,您可以根据消费者或生产者记录中的信息,向计时器和跟踪添加动态标签。为此,请分别向侦听器容器属性或 KafkaTemplate 添加自定义 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention。观察上下文中的 record 属性分别包含 ConsumerRecordProducerRecord

发送方和接收方上下文 remoteServiceName 属性设置为 Kafka clusterId 属性;这是由 KafkaAdmin 检索的。如果由于某种原因(可能是缺少管理权限)而无法检索集群 ID,从 3.1 版本开始,您可以在 KafkaAdmin 上设置手动 clusterId,并将其注入到 KafkaTemplate 和侦听器容器中。当它为 null(默认值)时,管理程序将调用 describeCluster 管理操作从代理中检索它。