过滤消息
在某些情况下,例如重新平衡,已经处理过的消息可能会被重新传递。框架无法知道这样的消息是否已被处理。这是一个应用程序级别的功能。这被称为 幂等接收器 模式,Spring Integration 提供了它的 实现。
Apache Kafka 的 Spring 项目还通过 FilteringMessageListenerAdapter
类提供了一些帮助,该类可以包装你的 MessageListener
。此类接受 RecordFilterStrategy
的实现,你可以在其中实现 filter
方法来表明消息是重复的,应该被丢弃。它有一个额外的属性叫做 ackDiscarded
,它指示适配器是否应该确认丢弃的记录。默认情况下它是 false
。
当你使用 @KafkaListener
时,在容器工厂上设置 RecordFilterStrategy
(以及可选的 ackDiscarded
),以便监听器被包装在适当的过滤适配器中。
此外,还提供了一个 FilteringBatchMessageListenerAdapter
,用于当你使用批处理 消息监听器 时。
如果你的 @KafkaListener 接收的是 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>> ,则会忽略 FilteringBatchMessageListenerAdapter ,因为 ConsumerRecords 是不可变的。
|
从 2.8.4 版本开始,你可以使用监听器注释上的 filter
属性来覆盖监听器容器工厂的默认 RecordFilterStrategy
。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}