连接 Kafka

从 2.5 版本开始,这些都扩展了 KafkaResourceFactory。这允许通过向其配置中添加 Supplier<String> 来在运行时更改引导服务器:setBootstrapServersSupplier(() -> …​)。这将被调用以获取所有新连接的服务器列表。消费者和生产者通常是长期存在的。要关闭现有的生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。要关闭现有的消费者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后调用 start())和/或在任何其他监听器容器 bean 上调用 stop()start()

为方便起见,框架还提供了一个 ABSwitchCluster,它支持两组引导服务器;其中一个在任何时候都是活动的。配置 ABSwitchCluster 并将其添加到生产者和消费者工厂以及 KafkaAdmin 中,方法是调用 setBootstrapServersSupplier()。当您想要切换时,调用 primary()secondary() 并调用生产者工厂上的 reset() 以建立新的连接;对于消费者,请对所有监听器容器调用 stop()start()。当使用 @KafkaListener 时,请对 KafkaListenerEndpointRegistry bean 调用 stop()start()

请参阅 Javadoc 以获取更多信息。

工厂监听器

从 2.5 版本开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 可以配置一个 Listener 来接收每当创建或关闭生产者或消费者时的通知。

生产者工厂监听器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消费者工厂监听器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id 都是通过将 client-id 属性(创建后从 metrics() 获取)附加到工厂 beanName 属性,并用 . 分隔来创建的。

例如,这些监听器可用于在创建新客户端时创建和绑定 Micrometer KafkaClientMetrics 实例(并在客户端关闭时关闭它)。

框架提供了执行此操作的监听器;请参见 Micrometer 原生指标

默认客户端 ID 前缀

从 3.2 版本开始,对于使用 spring.application.name 属性定义应用程序名称的 Spring Boot 应用程序,此名称现在用作这些客户端类型的自动生成的客户端 ID 的默认前缀

  • 不使用消费者组的消费者客户端

  • 生产者客户端

  • 管理员客户端

这使得更容易在服务器端识别这些客户端以进行故障排除或应用配额。

表 1. spring.application.name=myapp 的 Spring Boot 应用程序生成的示例客户端 ID
客户端类型 无应用程序名称 有应用程序名称

无消费者组的消费者

consumer-null-1

myapp-consumer-1

带有消费者组“mygroup”的消费者

consumer-mygroup-1

consumer-mygroup-1

生产者

producer-1

myapp-producer-1

管理员

adminclient-1

myapp-admin-1