WebFlux 支持
WebFlux Spring 集成模块 (spring-integration-webflux
) 允许以响应式方式执行 HTTP 请求和处理入站 HTTP 请求。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.3.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.0"
如果使用非 Servlet 服务器配置,则必须包含 io.projectreactor.netty:reactor-netty
依赖项。
WebFlux 支持包含以下网关实现:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。该支持完全基于 Spring WebFlux 和 Project Reactor 基础。有关更多信息,请参阅 HTTP 支持,因为许多选项在反应式和常规 HTTP 组件之间共享。
WebFlux 命名空间支持
Spring Integration 提供了一个 webflux
命名空间和相应的模式定义。要在您的配置中包含它,请在您的应用程序上下文配置文件中添加以下命名空间声明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站组件
从版本 5.0 开始,提供了 WebFluxInboundEndpoint
的 WebHandler
实现。此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport
,它通过新提取的 BaseHttpInboundEndpoint
与其共享一些通用选项。它用于 Spring WebFlux 反应式环境(而不是 MVC)。以下示例显示了 WebFlux 端点的简单实现
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
该配置类似于 HttpRequestHandlingEndpointSupport
(在示例之前提到),除了我们使用 @EnableWebFlux
将 WebFlux 基础设施添加到我们的集成应用程序中。此外,WebFluxInboundEndpoint
通过使用反应式 HTTP 服务器实现提供的背压、按需功能,对下游流执行 sendAndReceive
操作。
回复部分也是非阻塞的,并且基于内部 FutureReplyChannel ,该通道被扁平映射到一个回复 Mono 以进行按需解析。
|
您可以使用自定义 ServerCodecConfigurer
、RequestedContentTypeResolver
甚至 ReactiveAdapterRegistry
来配置 WebFluxInboundEndpoint
。后者提供了一种机制,您可以使用它以任何反应式类型返回回复:Reactor Flux
、RxJava Observable
、Flowable
等。这样,我们可以使用 Spring Integration 组件实现 服务器发送事件 场景,如下例所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有关更多可能的配置选项,请参阅 请求映射支持 和 跨域资源共享 (CORS) 支持。
当请求体为空或payloadExpression
返回null
时,请求参数(MultiValueMap<String, String>
)将用作要处理的目标消息的payload
。
有效载荷验证
从版本 5.2 开始,WebFluxInboundEndpoint
可以配置一个 Validator
。与 HTTP 支持 中的 MVC 验证不同,它用于在执行回退和 payloadExpression
函数之前验证 Publisher
中的元素,请求已通过 HttpMessageReader
转换为该元素。框架无法假设在构建最终有效载荷后 Publisher
对象的复杂程度。如果需要限制对最终有效载荷(或其 Publisher
元素)的验证可见性,则验证应下移而不是 WebFlux 端点。有关更多信息,请参阅 Spring WebFlux 文档。无效的有效载荷将被拒绝,并出现 IntegrationWebExchangeBindException
(WebExchangeBindException
的扩展),其中包含所有验证 Errors
。有关验证的更多信息,请参阅 Spring Framework 参考手册。
WebFlux 出站组件
WebFluxRequestExecutingMessageHandler
(从版本 5.0 开始)的实现类似于 HttpRequestExecutingMessageHandler
。它使用来自 Spring Framework WebFlux 模块的 WebClient
。要配置它,请定义一个类似于以下内容的 bean
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("https://127.0.0.1:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("https://127.0.0.1:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("https://127.0.0.1:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="https://127.0.0.1/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="https://127.0.0.1/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
的 exchange()
操作返回一个 Mono<ClientResponse>
,它被映射(通过使用多个 Mono.map()
步骤)到一个 AbstractIntegrationMessageBuilder
作为 WebFluxRequestExecutingMessageHandler
的输出。与作为 outputChannel
的 ReactiveChannel
一起,Mono<ClientResponse>
的评估被推迟到下游订阅被创建。否则,它被视为 async
模式,并且 Mono
响应被适配到 SettableListenableFuture
用于从 WebFluxRequestExecutingMessageHandler
获取异步回复。输出消息的目标有效负载取决于 WebFluxRequestExecutingMessageHandler
的配置。setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
标识响应体元素转换的目标类型。如果 replyPayloadToFlux
设置为 true
,则响应体将被转换为具有提供的 expectedResponseType
的 Flux
,并且此 Flux
将作为有效负载发送到下游。之后,您可以使用 splitter 以响应式方式迭代此 Flux
。
此外,可以将 BodyExtractor<?, ClientHttpResponse>
注入到 WebFluxRequestExecutingMessageHandler
中,而不是 expectedResponseType
和 replyPayloadToFlux
属性。它可以用于对 ClientHttpResponse
进行低级访问,并对主体和 HTTP 头部转换进行更多控制。Spring Integration 提供 ClientHttpResponseBodyExtractor
作为身份函数来生成(下游)整个 ClientHttpResponse
和任何其他可能的自定义逻辑。
从版本 5.2 开始,WebFluxRequestExecutingMessageHandler
支持响应式 Publisher
、Resource
和 MultiValueMap
类型作为请求消息有效负载。一个相应的 BodyInserter
被内部使用,以填充到 WebClient.RequestBodySpec
中。当有效负载是响应式 Publisher
时,可以使用配置的 publisherElementType
或 publisherElementTypeExpression
来确定发布者元素类型的类型。表达式必须解析为 Class<?>
、String
(解析为目标 Class<?>
)或 ParameterizedTypeReference
。
从版本 5.5 开始,WebFluxRequestExecutingMessageHandler
公开了一个 extractResponseBody
标志(默认情况下为 true
),用于仅返回响应体,或返回整个 ResponseEntity
作为回复消息有效负载,独立于提供的 expectedResponseType
或 replyPayloadToFlux
。如果 ResponseEntity
中没有主体,则忽略此标志,并返回整个 ResponseEntity
。
有关更多可能的配置选项,请参阅 HTTP 出站组件。
WebFlux 头部映射
由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 头部映射没有区别。有关更多可能的选项和用于映射头部的组件,请参阅 HTTP 头部映射。
WebFlux 请求属性
从 6.0 版本开始,WebFluxRequestExecutingMessageHandler
可以通过 setAttributeVariablesExpression()
配置为评估请求属性。此 SpEL 表达式必须在 Map
中进行评估。然后,此类映射将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 请求配置回调。如果需要将键值对象形式的信息从 Message
传递到请求,并且下游过滤器需要访问这些属性以进行进一步处理,这将非常有用。