消费批次
从 3.0 版本开始,当 spring.cloud.stream.bindings.<name>.consumer.batch-mode 设置为 true 时,通过轮询 Kafka Consumer 接收到的所有记录都将作为 List<?> 呈现给监听器方法。否则,该方法将一次接收一条记录。批量的大小由 Kafka 消费者属性 max.poll.records、fetch.min.bytes、fetch.max.wait.ms 控制;有关更多信息,请参阅 Kafka 文档。
接收批次时,允许使用以下类型签名:
List<Person>
Message<List<Person>>
在 List<Person> 的第一个选项中,监听器将不会获取任何消息头。如果使用第二个类型签名 (Message<List<Person>>),则可以访问消息头;但是,所有消息头仍将以 Collection 的形式存在。让我们看以下示例。
假设 Message 包含一个包含十个 Person 对象的列表。Message 的 MessageHeaders 包含一个消息头映射,其中键是消息头名称,值是一个列表。此列表包含该消息头的值,顺序与有效载荷列表相同。因此,应用程序需要根据有效载荷列表的迭代,从 MessageHeaders 映射中正确访问消息头。
请注意,在批处理模式下消费时,不允许使用 List<Message<Person>> 形式的类型签名。
从 4.0.2 版本开始,该绑定器在批量消费模式下支持 DLQ 功能。请记住,当在批量模式下对消费者绑定使用 DLQ 时,从上一次轮询接收到的所有记录都将发送到 DLQ 主题。
在批量模式下使用时,绑定器内的重试不受支持,因此 maxAttempts 将被覆盖为 1。您可以配置 DefaultErrorHandler(使用 ListenerContainerCustomizer)以实现与绑定器中重试类似的功能。您还可以使用手动 AckMode 并调用 Ackowledgment.nack(index, sleep) 来提交部分批次的偏移量,并让剩余记录重新传递。有关这些技术的更多信息,请参阅 Spring for Apache Kafka 文档。 |
在批量模式下接收 KafkaNull 对象时,接收到的列表将包含对应 KafkaNull 对象的空元素。这对于 List<Person> 和 Message<List<Person>> 样式类型签名都适用。 |
批量消费模式下的可观测性
在批量消费记录时,不支持直接的观察跟踪传播功能。这是因为 Kafka 绑定器使用的 Spring for Apache Kafka 库不支持批处理监听器的跟踪;它只支持记录监听器。在批处理监听器中,接收到的记录可能来自多个主题/分区和多个生产者,其中添加跟踪信息是可选的。由于批处理中的记录之间可能没有任何关联,框架无法对它们进行任何跟踪假设,例如将它们作为单个跟踪 ID 提供等。如果您使用 Message<List<String>> 的类型签名,则可以获得一个名为 kafka_batchConvertedHeaders 的消息头,其中包含一个与有效负载具有相同数量条目的列表。此列表包含一个包含跟踪消息头的 Map。但是,应用程序需要正确迭代此列表并开始观察。