reactive()
端点
从 5.5 版本开始,ConsumerEndpointSpec
提供了一个 reactive()
配置属性,它带有一个可选的自定义器 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
。此选项将目标端点配置为 ReactiveStreamsConsumer
实例,独立于输入通道类型,该类型通过 IntegrationReactiveUtils.messageChannelToFlux()
转换为 Flux
。提供的函数从 Flux.transform()
运算符中使用,用于自定义(publishOn()
、log()
、doOnNext()
等)来自输入通道的响应式流源。
以下示例演示了如何独立于最终订阅者和生产者,从输入通道更改发布线程到 DirectChannel
。
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
有关更多信息,请参阅 响应式流支持。