WebFlux 支持
WebFlux Spring Integration 模块 (spring-integration-webflux
) 允许以响应式的方式执行 HTTP 请求并处理入站 HTTP 请求。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:6.3.5"
在非 Servlet 基于服务器配置的情况下,必须包含 io.projectreactor.netty:reactor-netty
依赖项。
WebFlux 支持包括以下网关实现:WebFluxInboundEndpoint
和 WebFluxRequestExecutingMessageHandler
。该支持完全基于 Spring WebFlux 和 Project Reactor 基础。有关更多信息,请参阅 HTTP 支持,因为许多选项在响应式和常规 HTTP 组件之间共享。
WebFlux 命名空间支持
Spring Integration 提供了一个 webflux
命名空间和相应的模式定义。要在您的配置中包含它,请在您的应用程序上下文配置文件中添加以下命名空间声明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 入站组件
从版本 5.0 开始,提供了 WebFluxInboundEndpoint
的 WebHandler
实现。此组件类似于基于 MVC 的 HttpRequestHandlingEndpointSupport
,它通过新提取的 BaseHttpInboundEndpoint
与其共享一些通用选项。它用于 Spring WebFlux 响应式环境(而不是 MVC)。以下示例显示了 WebFlux 端点的简单实现
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
该配置类似于 HttpRequestHandlingEndpointSupport
(在示例之前提到),除了我们使用 @EnableWebFlux
将 WebFlux 基础结构添加到我们的集成应用程序中。此外,WebFluxInboundEndpoint
通过使用背压、按需功能(由响应式 HTTP 服务器实现提供)对下游流执行 sendAndReceive
操作。
回复部分也是非阻塞的,并且基于内部 FutureReplyChannel ,该通道被扁平映射到一个回复 Mono 以进行按需解析。 |
您可以使用自定义 ServerCodecConfigurer
、RequestedContentTypeResolver
,甚至 ReactiveAdapterRegistry
配置 WebFluxInboundEndpoint
。后者提供了一种机制,您可以使用它以任何响应式类型返回回复:Reactor Flux
、RxJava Observable
、Flowable
等。这样,我们可以使用 Spring Integration 组件实现 服务器发送事件 场景,如下例所示
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
有关更多可能的配置选项,请参阅 请求映射支持 和 跨源资源共享 (CORS) 支持。
当请求体为空或 payloadExpression
返回 null
时,请求参数 (MultiValueMap<String, String>
) 用于要处理的目标消息的 payload
。
有效负载验证
从版本 5.2 开始,WebFluxInboundEndpoint
可以使用 Validator
进行配置。与 HTTP 支持 中的 MVC 验证不同,它用于验证请求已由 HttpMessageReader
转换为的 Publisher
中的元素,然后再执行回退和 payloadExpression
函数。框架无法假设在构建最终有效负载后 Publisher
对象的复杂程度。如果需要严格限制最终有效负载(或其 Publisher
元素)的验证可见性,则验证应在下游而不是 WebFlux 端点进行。有关更多信息,请参阅 Spring WebFlux 文档。无效的有效负载将被 IntegrationWebExchangeBindException
(WebExchangeBindException
扩展)拒绝,其中包含所有验证 Errors
。有关验证的更多信息,请参阅 Spring Framework 参考手册。
WebFlux 出站组件
WebFluxRequestExecutingMessageHandler
(从版本 5.0 开始)的实现类似于 HttpRequestExecutingMessageHandler
。它使用 Spring Framework WebFlux 模块中的 WebClient
。要配置它,请定义一个类似于以下内容的 bean
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("https://127.0.0.1:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("https://127.0.0.1:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("https://127.0.0.1:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="https://127.0.0.1/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="https://127.0.0.1/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
的 exchange()
操作返回一个 Mono<ClientResponse>
,该操作被映射(使用多个 Mono.map()
步骤)到一个 AbstractIntegrationMessageBuilder
作为 WebFluxRequestExecutingMessageHandler
的输出。与作为 outputChannel
的 ReactiveChannel
结合使用,Mono<ClientResponse>
的评估将延迟到下游订阅进行。否则,它将被视为 async
模式,并且 Mono
响应将适配到 SettableListenableFuture
以获取来自 WebFluxRequestExecutingMessageHandler
的异步回复。输出消息的目标有效负载取决于 WebFluxRequestExecutingMessageHandler
配置。setExpectedResponseType(Class<?>)
或 setExpectedResponseTypeExpression(Expression)
标识响应体元素转换的目标类型。如果 replyPayloadToFlux
设置为 true
,则响应体将转换为带有提供的 expectedResponseType
的 Flux
,用于每个元素,并且此 Flux
将作为有效负载发送到下游。之后,您可以使用 拆分器 以响应式方式遍历此 Flux
。
此外,可以将 BodyExtractor<?, ClientHttpResponse>
注入 WebFluxRequestExecutingMessageHandler
而不是 expectedResponseType
和 replyPayloadToFlux
属性。它可用于低级访问 ClientHttpResponse
以及对主体和 HTTP 标头转换的更多控制。Spring Integration 提供 ClientHttpResponseBodyExtractor
作为标识函数以生成(下游)整个 ClientHttpResponse
和任何其他可能的自定义逻辑。
从版本 5.2 开始,WebFluxRequestExecutingMessageHandler
支持响应式 Publisher
、Resource
和 MultiValueMap
类型作为请求消息有效负载。内部使用相应的 BodyInserter
填充到 WebClient.RequestBodySpec
中。当有效负载是响应式 Publisher
时,可以使用配置的 publisherElementType
或 publisherElementTypeExpression
来确定发布者元素类型的类型。表达式必须解析为 Class<?>
、String
(解析为目标 Class<?>
)或 ParameterizedTypeReference
。
从版本 5.5 开始,WebFluxRequestExecutingMessageHandler
公开了一个 extractResponseBody
标志(默认为 true
),以仅返回响应体,或返回整个 ResponseEntity
作为回复消息有效负载,独立于提供的 expectedResponseType
或 replyPayloadToFlux
。如果 ResponseEntity
中不存在主体,则忽略此标志并返回整个 ResponseEntity
。
有关更多可能的配置选项,请参阅 HTTP 出站组件。
WebFlux 标头映射
由于 WebFlux 组件完全基于 HTTP 协议,因此 HTTP 标头映射没有区别。有关更多可能的选项和用于映射标头的组件,请参阅 HTTP 标头映射。
WebFlux 请求属性
从版本 6.0 开始,可以配置 WebFluxRequestExecutingMessageHandler
以通过 setAttributeVariablesExpression()
评估请求属性。此 SpEL 表达式必须在 Map
中进行评估。然后,此类映射将传播到 WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer)
HTTP 请求配置回调。如果需要将键值对象形式的信息从 Message
传递到请求,并且下游过滤器将访问这些属性以进行进一步处理,这将非常有用。