WebFlux 支持

WebFlux Spring Integration 模块 (spring-integration-webflux) 允许以响应式的方式执行 HTTP 请求并处理入站 HTTP 请求。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.5"

在非 Servlet 基于服务器配置的情况下,必须包含 io.projectreactor.netty:reactor-netty 依赖项。

WebFlux 支持包括以下网关实现:WebFluxInboundEndpointWebFluxRequestExecutingMessageHandler。该支持完全基于 Spring WebFluxProject Reactor 基础。有关更多信息,请参阅 HTTP 支持,因为许多选项在响应式和常规 HTTP 组件之间共享。

WebFlux 命名空间支持

Spring Integration 提供了一个 webflux 命名空间和相应的模式定义。要在您的配置中包含它,请在您的应用程序上下文配置文件中添加以下命名空间声明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 入站组件

从版本 5.0 开始,提供了 WebFluxInboundEndpointWebHandler 实现。此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport,它通过新提取的 BaseHttpInboundEndpoint 与其共享一些通用选项。它用于 Spring WebFlux 响应式环境(而不是 MVC)。以下示例显示了 WebFlux 端点的简单实现

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlow
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

该配置类似于 HttpRequestHandlingEndpointSupport(在示例之前提到),除了我们使用 @EnableWebFlux 将 WebFlux 基础结构添加到我们的集成应用程序中。此外,WebFluxInboundEndpoint 通过使用背压、按需功能(由响应式 HTTP 服务器实现提供)对下游流执行 sendAndReceive 操作。

回复部分也是非阻塞的,并且基于内部 FutureReplyChannel,该通道被扁平映射到一个回复 Mono 以进行按需解析。

您可以使用自定义 ServerCodecConfigurerRequestedContentTypeResolver,甚至 ReactiveAdapterRegistry 配置 WebFluxInboundEndpoint。后者提供了一种机制,您可以使用它以任何响应式类型返回回复:Reactor Flux、RxJava ObservableFlowable 等。这样,我们可以使用 Spring Integration 组件实现 服务器发送事件 场景,如下例所示

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlow
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

有关更多可能的配置选项,请参阅 请求映射支持跨源资源共享 (CORS) 支持

当请求体为空或 payloadExpression 返回 null 时,请求参数 (MultiValueMap<String, String>) 用于要处理的目标消息的 payload

有效负载验证

从版本 5.2 开始,WebFluxInboundEndpoint 可以使用 Validator 进行配置。与 HTTP 支持 中的 MVC 验证不同,它用于验证请求已由 HttpMessageReader 转换为的 Publisher 中的元素,然后再执行回退和 payloadExpression 函数。框架无法假设在构建最终有效负载后 Publisher 对象的复杂程度。如果需要严格限制最终有效负载(或其 Publisher 元素)的验证可见性,则验证应在下游而不是 WebFlux 端点进行。有关更多信息,请参阅 Spring WebFlux 文档。无效的有效负载将被 IntegrationWebExchangeBindExceptionWebExchangeBindException 扩展)拒绝,其中包含所有验证 Errors。有关验证的更多信息,请参阅 Spring Framework 参考手册

WebFlux 出站组件

WebFluxRequestExecutingMessageHandler(从版本 5.0 开始)的实现类似于 HttpRequestExecutingMessageHandler。它使用 Spring Framework WebFlux 模块中的 WebClient。要配置它,请定义一个类似于以下内容的 bean

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("https://127.0.0.1:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("https://127.0.0.1:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("https://127.0.0.1:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="https://127.0.0.1/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="https://127.0.0.1/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

WebClientexchange() 操作返回一个 Mono<ClientResponse>,该操作被映射(使用多个 Mono.map() 步骤)到一个 AbstractIntegrationMessageBuilder 作为 WebFluxRequestExecutingMessageHandler 的输出。与作为 outputChannelReactiveChannel 结合使用,Mono<ClientResponse> 的评估将延迟到下游订阅进行。否则,它将被视为 async 模式,并且 Mono 响应将适配到 SettableListenableFuture 以获取来自 WebFluxRequestExecutingMessageHandler 的异步回复。输出消息的目标有效负载取决于 WebFluxRequestExecutingMessageHandler 配置。setExpectedResponseType(Class<?>)setExpectedResponseTypeExpression(Expression) 标识响应体元素转换的目标类型。如果 replyPayloadToFlux 设置为 true,则响应体将转换为带有提供的 expectedResponseTypeFlux,用于每个元素,并且此 Flux 将作为有效负载发送到下游。之后,您可以使用 拆分器 以响应式方式遍历此 Flux

此外,可以将 BodyExtractor<?, ClientHttpResponse> 注入 WebFluxRequestExecutingMessageHandler 而不是 expectedResponseTypereplyPayloadToFlux 属性。它可用于低级访问 ClientHttpResponse 以及对主体和 HTTP 标头转换的更多控制。Spring Integration 提供 ClientHttpResponseBodyExtractor 作为标识函数以生成(下游)整个 ClientHttpResponse 和任何其他可能的自定义逻辑。

从版本 5.2 开始,WebFluxRequestExecutingMessageHandler 支持响应式 PublisherResourceMultiValueMap 类型作为请求消息有效负载。内部使用相应的 BodyInserter 填充到 WebClient.RequestBodySpec 中。当有效负载是响应式 Publisher 时,可以使用配置的 publisherElementTypepublisherElementTypeExpression 来确定发布者元素类型的类型。表达式必须解析为 Class<?>String(解析为目标 Class<?>)或 ParameterizedTypeReference

从版本 5.5 开始,WebFluxRequestExecutingMessageHandler 公开了一个 extractResponseBody 标志(默认为 true),以仅返回响应体,或返回整个 ResponseEntity 作为回复消息有效负载,独立于提供的 expectedResponseTypereplyPayloadToFlux。如果 ResponseEntity 中不存在主体,则忽略此标志并返回整个 ResponseEntity

有关更多可能的配置选项,请参阅 HTTP 出站组件

WebFlux 标头映射

由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。有关更多可能的选项和用于映射标头的组件,请参阅 HTTP 标头映射

WebFlux 请求属性

从版本 6.0 开始,可以配置 WebFluxRequestExecutingMessageHandler 以通过 setAttributeVariablesExpression() 评估请求属性。此 SpEL 表达式必须在 Map 中进行评估。然后,此类映射将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP 请求配置回调。如果需要将键值对象形式的信息从 Message 传递到请求,并且下游过滤器将访问这些属性以进行进一步处理,这将非常有用。