消息过滤
Spring for Apache Kafka 项目还通过 FilteringMessageListenerAdapter
类提供了一些帮助,该类可以包装您的 MessageListener
。此类采用 RecordFilterStrategy
的实现,您可以在其中实现 filter
方法以指示消息是重复的,应将其丢弃。它还有一个名为 ackDiscarded
的附加属性,指示适配器是否应确认已丢弃的记录。默认情况下为 false
。
当您使用 @KafkaListener
时,请在容器工厂上设置 RecordFilterStrategy
(以及可选的 ackDiscarded
),以便监听器包装在适当的过滤适配器中。
此外,还提供了 FilteringBatchMessageListenerAdapter
,用于您使用批处理消息监听器时。
如果您的 @KafkaListener 接收 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>> ,则 FilteringBatchMessageListenerAdapter 将被忽略,因为 ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,您可以使用监听器注释上的 filter
属性覆盖监听器容器工厂的默认 RecordFilterStrategy
。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}