使用 Reactive Kafka Binder 的基本示例
在本节中,我们将展示一些使用响应式绑定器编写响应式 Kafka 应用程序的基本代码片段及其相关细节。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以将上述upppercase
函数与基于消息通道的 Kafka 绑定器(spring-cloud-stream-binder-kafka
)以及本节讨论的反应式 Kafka 绑定器(spring-cloud-stream-binder-kafka-reactive
)一起使用。当使用此函数与常规 Kafka 绑定器时,尽管您在应用程序中使用反应式类型(即在uppercase
函数中),但您只会在函数执行过程中获得反应式流。在函数执行上下文之外,没有反应式优势,因为底层绑定器不是基于反应式堆栈的。因此,虽然这看起来像是带来了完整的端到端反应式堆栈,但此应用程序只是部分反应式的。
现在假设您正在使用 Kafka 的正确反应式绑定器 - spring-cloud-stream-binder-kafka-reactive
与上述函数的应用程序。此绑定器实现将提供从链条顶端的消费到链条底端的发布的完整反应式优势。这是因为底层绑定器构建在Reactor Kafka 的核心 API 之上。在消费者端,它使用KafkaReceiver,它是 Kafka 消费者的反应式实现。类似地,在生产者端,它使用KafkaSender API,它是 Kafka 生产者的反应式实现。由于反应式 Kafka 绑定器的基础构建在适当的反应式 Kafka API 之上,因此应用程序可以充分利用使用反应式技术的优势。使用此反应式 Kafka 绑定器时,应用程序会内置自动背压等其他反应式功能。
从版本 4.0.2 开始,您可以通过分别提供一个或多个ReceiverOptionsCustomizer
或SenderOptionsCustomizer
bean 来自定义ReceiverOptions
和SenderOptions
。它们是BiFunction
,接收绑定名称和初始选项,返回自定义选项。这些接口扩展了Ordered
,因此当存在多个自定义器时,它们将按要求的顺序应用。
默认情况下,绑定器不会提交偏移量。从 4.0.2 版本开始,KafkaHeaders.ACKNOWLEDGMENT 标头包含一个 ReceiverOffset 对象,允许您通过调用其 acknowledge() 或 commit() 方法来提交偏移量。
|
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
有关更多信息,请参阅 reactor-kafka
文档和 javadoc。
此外,从 4.0.3 版本开始,可以将 Kafka 消费者属性 reactiveAtmostOnce
设置为 true
,绑定器将在处理每次轮询返回的记录之前自动提交偏移量。同样,从 4.0.3 版本开始,您可以将消费者属性 reactiveAutoCommit
设置为 true
,绑定器将在处理每次轮询返回的记录之后自动提交偏移量。在这些情况下,确认标头不存在。
4.0.2 也提供了 reactiveAutoCommit ,但实现不正确,其行为类似于 reactiveAtmostOnce 。
|
以下是如何使用 reaciveAutoCommit
的示例。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,当使用自动提交时,reactor-kafka
返回一个 Flux<Flux<ConsumerRecord<?, ?>>>
。鉴于 Spring 无法访问内部流的内容,应用程序必须处理本机 ConsumerRecord
;不会对内容应用消息转换或转换服务。这需要使用本机解码(通过在配置中指定适当类型的 Deserializer
)来返回所需类型的记录键/值。