线程安全

当使用并发消息监听器容器时,所有消费者线程都会调用单个监听器实例。因此,监听器需要是线程安全的,最好使用无状态监听器。如果无法使监听器线程安全,或者添加同步会显着降低添加并发的益处,您可以使用以下几种技术之一

  • 使用 n 个容器,concurrency=1,并使用原型范围的 MessageListener bean,以便每个容器都获得自己的实例(在使用 @KafkaListener 时无法做到这一点)。

  • 将状态保存在 ThreadLocal<?> 实例中。

  • 让单例监听器委托给在 SimpleThreadScope(或类似范围)中声明的 bean。

为了便于清理线程状态(对于前面列表中的第二项和第三项),从版本 2.2 开始,监听器容器在每个线程退出时发布 ConsumerStoppedEvent。您可以使用 ApplicationListener@EventListener 方法来使用这些事件来删除 ThreadLocal<?> 实例或从范围中 remove() 线程范围的 bean。请注意,SimpleThreadScope 不会销毁具有销毁接口(如 DisposableBean)的 bean,因此您应该自己 destroy() 实例。

默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您将多播器更改为使用异步执行器,则线程清理将无效。

关于虚拟线程和并发消息监听器容器的特别说明

由于底层库类仍然使用 synchronized 块进行线程协调,因此在使用虚拟线程和并发消息监听器容器时,应用程序需要谨慎。当启用虚拟线程时,如果并发量超过可用平台线程数,虚拟线程很可能被固定在平台线程上,并可能出现竞争条件。因此,随着 Spring for Apache Kafka 使用的第三方库演变为完全支持虚拟线程,建议将消息监听器容器上的并发量保持为等于或小于平台线程数。这样,应用程序可以避免线程之间以及虚拟线程被固定在平台线程上的任何竞争条件。