Reactive 核心
spring-web
模块包含以下对响应式 Web 应用程序的基础支持
-
对于服务器请求处理,有两个级别的支持。
-
HttpHandler: 用于 HTTP 请求处理的基本契约,具有非阻塞 I/O 和响应式流背压,以及 Reactor Netty、Undertow、Tomcat、Jetty 和任何 Servlet 容器的适配器。
-
WebHandler
API: 基于请求处理的通用 Web API,级别略高,在此基础上构建了诸如带注释的控制器和函数式端点等具体编程模型。
-
-
对于客户端,有一个基本的
ClientHttpConnector
合同,用于执行具有非阻塞 I/O 和 Reactive Streams 背压的 HTTP 请求,以及针对 Reactor Netty、响应式 Jetty HttpClient 和 Apache HttpComponents 的适配器。应用程序中使用的更高级别的 WebClient 基于此基本合同。 -
对于客户端和服务器,编解码器 用于 HTTP 请求和响应内容的序列化和反序列化。
HttpHandler
HttpHandler 是一个简单的契约,只有一个方法来处理请求和响应。它有意保持最小化,其主要且唯一的目的是作为不同 HTTP 服务器 API 的最小抽象。
下表描述了支持的服务器 API
服务器名称 | 使用的服务器 API | Reactive Streams 支持 |
---|---|---|
Netty |
Netty API |
|
Undertow |
Undertow API |
spring-web: Undertow 到 Reactive Streams 的桥接 |
Tomcat |
Servlet 非阻塞 I/O;Tomcat API 用于读取和写入 ByteBuffers 与 byte[] |
spring-web: Servlet 非阻塞 I/O 到 Reactive Streams 的桥接 |
Jetty |
Servlet 非阻塞 I/O;Jetty API 用于写入 ByteBuffers 与 byte[] |
spring-web: Servlet 非阻塞 I/O 到 Reactive Streams 的桥接 |
Servlet 容器 |
Servlet 非阻塞 I/O |
spring-web: Servlet 非阻塞 I/O 到 Reactive Streams 的桥接 |
下表描述了服务器依赖项(另请参阅 支持的版本)
服务器名称 | 组 ID | 构件名称 |
---|---|---|
Reactor Netty |
io.projectreactor.netty |
reactor-netty |
Undertow |
io.undertow |
undertow-core |
Tomcat |
org.apache.tomcat.embed |
tomcat-embed-core |
Jetty |
org.eclipse.jetty |
jetty-server, jetty-servlet |
以下代码片段展示了使用 HttpHandler
适配器与每个服务器 API 的示例
Reactor Netty
-
Java
-
Kotlin
HttpHandler handler = ...
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
HttpServer.create().host(host).port(port).handle(adapter).bindNow();
val handler: HttpHandler = ...
val adapter = ReactorHttpHandlerAdapter(handler)
HttpServer.create().host(host).port(port).handle(adapter).bindNow()
Undertow
-
Java
-
Kotlin
HttpHandler handler = ...
UndertowHttpHandlerAdapter adapter = new UndertowHttpHandlerAdapter(handler);
Undertow server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build();
server.start();
val handler: HttpHandler = ...
val adapter = UndertowHttpHandlerAdapter(handler)
val server = Undertow.builder().addHttpListener(port, host).setHandler(adapter).build()
server.start()
Tomcat
-
Java
-
Kotlin
HttpHandler handler = ...
Servlet servlet = new TomcatHttpHandlerAdapter(handler);
Tomcat server = new Tomcat();
File base = new File(System.getProperty("java.io.tmpdir"));
Context rootContext = server.addContext("", base.getAbsolutePath());
Tomcat.addServlet(rootContext, "main", servlet);
rootContext.addServletMappingDecoded("/", "main");
server.setHost(host);
server.setPort(port);
server.start();
val handler: HttpHandler = ...
val servlet = TomcatHttpHandlerAdapter(handler)
val server = Tomcat()
val base = File(System.getProperty("java.io.tmpdir"))
val rootContext = server.addContext("", base.absolutePath)
Tomcat.addServlet(rootContext, "main", servlet)
rootContext.addServletMappingDecoded("/", "main")
server.host = host
server.setPort(port)
server.start()
Jetty
-
Java
-
Kotlin
HttpHandler handler = ...
Servlet servlet = new JettyHttpHandlerAdapter(handler);
Server server = new Server();
ServletContextHandler contextHandler = new ServletContextHandler(server, "");
contextHandler.addServlet(new ServletHolder(servlet), "/");
contextHandler.start();
ServerConnector connector = new ServerConnector(server);
connector.setHost(host);
connector.setPort(port);
server.addConnector(connector);
server.start();
val handler: HttpHandler = ...
val servlet = JettyHttpHandlerAdapter(handler)
val server = Server()
val contextHandler = ServletContextHandler(server, "")
contextHandler.addServlet(ServletHolder(servlet), "/")
contextHandler.start();
val connector = ServerConnector(server)
connector.host = host
connector.port = port
server.addConnector(connector)
server.start()
Servlet 容器
要将应用程序部署为 WAR 文件到任何 Servlet 容器,您可以扩展并包含 AbstractReactiveWebInitializer
在 WAR 文件中。该类将 HttpHandler
包装在 ServletHttpHandlerAdapter
中,并将其注册为 Servlet
。
WebHandler
API
org.springframework.web.server
包基于 HttpHandler
契约,提供了一个通用的 Web API,用于通过多个 WebExceptionHandler
、多个 WebFilter
和单个 WebHandler
组件链来处理请求。该链可以通过 WebHttpHandlerBuilder
构建,只需指向一个 Spring ApplicationContext
,其中组件是 自动检测 的,或者通过向构建器注册组件。
虽然 HttpHandler
的目标是抽象不同 HTTP 服务器的使用,但 WebHandler
API 旨在提供更广泛的功能集,这些功能通常用于 Web 应用程序,例如
-
带有属性的用户会话。
-
请求属性。
-
为请求解析的
Locale
或Principal
。 -
访问解析的和缓存的表单数据。
-
多部分数据的抽象。
-
等等。
特殊 Bean 类型
下表列出了 WebHttpHandlerBuilder
可以在 Spring ApplicationContext 中自动检测到的组件,或者可以直接向其注册的组件。
Bean 名称 | Bean 类型 | 数量 | 描述 |
---|---|---|---|
<any> |
|
0..N |
提供对来自 |
<any> |
|
0..N |
在过滤器链的其余部分和目标 |
|
|
1 |
请求的处理程序。 |
|
|
0..1 |
通过 |
|
|
0..1 |
用于访问 |
|
|
0..1 |
通过 |
|
|
0..1 |
用于处理转发类型标头,可以通过提取和删除它们,或者只删除它们。默认情况下不使用。 |
表单数据
ServerWebExchange
公开了以下方法用于访问表单数据
-
Java
-
Kotlin
Mono<MultiValueMap<String, String>> getFormData();
suspend fun getFormData(): MultiValueMap<String, String>
DefaultServerWebExchange
使用配置的 HttpMessageReader
将表单数据 (application/x-www-form-urlencoded
) 解析为 MultiValueMap
。默认情况下,FormHttpMessageReader
被配置为 ServerCodecConfigurer
bean 使用(参见 Web 处理程序 API)。
多部分数据
ServerWebExchange
公开了以下方法用于访问多部分数据
-
Java
-
Kotlin
Mono<MultiValueMap<String, Part>> getMultipartData();
suspend fun getMultipartData(): MultiValueMap<String, Part>
DefaultServerWebExchange
使用配置的 HttpMessageReader<MultiValueMap<String, Part>>
将 multipart/form-data
、multipart/mixed
和 multipart/related
内容解析为 MultiValueMap
。默认情况下,这是 DefaultPartHttpMessageReader
,它没有任何第三方依赖项。或者,可以使用 SynchronossPartHttpMessageReader
,它基于 Synchronoss NIO Multipart 库。两者都通过 ServerCodecConfigurer
bean 配置(参见 Web 处理程序 API)。
要以流式方式解析多部分数据,可以使用 PartEventHttpMessageReader
返回的 Flux<PartEvent>
,而不是使用 @RequestPart
,因为这意味着按名称对各个部分进行类似 Map
的访问,因此需要完全解析多部分数据。相反,您可以使用 @RequestBody
将内容解码为 Flux<PartEvent>
,而无需收集到 MultiValueMap
中。
转发标头
当请求通过负载均衡器等代理时,主机、端口和方案可能会发生变化,这使得从客户端角度创建指向正确主机、端口和方案的链接成为一项挑战。
RFC 7239 定义了 Forwarded
HTTP 标头,代理可以使用它来提供有关原始请求的信息。
非标准标头
还有其他非标准标头,包括 X-Forwarded-Host
、X-Forwarded-Port
、X-Forwarded-Proto
、X-Forwarded-Ssl
和 X-Forwarded-Prefix
。
X-Forwarded-Host
虽然不是标准,但 X-Forwarded-Host: <host>
是一个事实上的标准头,用于将原始主机信息传递给下游服务器。例如,如果将 example.com/resource
的请求发送到一个代理,代理将请求转发到 localhost:8080/resource
,那么可以发送 X-Forwarded-Host: example.com
头来告知服务器原始主机是 example.com
。
X-Forwarded-Port
虽然不是标准,但 X-Forwarded-Port: <port>
是一个事实上的标准头,用于将原始端口信息传递给下游服务器。例如,如果将 example.com/resource
的请求发送到一个代理,代理将请求转发到 localhost:8080/resource
,那么可以发送 X-Forwarded-Port: 443
头来告知服务器原始端口是 443
。
X-Forwarded-Proto
虽然不是标准,但 X-Forwarded-Proto: (https|http)
是一个事实上的标准头,用于将原始协议(例如 https / https)信息传递给下游服务器。例如,如果将 example.com/resource
的请求发送到一个代理,代理将请求转发到 localhost:8080/resource
,那么可以发送 X-Forwarded-Proto: https
头来告知服务器原始协议是 https
。
X-Forwarded-Ssl
虽然不是标准,但 X-Forwarded-Ssl: (on|off)
是一个事实上的标准头,用于将原始协议(例如 https / https)信息传递给下游服务器。例如,如果将 example.com/resource
的请求发送到一个代理,代理将请求转发到 localhost:8080/resource
,那么可以发送 X-Forwarded-Ssl: on
头来告知服务器原始协议是 https
。
X-Forwarded-Prefix
虽然不是标准,但 X-Forwarded-Prefix: <prefix>
是一个事实上的标准头,用于将原始 URL 路径前缀信息传递给下游服务器。
X-Forwarded-Prefix
的使用因部署场景而异,需要灵活地允许替换、删除或添加目标服务器的路径前缀。
场景 1:覆盖路径前缀
https://example.com/api/{path} -> https://127.0.0.1:8080/app1/{path}
前缀是捕获组{path}
之前的路径开头。对于代理,前缀是/api
,而对于服务器,前缀是/app1
。在这种情况下,代理可以发送X-Forwarded-Prefix: /api
,以使原始前缀/api
覆盖服务器前缀/app1
。
场景 2:移除路径前缀
有时,应用程序可能希望移除前缀。例如,考虑以下代理到服务器的映射
https://app1.example.com/{path} -> https://127.0.0.1:8080/app1/{path} https://app2.example.com/{path} -> https://127.0.0.1:8080/app2/{path}
代理没有前缀,而应用程序app1
和app2
分别具有路径前缀/app1
和/app2
。代理可以发送X-Forwarded-Prefix:
,以使空前缀覆盖服务器前缀/app1
和/app2
。
这种部署场景的常见情况是,许可证按生产应用程序服务器付费,并且最好在每个服务器上部署多个应用程序以降低费用。另一个原因是在同一服务器上运行更多应用程序,以共享服务器运行所需的资源。 在这些情况下,应用程序需要一个非空的上下文根,因为同一服务器上有多个应用程序。但是,这在公共 API 的 URL 路径中不应该可见,应用程序可以使用不同的子域,这提供了以下好处:
|
场景 3:插入路径前缀
在其他情况下,可能需要添加前缀。例如,考虑以下代理到服务器的映射
https://example.com/api/app1/{path} -> https://127.0.0.1:8080/app1/{path}
在这种情况下,代理具有/api/app1
的前缀,而服务器具有/app1
的前缀。代理可以发送X-Forwarded-Prefix: /api/app1
,以使原始前缀/api/app1
覆盖服务器前缀/app1
。
ForwardedHeaderTransformer
ForwardedHeaderTransformer
是一个组件,它根据转发标头修改请求的主机、端口和方案,然后删除这些标头。如果将其声明为名为forwardedHeaderTransformer
的 bean,它将被检测到并使用。
在 5.1 版本中,ForwardedHeaderFilter 已被弃用,并由 ForwardedHeaderTransformer 取代,以便在创建交换之前更早地处理转发标头。如果过滤器仍然配置,它将从过滤器列表中移除,并使用 ForwardedHeaderTransformer 代替。
|
过滤器
在 WebHandler
API 中,您可以使用 WebFilter
在过滤器处理链和目标 WebHandler
的其余部分之前和之后应用拦截式逻辑。当使用 WebFlux 配置 时,注册 WebFilter
就像将其声明为 Spring bean 并(可选地)通过在 bean 声明中使用 @Order
或实现 Ordered
来表达优先级一样简单。
CORS
Spring WebFlux 通过控制器上的注释提供对 CORS 配置的细粒度支持。但是,当您将其与 Spring Security 一起使用时,我们建议依赖内置的 CorsFilter
,它必须在 Spring Security 的过滤器链之前排序。
有关更多详细信息,请参阅有关 CORS 和 CORS WebFilter
的部分。
异常
在 WebHandler
API 中,您可以使用 WebExceptionHandler
处理来自 WebFilter
实例链和目标 WebHandler
的异常。当使用 WebFlux 配置 时,注册 WebExceptionHandler
就像将其声明为 Spring bean 并(可选地)通过在 bean 声明中使用 @Order
或实现 Ordered
来表达优先级一样简单。
下表描述了可用的 WebExceptionHandler
实现
异常处理程序 | 描述 |
---|---|
|
通过将响应设置为异常的 HTTP 状态代码,为类型为 |
|
此处理程序在 WebFlux 配置 中声明。 |
编解码器
spring-web
和 spring-core
模块通过使用 Reactive Streams 背压的非阻塞 I/O,为将字节内容序列化和反序列化为更高层对象提供支持。以下是对此支持的描述
-
HttpMessageReader
和HttpMessageWriter
是用于编码和解码 HTTP 消息内容的契约。 -
Encoder
可以用EncoderHttpMessageWriter
包装以适应在 Web 应用程序中使用,而Decoder
可以用DecoderHttpMessageReader
包装。 -
DataBuffer
抽象化了不同的字节缓冲区表示(例如 NettyByteBuf
、java.nio.ByteBuffer
等),并且是所有编解码器都使用的对象。有关此主题的更多信息,请参阅“Spring Core”部分中的 数据缓冲区和编解码器。
spring-core
模块提供 byte[]
、ByteBuffer
、DataBuffer
、Resource
和 String
编码器和解码器实现。spring-web
模块提供 Jackson JSON、Jackson Smile、JAXB2、Protocol Buffers 和其他编码器和解码器,以及表单数据、多部分内容、服务器发送事件等专用于 Web 的 HTTP 消息读取器和写入器实现。
ClientCodecConfigurer
和 ServerCodecConfigurer
通常用于配置和自定义应用程序中使用的编解码器。有关配置 HTTP 消息编解码器 的部分,请参见。
Jackson JSON
当存在 Jackson 库时,JSON 和二进制 JSON (Smile) 都受支持。
Jackson2Decoder
的工作原理如下
-
Jackson 的异步非阻塞解析器用于将字节块流聚合到
TokenBuffer
中,每个TokenBuffer
代表一个 JSON 对象。 -
每个
TokenBuffer
都传递给 Jackson 的ObjectMapper
以创建更高级别的对象。 -
当解码为单值发布者(例如
Mono
)时,只有一个TokenBuffer
。 -
当解码为多值发布者(例如
Flux
)时,一旦接收到足够多的字节以形成完整的对象,每个TokenBuffer
就会传递给ObjectMapper
。输入内容可以是 JSON 数组,或任何 行分隔 JSON 格式,例如 NDJSON、JSON Lines 或 JSON 文本序列。
Jackson2Encoder
的工作原理如下
-
对于单值发布者(例如
Mono
),只需通过ObjectMapper
对其进行序列化即可。 -
对于具有
application/json
的多值发布者,默认情况下,使用Flux#collectToList()
收集值,然后序列化结果集合。 -
对于具有流媒体类型(例如
application/x-ndjson
或application/stream+x-jackson-smile
)的多值发布者,使用 行分隔 JSON 格式单独编码、写入和刷新每个值。其他流媒体类型可以与编码器一起注册。 -
对于 SSE,
Jackson2Encoder
会针对每个事件调用,并且输出会被刷新以确保及时传递。
默认情况下, |
表单数据
FormHttpMessageReader
和 FormHttpMessageWriter
支持解码和编码 application/x-www-form-urlencoded
内容。
在服务器端,表单内容通常需要从多个地方访问,ServerWebExchange
提供了一个专用的 getFormData()
方法,该方法通过 FormHttpMessageReader
解析内容,然后缓存结果以供重复访问。请参见 表单数据,位于 WebHandler
API 部分。
一旦使用 getFormData()
,就无法再从请求正文中读取原始原始内容。出于这个原因,应用程序应始终通过 ServerWebExchange
访问缓存的表单数据,而不是从原始请求正文中读取。
Multipart
MultipartHttpMessageReader
和 MultipartHttpMessageWriter
支持对 "multipart/form-data"、"multipart/mixed" 和 "multipart/related" 内容进行解码和编码。MultipartHttpMessageReader
将实际解析委托给另一个 HttpMessageReader
,以生成 Flux<Part>
,然后简单地将这些部分收集到 MultiValueMap
中。默认情况下,使用 DefaultPartHttpMessageReader
,但可以通过 ServerCodecConfigurer
进行更改。有关 DefaultPartHttpMessageReader
的更多信息,请参阅 DefaultPartHttpMessageReader
的 javadoc。
在服务器端,如果需要从多个地方访问 multipart 表单内容,ServerWebExchange
提供了一个专门的 getMultipartData()
方法,该方法通过 MultipartHttpMessageReader
解析内容,然后缓存结果以供重复访问。请参阅 Multipart 数据,位于 WebHandler
API 部分。
一旦使用 getMultipartData()
,就无法再从请求主体读取原始的原始内容。因此,应用程序必须始终如一地使用 getMultipartData()
来重复地以类似于映射的方式访问部分,或者依赖 SynchronossPartHttpMessageReader
来一次性访问 Flux<Part>
。
限制
缓冲部分或全部输入流的 Decoder
和 HttpMessageReader
实现可以配置一个限制,用于限制在内存中缓冲的最大字节数。在某些情况下,缓冲发生是因为输入被聚合并表示为单个对象——例如,带有 @RequestBody byte[]
、x-www-form-urlencoded
数据等的控制器方法。缓冲也可能发生在流式传输时,当拆分输入流时——例如,分隔文本、JSON 对象流等。对于这些流式传输情况,限制适用于与流中一个对象关联的字节数。
要配置缓冲区大小,您可以检查给定的 Decoder
或 HttpMessageReader
是否公开 maxInMemorySize
属性,如果是,Javadoc 将包含有关默认值的详细信息。在服务器端,ServerCodecConfigurer
提供了一个单一位置,可以从中设置所有编解码器,请参阅 HTTP 消息编解码器。在客户端,可以在 WebClient.Builder 中更改所有编解码器的限制。
对于 多部分解析,maxInMemorySize
属性限制了非文件部分的大小。对于文件部分,它决定了将部分写入磁盘的阈值。对于写入磁盘的文件部分,还有一个额外的 maxDiskUsagePerPart
属性来限制每个部分的磁盘空间量。还有一个 maxParts
属性来限制多部分请求中部分的总数。要在 WebFlux 中配置所有三个属性,您需要向 ServerCodecConfigurer
提供一个预先配置的 MultipartHttpMessageReader
实例。
流式传输
当流式传输到 HTTP 响应(例如,text/event-stream
、application/x-ndjson
)时,重要的是要定期发送数据,以便尽早可靠地检测到断开的客户端。这种发送可以是仅包含注释的空 SSE 事件,或任何其他“无操作”数据,这些数据实际上可以充当心跳。
DataBuffer
DataBuffer
是 WebFlux 中字节缓冲区的表示。本参考的 Spring Core 部分在关于 数据缓冲区和编解码器 的部分中对此进行了更多介绍。需要理解的关键点是,在 Netty 等一些服务器上,字节缓冲区是池化的,并进行引用计数,必须在使用后释放它们,以避免内存泄漏。
WebFlux 应用程序通常不需要关心这些问题,除非它们直接使用或生成数据缓冲区,而不是依赖编解码器来转换到更高层级的对象或从更高层级的对象转换,或者除非它们选择创建自定义编解码器。对于这种情况,请查看 数据缓冲区和编解码器 中的信息,尤其是关于 使用 DataBuffer 的部分。
日志记录
Spring WebFlux 中的 DEBUG
级别的日志记录旨在简洁、最小化且对人类友好。它侧重于高价值的信息片段,这些信息在反复使用时很有用,而其他信息只有在调试特定问题时才有用。
TRACE
级别的日志记录通常遵循与 DEBUG
相同的原则(例如,也不应该是一个信息洪流),但可以用于调试任何问题。此外,一些日志消息在 TRACE
和 DEBUG
下可能会显示不同的详细程度。
良好的日志记录源于使用日志的经验。如果您发现任何不符合既定目标的内容,请告知我们。
日志 ID
在 WebFlux 中,单个请求可以在多个线程上运行,线程 ID 对关联属于特定请求的日志消息没有用。这就是 WebFlux 日志消息默认情况下以请求特定 ID 为前缀的原因。
在服务器端,日志 ID 存储在 ServerWebExchange
属性中 (LOG_ID_ATTRIBUTE
),而基于该 ID 的完整格式前缀可从 ServerWebExchange#getLogPrefix()
获取。在 WebClient
端,日志 ID 存储在 ClientRequest
属性中 (LOG_ID_ATTRIBUTE
),而完整格式的前缀可从 ClientRequest#logPrefix()
获取。
敏感数据
DEBUG
和 TRACE
日志记录可能会记录敏感信息。这就是表单参数和标头默认情况下被屏蔽,并且您必须明确启用它们以完全记录的原因。
以下示例展示了如何在服务器端请求中执行此操作
-
Java
-
Kotlin
@Configuration
@EnableWebFlux
class MyConfig implements WebFluxConfigurer {
@Override
public void configureHttpMessageCodecs(ServerCodecConfigurer configurer) {
configurer.defaultCodecs().enableLoggingRequestDetails(true);
}
}
@Configuration
@EnableWebFlux
class MyConfig : WebFluxConfigurer {
override fun configureHttpMessageCodecs(configurer: ServerCodecConfigurer) {
configurer.defaultCodecs().enableLoggingRequestDetails(true)
}
}
以下示例展示了如何在客户端请求中执行此操作
-
Java
-
Kotlin
Consumer<ClientCodecConfigurer> consumer = configurer ->
configurer.defaultCodecs().enableLoggingRequestDetails(true);
WebClient webClient = WebClient.builder()
.exchangeStrategies(strategies -> strategies.codecs(consumer))
.build();
val consumer: (ClientCodecConfigurer) -> Unit = { configurer -> configurer.defaultCodecs().enableLoggingRequestDetails(true) }
val webClient = WebClient.builder()
.exchangeStrategies({ strategies -> strategies.codecs(consumer) })
.build()
自定义编解码器
应用程序可以注册自定义编解码器以支持其他媒体类型或默认编解码器不支持的特定行为。
以下示例展示了如何在客户端请求中执行此操作
-
Java
-
Kotlin
WebClient webClient = WebClient.builder()
.codecs(configurer -> {
CustomDecoder decoder = new CustomDecoder();
configurer.customCodecs().registerWithDefaultConfig(decoder);
})
.build();
val webClient = WebClient.builder()
.codecs({ configurer ->
val decoder = CustomDecoder()
configurer.customCodecs().registerWithDefaultConfig(decoder)
})
.build()