监控
监控监听器性能
从 2.3 版本开始,如果类路径中检测到 Micrometer
并且应用程序上下文只有一个 MeterRegistry
,则监听器容器将自动创建和更新 Micrometer Timer
。可以通过将 ContainerProperty
的 micrometerEnabled
设置为 false
来禁用计时器。
维护两个计时器——一个用于监听器的成功调用,另一个用于失败。
计时器的名称为 spring.kafka.listener
,并具有以下标签:
-
name
:(容器 bean 名称) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
可以使用 ContainerProperties
的 micrometerTags
属性添加其他标签。
从 2.9.8 和 3.0.6 版本开始,可以在 ContainerProperties
的 micrometerTagsProvider
中提供一个函数;该函数接收 ConsumerRecord<?, ?>
并返回可以基于该记录的标签,并与 micrometerTags
中的任何静态标签合并。
使用并发容器时,会为每个线程创建计时器,并且 name 标签后缀为 -n ,其中 n 为 0 到 concurrency-1 。 |
监控 KafkaTemplate 性能
从 2.5 版本开始,如果类路径中检测到 Micrometer
并且应用程序上下文只有一个 MeterRegistry
,则模板将自动创建和更新用于发送操作的 Micrometer Timer
。可以通过将模板的 micrometerEnabled
属性设置为 false
来禁用计时器。
维护两个计时器——一个用于监听器的成功调用,另一个用于失败。
计时器的名称为 spring.kafka.template
,并具有以下标签:
-
name
:(模板 bean 名称) -
result
:success
或failure
-
exception
:none
或失败的异常类名
可以使用模板的 micrometerTags
属性添加其他标签。
从 2.9.8 和 3.0.6 版本开始,可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
属性;该函数接收 ProducerRecord<?, ?>
并返回可以基于该记录的标签,并与 micrometerTags
中的任何静态标签合并。
Micrometer 原生指标
从 2.5 版本开始,框架提供 工厂监听器 来管理 Micrometer KafkaClientMetrics
实例,无论何时创建和关闭生产者和消费者。
要启用此功能,只需将监听器添加到生产者和消费者工厂。
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给监听器的消费者/生产者 id
将添加到计量器的标签中,标签名为 spring.id
。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
为 StreamsBuilderFactoryBean
提供了类似的监听器 - 请参见 KafkaStreams Micrometer 支持。
Micrometer 观察
从 3.0 版本开始,现在支持使用 Micrometer 进行观察,用于 KafkaTemplate
和监听器容器。
将 KafkaTemplate
和 ContainerProperties
上的 observationEnabled
设置为 true
以启用观察;这将禁用 Micrometer 计时器,因为计时器现在将随每次观察一起管理。
Micrometer 观察不支持批处理监听器;这将启用 Micrometer 计时器 |
更多信息,请参考 Micrometer 追踪。
要向计时器/追踪器添加标签,请分别为模板或侦听器容器配置自定义的KafkaTemplateObservationConvention
或KafkaListenerObservationConvention
。
默认实现为模板观测添加bean.name
标签,为容器添加listener.id
标签。
您可以继承DefaultKafkaTemplateObservationConvention
或DefaultKafkaListenerObservationConvention
,也可以提供全新的实现。
有关记录的默认观测的详细信息,请参阅Micrometer 观测文档。
从 3.0.6 版本开始,您可以根据使用者或生产者记录中的信息,向计时器和追踪器添加动态标签。为此,请分别向侦听器容器属性或KafkaTemplate
添加自定义的KafkaListenerObservationConvention
和/或KafkaTemplateObservationConvention
。record
属性在两种观测上下文中都包含ConsumerRecord
或ProducerRecord
。
发送方和接收方的上下文remoteServiceName
属性设置为 Kafka 的clusterId
属性;这是通过KafkaAdmin
检索的。如果由于某些原因(例如缺乏管理员权限)无法检索集群 ID,从 3.1 版本开始,您可以在KafkaAdmin
上设置手动clusterId
并将其注入到KafkaTemplate
和侦听器容器中。当它为null
(默认值)时,管理员将调用describeCluster
管理员操作以从代理检索它。