消费批次

从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true 时,所有由 Kafka Consumer 轮询接收的记录将作为 List<?> 提供给监听器方法。否则,该方法将每次调用一个记录。批次的大小由 Kafka 消费者属性 max.poll.recordsfetch.min.bytesfetch.max.wait.ms 控制;有关更多信息,请参阅 Kafka 文档。

接收批次时,允许以下类型签名

List<Person>
Message<List<Person>>

List<Person> 的第一个选项中,监听器不会收到任何消息头。如果使用第二个类型签名 (Message<List<Person>>),则可以访问消息头;但是,所有消息头仍然以 Collection 的形式存在。让我们看下面的例子。

假设 Message 包含一个包含十个 Person 对象的列表。MessageMessageHeaders 包含一个消息头映射,键为消息头名称,值为一个列表。此列表包含该消息头的消息头值,顺序与有效负载列表相同。因此,应用程序需要根据有效负载列表的迭代,从 MessageHeaders 映射中正确访问消息头。

请注意,在批量模式下消费时,不允许使用 List<Message<Person>> 形式的类型签名。

4.0.2 版本开始,绑定器在批量模式下消费时支持 DLQ 功能。请记住,当在批量模式下的消费者绑定上使用 DLQ 时,从上次轮询接收到的所有记录都将被传递到 DLQ 主题。

在使用批量模式时,绑定器不支持重试,因此 maxAttempts 将被覆盖为 1。您可以配置一个 DefaultErrorHandler(使用 ListenerContainerCustomizer)来实现与绑定器中重试类似的功能。您也可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并让剩余的记录重新传递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档
在批量模式下接收 KafkaNull 对象时,接收到的列表将包含与 KafkaNull 对象相对应的空元素。这对于 List<Person>Message<List<Person>> 样式的类型签名都是正确的。

在批量模式下消费时的可观察性

在批次消费记录时,不支持直接使用观察跟踪传播功能。这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持批次监听器的跟踪;它只支持记录监听器。在批次监听器中,接收到的记录可能来自多个主题/分区,也可能来自多个生产者,其中添加跟踪信息是可选的。由于批次中的记录之间可能没有关联,框架无法对跟踪它们做出任何假设,例如将它们提供为单个跟踪 ID 等。如果使用 Message<List<String>> 的类型签名,则可以获取一个名为 kafka_batchConvertedHeaders 的标头,其中包含一个与有效负载条目数量相同的列表。此列表包含一个 Map,其中包含跟踪标头。但是,应用程序需要自行迭代此列表并启动观察。