WebSockets

参考文档的这一部分涵盖了对响应式栈 WebSocket 消息的支持。

WebSocket 简介

WebSocket 协议,RFC 6455,提供了一种标准化方式,通过单一的 TCP 连接在客户端和服务器之间建立全双工、双向通信通道。它是一个不同于 HTTP 的 TCP 协议,但设计为在 HTTP 上工作,使用端口 80 和 443,并允许重用现有防火墙规则。

WebSocket 交互始于一个 HTTP 请求,该请求使用 HTTP Upgrade 头来升级,或者在这种情况下,切换到 WebSocket 协议。以下示例显示了这样的交互:

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: https://:8080
1 Upgrade 头。
2 使用 Upgrade 连接。

与通常的 200 状态码不同,支持 WebSocket 的服务器返回类似于以下内容的输出:

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 协议切换

成功握手后,HTTP 升级请求底层的 TCP 套接字保持打开状态,客户端和服务器都可以继续发送和接收消息。

关于 WebSocket 工作原理的完整介绍超出了本文档的范围。请参阅 RFC 6455、HTML5 的 WebSocket 章,或网络上的许多介绍和教程。

请注意,如果 WebSocket 服务器在 Web 服务器(例如 nginx)后面运行,您可能需要将其配置为将 WebSocket 升级请求传递给 WebSocket 服务器。同样,如果应用程序在云环境中运行,请查看云提供商关于 WebSocket 支持的说明。

HTTP 与 WebSocket

尽管 WebSocket 被设计为与 HTTP 兼容并以 HTTP 请求开始,但重要的是要理解这两种协议导致了非常不同的架构和应用程序编程模型。

在 HTTP 和 REST 中,应用程序被建模为许多 URL。为了与应用程序交互,客户端以请求-响应方式访问这些 URL。服务器根据 HTTP URL、方法和头将请求路由到适当的处理程序。

相比之下,在 WebSockets 中,初始连接通常只有一个 URL。随后,所有应用程序消息都通过同一 TCP 连接流动。这指向了一个完全不同的异步、事件驱动的消息传递架构。

WebSocket 也是一个低级传输协议,与 HTTP 不同,它不对消息内容规定任何语义。这意味着,除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。

WebSocket 客户端和服务器可以通过 HTTP 握手请求上的 Sec-WebSocket-Protocol 头协商使用更高级别的消息协议(例如 STOMP)。如果没有该头,它们需要制定自己的约定。

何时使用 WebSockets

WebSockets 可以使网页动态和交互。然而,在许多情况下,AJAX 和 HTTP 流或长轮询的组合可以提供一个简单有效的解决方案。

例如,新闻、邮件和社交源需要动态更新,但每隔几分钟更新一次可能完全没问题。另一方面,协作、游戏和金融应用程序需要更接近实时。

延迟本身并不是决定性因素。如果消息量相对较低(例如,监控网络故障),HTTP 流或轮询可以提供有效的解决方案。只有低延迟、高频率和高流量的组合,才能最好地证明使用 WebSocket 的合理性。

还要记住,在互联网上,超出您控制的限制性代理可能会阻止 WebSocket 交互,原因可能是它们未配置为传递 Upgrade 头,或者它们关闭了看起来空闲的长期连接。这意味着,在防火墙内部的内部应用程序使用 WebSocket 比面向公众的应用程序更直接。

WebSocket API

Spring Framework 提供了一个 WebSocket API,您可以使用它来编写处理 WebSocket 消息的客户端和服务器端应用程序。

服务器

要创建 WebSocket 服务器,您可以首先创建一个 WebSocketHandler。以下示例展示了如何实现:

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后您可以将其映射到 URL:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用 WebFlux 配置,则无需进行其他操作;否则,如果未使用 WebFlux 配置,则需要声明一个 WebSocketHandlerAdapter,如下所示:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle 方法接受 WebSocketSession 并返回 Mono<Void>,以指示会话的应用程序处理何时完成。会话通过两个流进行处理,一个用于入站消息,一个用于出站消息。下表描述了处理这些流的两种方法:

WebSocketSession 方法 描述

Flux<WebSocketMessage> receive()

提供对入站消息流的访问,并在连接关闭时完成。

Mono<Void> send(Publisher<WebSocketMessage>)

接收一个用于出站消息的源,写入消息,并返回一个 Mono<Void>,当源完成并且写入完成时,该 Mono<Void> 完成。

WebSocketHandler 必须将入站和出站流组合成一个统一的流,并返回一个反映该流完成的 Mono<Void>。根据应用程序要求,统一流在以下情况之一完成:

  • 入站或出站消息流完成。

  • 入站流完成(即连接关闭),而出站流是无限的。

  • 在选定的点,通过 WebSocketSessionclose 方法。

