线程安全
当使用并发消息监听器容器时,所有消费者线程都会调用单个监听器实例。因此,监听器需要是线程安全的,并且最好使用无状态监听器。如果无法使您的监听器线程安全,或者添加同步会显著降低并发带来的好处,您可以使用以下几种技术之一:
-
使用
n个容器,concurrency=1,并使用原型作用域的MessageListenerbean,以便每个容器都有自己的实例(在使用@KafkaListener时无法实现)。 -
将状态保存在
ThreadLocal<?>实例中。 -
让单例监听器委托给在
SimpleThreadScope(或类似作用域)中声明的 bean。
为了方便清理线程状态(针对上述列表中的第二项和第三项),从版本 2.2 开始,监听器容器在每个线程退出时发布 ConsumerStoppedEvent。您可以使用 ApplicationListener 或 @EventListener 方法来消费这些事件,以移除 ThreadLocal<?> 实例或从作用域中 remove() 线程作用域的 bean。请注意,SimpleThreadScope 不会销毁具有销毁接口(如 DisposableBean)的 bean,因此您应该自行 destroy() 实例。
| 默认情况下,应用程序上下文的事件多播器在调用线程上调用事件监听器。如果您将多播器更改为使用异步执行器,线程清理将无效。 |
关于虚拟线程和并发消息监听器容器的特别说明
由于底层库类仍然使用 synchronized 块进行线程协调的某些限制,应用程序在使用虚拟线程与并发消息监听器容器时需要谨慎。当启用虚拟线程时,如果并发数超过可用的平台线程数,虚拟线程很可能会被固定在平台线程上,并可能出现竞态条件。因此,随着 Spring for Apache Kafka 所使用的第三方库发展以完全支持虚拟线程,建议将消息监听器容器的并发数保持等于或小于平台线程数。这样,应用程序可以避免线程之间以及虚拟线程被固定在平台线程上的任何竞态条件。