空有效负载和“墓碑”记录的日志压缩
当您使用 日志压缩 时,您可以发送和接收带有 null
有效负载的消息,以标识键的删除。
您还可以出于其他原因接收 null
值,例如当 Deserializer
在无法反序列化值时可能返回 null
。
要使用 KafkaTemplate
发送 null
有效负载,可以将 null 传递到 send()
方法的 value 参数中。一个例外是 send(Message<?> message)
变体。由于 spring-messaging
Message<?>
不能有 null
有效负载,因此可以使用称为 KafkaNull
的特殊有效负载类型,并且框架发送 null
。为方便起见,提供了静态 KafkaNull.INSTANCE
。
使用消息侦听器容器时,接收的 ConsumerRecord
有一个 null
value()
。
要配置 @KafkaListener
以处理 null
有效负载,必须将 @Payload
注释与 required = false
一起使用。如果它是紧凑日志的墓碑消息,通常还需要密钥,以便应用程序可以确定哪个密钥被“删除
”。以下示例显示了这样的配置
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
使用带有多个 @KafkaHandler
方法的类级别 @KafkaListener
时,需要一些额外的配置。具体来说,需要一个带有 KafkaNull
有效负载的 @KafkaHandler
方法。以下示例演示如何配置一个
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是 null
,而不是 KafkaNull
。
请参阅 手动分配所有分区。 |
此功能需要使用 KafkaNullAwarePayloadArgumentResolver ,框架在使用默认 MessageHandlerMethodFactory 时会对其进行配置。使用自定义 MessageHandlerMethodFactory 时,请参阅 向 @KafkaListener 添加自定义 HandlerMethodArgumentResolver 。
|