当入站和出站消息流组合在一起时,无需检查连接是否打开,因为 Reactive Streams 会发出结束活动的信号。入站流接收完成或错误信号,出站流接收取消信号。

处理程序最基本的实现是处理入站流。以下示例展示了这样的实现:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 访问入站消息流。
2 对每条消息执行操作。
3 执行使用消息内容的嵌套异步操作。
4 返回一个 Mono<Void>,当接收完成时该 Mono<Void> 完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()			(1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()						(4)
	}
}
1 访问入站消息流。
2 对每条消息执行操作。
3 执行使用消息内容的嵌套异步操作。
4 返回一个 Mono<Void>,当接收完成时该 Mono<Void> 完成。
对于嵌套的异步操作,您可能需要在使用池化数据缓冲区(例如 Netty)的底层服务器上调用 message.retain()。否则,数据缓冲区可能在您有机会读取数据之前就被释放。有关更多背景信息,请参阅 数据缓冲区和编解码器

以下实现结合了入站和出站流:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回一个 Mono<Void>,该 Mono<Void> 在我们继续接收时不会完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()						(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }	(2)

		return session.send(output)							(3)
	}
}
1 处理入站消息流。
2 创建出站消息,生成组合流。
3 返回一个 Mono<Void>,该 Mono<Void> 在我们继续接收时不会完成。

入站和出站流可以相互独立,仅在完成时合并,如下例所示:

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return input.and(output);											(3)
	}
}
1 处理入站消息流。
2 发送出站消息。
3 合并流并返回一个 Mono<Void>,当任一流结束时该 Mono<Void> 完成。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return input.and(output)										(3)
	}
}
1 处理入站消息流。
2 发送出站消息。
3 合并流并返回一个 Mono<Void>,当任一流结束时该 Mono<Void> 完成。

DataBuffer

DataBuffer 是 WebFlux 中字节缓冲区的表示。参考资料的 Spring Core 部分在 数据缓冲区和编解码器 一节中有更多关于它的内容。需要理解的关键是,在某些服务器(如 Netty)上,字节缓冲区是池化和引用计数的,必须在消耗后释放以避免内存泄漏。

在 Netty 上运行时,如果应用程序希望保留输入数据缓冲区以确保它们不被释放,则必须使用 DataBufferUtils.retain(dataBuffer),并在消耗缓冲区后使用 DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter 将委托给 WebSocketService。默认情况下,它是一个 HandshakeWebSocketService 实例,该实例对 WebSocket 请求执行基本检查,然后使用 RequestUpgradeStrategy 来处理正在使用的服务器。目前,内置支持 Reactor Netty、Tomcat 和 Jetty。

HandshakeWebSocketService 暴露了一个 sessionAttributePredicate 属性,允许设置一个 Predicate<String> 来从 WebSession 中提取属性并将其插入到 WebSocketSession 的属性中。

服务器配置

每个服务器的 RequestUpgradeStrategy 都会公开特定于底层 WebSocket 服务器引擎的配置。当使用 WebFlux Java 配置时,您可以自定义此类属性,如 WebFlux 配置 的相应部分所示,否则,如果未使用 WebFlux 配置,请使用以下内容:

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

请查看您服务器的升级策略,以了解可用的选项。目前,只有 Tomcat 和 Jetty 提供了此类选项。

CORS

配置 CORS 和限制 WebSocket 端点访问的最简单方法是让您的 WebSocketHandler 实现 CorsConfigurationSource 并返回一个包含允许源、头和其他详细信息的 CorsConfiguration。如果无法做到这一点,您还可以设置 SimpleUrlHandler 上的 corsConfigurations 属性,通过 URL 模式指定 CORS 设置。如果两者都指定,它们将通过使用 CorsConfiguration 上的 combine 方法进行组合。

客户端

Spring WebFlux 提供了一个 WebSocketClient 抽象,并为 Reactor Netty、Tomcat、Jetty 和标准 Java(即 JSR-356)提供了实现。

Tomcat 客户端实际上是标准 Java 客户端的扩展,在 WebSocketSession 处理中增加了一些额外功能,以利用 Tomcat 特定的 API 暂停接收消息以实现背压。

要启动 WebSocket 会话,您可以创建一个客户端实例并使用其 execute 方法:

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

一些客户端,例如 Jetty,实现了 Lifecycle,需要在您使用它们之前停止和启动。所有客户端都具有与底层 WebSocket 客户端配置相关的构造函数选项。

© . This site is unofficial and not affiliated with VMware.