空消息体和“墓碑”记录的对数压缩

使用日志压缩时,您可以发送和接收具有null消息体来识别密钥的删除。您也可能由于其他原因收到null值,例如反序列化器在无法反序列化值时可能会返回null

生成空消息体

您可以通过将null消息参数值传递给send方法之一(例如)来使用ReactivePulsarTemplate发送null值。

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
发送空值时,必须指定模式类型,因为系统无法从null消息体确定消息的类型。

使用空消息体

对于@ReactivePularListenernull消息体将根据其消息参数的类型传递到侦听器方法,如下所示

参数类型 传递的值

基本类型

null

用户定义类型

null

org.apache.pulsar.client.api.Message<T>

非空 Pulsar 消息,其getValue()返回null

org.springframework.messaging.Message<T>

非空 Spring 消息,其getPayload()返回PulsarNull

Flux<org.apache.pulsar.client.api.Message<T>>

非空 Flux,其条目是非空 Pulsar 消息,其getValue()返回null

Flux<org.springframework.messaging.Message<T>>

非空 Flux,其条目是非空 Spring 消息,其getPayload()返回PulsarNull

当传递的值为null(即,具有基本类型或用户定义类型的单记录侦听器)时,必须将@Payload参数注释与required = false一起使用。
当对侦听器有效负载类型使用 Spring 的org.springframework.messaging.Message时,其泛型类型信息必须足够宽泛以接受Message<PulsarNull>(例如,MessageMessage<?>Message<Object>)。这是因为 Spring 消息不允许其有效负载为空,而是使用PulsarNull占位符。

如果是已压缩日志的墓碑消息,通常还需要密钥,以便您的应用程序可以确定哪个密钥被“删除”。以下示例显示了此类配置

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
当使用流式消息侦听器(Flux)时,标头支持有限,因此在日志压缩方案中不太有用。