使用记录

在上面的 upppercase 函数中,我们以 Flux<String> 的形式消费记录,然后以 Flux<String> 的形式生成记录。有时,您可能需要以原始接收格式(ReceiverRecord)接收记录。以下是一个这样的函数。

@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
    return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}

在此函数中,请注意,我们以 Flux<ReceiverRecord<byte[], byte[]>> 的形式消费记录,然后以 Flux<String> 的形式生成记录。ReceiverRecord 是基本接收记录,它是 Reactor Kafka 中的专门 Kafka ConsumerRecord。使用反应式 Kafka 绑定时,上述函数将使您能够访问传入记录的 ReceiverRecord 类型。但是,在这种情况下,您需要为 RecordMessageConverter 提供自定义实现。默认情况下,反应式 Kafka 绑定使用 MessagingMessageConverter,该转换器从 ConsumerRecord 转换有效负载和标头。因此,当您的处理程序方法接收到它时,有效负载已从接收记录中提取并作为第一个函数的情况传递给该方法。通过在应用程序中提供自定义 RecordMessageConverter 实现,您可以覆盖默认行为。例如,如果您想以原始 Flux<ReceiverRecord<byte[], byte[]>> 的形式消费记录,则可以在应用程序中提供以下 bean 定义。

@Bean
RecordMessageConverter fullRawReceivedRecord() {
    return new RecordMessageConverter() {

        private final RecordMessageConverter converter = new MessagingMessageConverter();

        @Override
        public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
                Consumer<?, ?> consumer, Type payloadType) {
            return MessageBuilder.withPayload(record).build();
        }

        @Override
        public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
            return this.converter.fromMessage(message, defaultTopic);
        }

    };
}

然后,您需要指示框架对所需的绑定使用此转换器。以下是一个基于我们的 lowercase 函数的示例。

spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"

lowercase-in-0 是我们的 lowercase 函数的输入绑定名称。对于输出(lowecase-out-0),我们仍然使用常规 MessagingMessageConverter

在上面的 toMessage 实现中,我们接收原始 ConsumerRecord(由于我们在响应式绑定器上下文中,因此为 ReceiverRecord),然后将其包装在 Message 中。然后将该消息有效负载(即 ReceiverRecord)提供给用户方法。

如果 reactiveAutoCommitfalse(默认值),则调用 rec.receiverOffset().acknowledge()(或 commit())以导致提交偏移量;如果 reactiveAutoCommittrue,则 flux 提供 ConsumerRecord。有关更多信息,请参阅 reactor-kafka 文档和 javadoc。