空消息体和'墓碑'记录的对数压缩
当您使用日志压缩时,您可以发送和接收具有null
消息体的消息以标识键的删除。
您也可能由于其他原因收到null
值,例如反序列化器
在无法反序列化值时可能会返回null
。
要使用KafkaTemplate
发送null
消息体,您可以将null传递到send()
方法的值参数中。一个例外是send(Message<?> message)
变体。由于spring-messaging
Message<?>
不能具有null
消息体,因此您可以使用称为KafkaNull
的特殊消息体类型,框架会发送null
。为方便起见,提供了静态的KafkaNull.INSTANCE
。
当您使用消息监听器容器时,接收到的ConsumerRecord
具有一个null
value()
。
要配置@KafkaListener
以处理null
消息体,您必须使用@Payload
注解并设置required = false
。如果它是已压缩日志的墓碑消息,您通常也需要键,以便您的应用程序可以确定哪个键被“删除
”。以下示例显示了这样的配置
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用具有多个@KafkaHandler
方法的类级@KafkaListener
时,需要一些额外的配置。具体来说,您需要一个具有KafkaNull
消息体的@KafkaHandler
方法。以下示例显示了如何配置一个
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数为null
,而不是KafkaNull
。
请参阅手动分配所有分区。 |
此功能需要使用KafkaNullAwarePayloadArgumentResolver ,框架在使用默认的MessageHandlerMethodFactory 时会进行配置。当使用自定义的MessageHandlerMethodFactory 时,请参阅将自定义HandlerMethodArgumentResolver 添加到@KafkaListener 。 |