连接到 Kafka

从 2.5 版本开始,这些类都扩展了 KafkaResourceFactory。这允许通过在配置中添加 Supplier<String> 来在运行时更改引导服务器:setBootstrapServersSupplier(() -> …​)。这将用于所有新的连接以获取服务器列表。消费者和生产者通常是长生命周期的。要关闭现有的生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。要关闭现有的消费者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后调用 start())或在任何其他监听器容器 bean 上调用 stop()start()

为了方便起见,框架还提供了一个ABSwitchCluster,它支持两组引导服务器;其中一个在任何时候都是活动的。配置ABSwitchCluster并将其添加到生产者和消费者工厂以及KafkaAdmin中,方法是调用setBootstrapServersSupplier()。当您想要切换时,调用primary()secondary(),并在生产者工厂上调用reset()以建立新的连接;对于消费者,stop()start()所有监听器容器。当使用@KafkaListener时,stop()start()KafkaListenerEndpointRegistry bean。

有关更多信息,请参见 Javadoc。

工厂监听器

从版本 2.5 开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 可以配置一个Listener,以便在创建或关闭生产者或消费者时接收通知。

生产者工厂监听器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消费者工厂监听器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id都是通过将client-id属性(在创建后从metrics()获取)追加到工厂beanName属性,并用.分隔来创建的。

例如,这些监听器可用于在创建新客户端时创建并绑定 Micrometer KafkaClientMetrics 实例(并在客户端关闭时关闭它)。

框架提供了执行此操作的监听器;请参见Micrometer 本机指标

默认客户端 ID 前缀

从版本 3.2 开始,对于使用spring.application.name属性定义应用程序名称的 Spring Boot 应用程序,此名称现在用作这些客户端类型的自动生成的客户端 ID 的默认前缀

  • 不使用消费者组的消费者客户端

  • 生产者客户端

  • 管理员客户端

这使得在服务器端更易于识别这些客户端以进行故障排除或应用配额。

表 1. 对于spring.application.name=myapp的 Spring Boot 应用程序,示例客户端 ID 结果
客户端类型 没有应用程序名称 有应用程序名称

没有消费者组的消费者

consumer-null-1

myapp-consumer-1

具有消费者组“mygroup”的消费者

consumer-mygroup-1

consumer-mygroup-1

生产者

producer-1

myapp-producer-1

管理员

adminclient-1

myapp-admin-1