空消息体和'墓碑'记录的对数压缩

当您使用日志压缩时,您可以发送和接收具有null消息体的消息以标识键的删除。

您也可能由于其他原因收到null值,例如反序列化器在无法反序列化值时可能会返回null

要使用KafkaTemplate发送null消息体,您可以将null传递到send()方法的值参数中。一个例外是send(Message<?> message)变体。由于spring-messaging Message<?>不能具有null消息体,因此您可以使用称为KafkaNull的特殊消息体类型,框架会发送null。为方便起见,提供了静态的KafkaNull.INSTANCE

当您使用消息监听器容器时,接收到的ConsumerRecord具有一个null value()

要配置@KafkaListener以处理null消息体,您必须使用@Payload注解并设置required = false。如果它是已压缩日志的墓碑消息,您通常也需要键,以便您的应用程序可以确定哪个键被“删除”。以下示例显示了这样的配置

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

当您使用具有多个@KafkaHandler方法的类级@KafkaListener时,需要一些额外的配置。具体来说,您需要一个具有KafkaNull消息体的@KafkaHandler方法。以下示例显示了如何配置一个

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

请注意,参数为null,而不是KafkaNull

此功能需要使用KafkaNullAwarePayloadArgumentResolver,框架在使用默认的MessageHandlerMethodFactory时会进行配置。当使用自定义的MessageHandlerMethodFactory时,请参阅将自定义HandlerMethodArgumentResolver添加到@KafkaListener