WebSocket

参考文档的这一部分涵盖了对响应式栈WebSocket消息传递的支持。

WebSocket简介

WebSocket协议,RFC 6455,提供了一种标准化的方法,用于在客户端和服务器之间通过单个TCP连接建立全双工、双向通信通道。它与HTTP是不同的TCP协议,但设计为可以在HTTP上运行,使用端口80和443,并允许重复使用现有的防火墙规则。

WebSocket交互以使用HTTP Upgrade标头升级或切换到WebSocket协议的HTTP请求开始。以下示例显示了此类交互

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: https://127.0.0.1:8080
1 Upgrade标头。
2 使用Upgrade连接。

支持WebSocket的服务器,而不是通常的200状态代码,将返回类似于以下内容的输出

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 协议切换

握手成功后,HTTP升级请求底层的TCP套接字将保持打开状态,以便客户端和服务器继续发送和接收消息。

本文档不包含有关WebSocket工作原理的完整介绍。请参阅RFC 6455、HTML5的WebSocket章节或网络上的众多介绍和教程。

请注意,如果WebSocket服务器在Web服务器(例如nginx)后面运行,则可能需要将其配置为将WebSocket升级请求传递给WebSocket服务器。同样,如果应用程序在云环境中运行,请查看云提供商与WebSocket支持相关的说明。

HTTP与WebSocket

即使WebSocket旨在与HTTP兼容,并从HTTP请求开始,但了解这两个协议会导致截然不同的架构和应用程序编程模型非常重要。

在HTTP和REST中,应用程序被建模为许多URL。要与应用程序交互,客户端以请求-响应方式访问这些URL。服务器根据HTTP URL、方法和标头将请求路由到相应的处理程序。

相比之下,在WebSocket中,通常只有一个URL用于初始连接。随后,所有应用程序消息都通过同一TCP连接流动。这指向了一种完全不同的异步、事件驱动、消息传递架构。

WebSocket也是一种低级传输协议,与HTTP不同,它不会对消息内容规定任何语义。这意味着,除非客户端和服务器就消息语义达成一致,否则无法路由或处理消息。

WebSocket客户端和服务器可以通过HTTP握手请求上的Sec-WebSocket-Protocol标头协商使用更高级别的消息传递协议(例如STOMP)。在没有该标头的情况下,它们需要制定自己的约定。

何时使用WebSocket

WebSocket可以使网页变得动态和交互式。但是,在许多情况下,AJAX和HTTP流或长轮询的组合可以提供简单有效的解决方案。

例如,新闻、邮件和社交信息流需要动态更新,但每隔几分钟更新一次可能完全可以。另一方面,协作、游戏和金融应用程序需要更接近实时。

仅延迟不是决定性因素。如果消息量相对较低(例如,监控网络故障),HTTP流或轮询可以提供有效的解决方案。正是低延迟、高频率和高容量的组合使WebSocket成为最佳选择。

还要记住,在Internet上,超出您控制范围的限制性代理可能会阻止WebSocket交互,因为它们未配置为传递Upgrade标头,或者因为它们关闭了看起来处于空闲状态的长连接。这意味着在防火墙内使用WebSocket进行内部应用程序的决策比面向公众的应用程序更容易。

WebSocket API

Spring框架提供了一个WebSocket API,您可以使用它来编写处理WebSocket消息的客户端和服务器端应用程序。

服务器

要创建WebSocket服务器,您可以首先创建一个WebSocketHandler。以下示例显示了如何执行此操作

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

然后您可以将其映射到URL

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

如果使用WebFlux配置,则无需执行其他操作,否则,如果未使用WebFlux配置,则需要声明一个WebSocketHandlerAdapter,如下所示

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandlerhandle方法接收WebSocketSession并返回Mono<Void>以指示应用程序何时完成会话处理。会话通过两个流处理,一个用于入站消息,一个用于出站消息。下表描述了处理流的两种方法

WebSocketSession方法 描述

Flux<WebSocketMessage> receive()

提供对入站消息流的访问,并在连接关闭时完成。

Mono<Void> send(Publisher<WebSocketMessage>)

接收传出消息的源,写入消息,并返回一个Mono<Void>,该Mono在源完成且写入完成后完成。

WebSocketHandler必须将入站和出站流组合成一个统一的流,并返回一个反映该流完成的Mono<Void>。根据应用程序的要求,统一流在以下情况下完成

  • 入站或出站消息流完成。

  • 入站流完成(即连接关闭),而出站流是无限的。

  • 在选定的点,通过WebSocketSessionclose方法。

当入站和出站消息流组合在一起时,无需检查连接是否打开,因为Reactive Streams信号结束活动。入站流接收完成或错误信号,出站流接收取消信号。

处理程序最基本的实现是处理入站流的实现。以下示例显示了这种实现

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 访问入站消息流。
2 对每条消息执行某些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回一个在接收完成时完成的Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 访问入站消息流。
2 对每条消息执行某些操作。
3 执行使用消息内容的嵌套异步操作。
4 返回一个在接收完成时完成的Mono<Void>
对于嵌套的异步操作,您可能需要在使用池化数据缓冲区的底层服务器(例如,Netty)上对message.retain()进行调用。否则,在您有机会读取数据之前,数据缓冲区可能会被释放。有关更多背景信息,请参阅数据缓冲区和编解码器

以下实现组合了入站和出站流

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 处理入站消息流。
2 创建传出消息,生成组合流。
3 返回一个在我们继续接收时不会完成的Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 处理入站消息流。
2 创建传出消息,生成组合流。
3 返回一个在我们继续接收时不会完成的Mono<Void>

入站和出站流可以是独立的,并且仅用于完成,如下例所示

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 处理入站消息流。
2 发送传出消息。
3 连接流并返回一个在任一流结束时完成的Mono<Void>
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 处理入站消息流。
2 发送传出消息。
3 连接流并返回一个在任一流结束时完成的Mono<Void>

DataBuffer

DataBuffer是WebFlux中字节缓冲区的表示形式。参考的Spring Core部分在数据缓冲区和编解码器部分中对此进行了更多介绍。需要理解的关键点是在某些服务器(如Netty)上,字节缓冲区是池化的并进行引用计数,并且必须在使用后释放以避免内存泄漏。

在Netty上运行时,如果应用程序希望保留输入数据缓冲区,则必须使用DataBufferUtils.retain(dataBuffer)以确保它们不会被释放,并在使用缓冲区后使用DataBufferUtils.release(dataBuffer)

握手

WebSocketHandlerAdapter委托给WebSocketService。默认情况下,这是一个HandshakeWebSocketService实例,它对WebSocket请求执行基本检查,然后使用正在使用的服务器的RequestUpgradeStrategy。目前,内置支持Reactor Netty、Tomcat、Jetty和Undertow。

HandshakeWebSocketService公开了一个sessionAttributePredicate属性,该属性允许设置一个Predicate<String>以从WebSession中提取属性并将其插入到WebSocketSession的属性中。

服务器配置

每个服务器的RequestUpgradeStrategy公开特定于底层WebSocket服务器引擎的配置。当使用WebFlux Java配置时,您可以自定义此类属性,如WebFlux配置的相关部分所示,或者如果不使用WebFlux配置,则使用以下方法

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

检查您服务器的升级策略以查看可用的选项。目前,只有Tomcat和Jetty公开了此类选项。

CORS

配置CORS并限制对WebSocket端点的访问的最简单方法是让您的WebSocketHandler实现CorsConfigurationSource并返回一个包含允许的来源、标头和其他详细信息的CorsConfiguration。如果无法做到这一点,您还可以设置SimpleUrlHandler上的corsConfigurations属性以通过URL模式指定CORS设置。如果两者都指定,则通过在CorsConfiguration上使用combine方法将它们组合。

客户端

Spring WebFlux提供了一个WebSocketClient抽象,其中包含Reactor Netty、Tomcat、Jetty、Undertow和标准Java(即JSR-356)的实现。

Tomcat客户端实际上是标准Java客户端的扩展,在WebSocketSession处理中具有一些额外的功能,以利用Tomcat特定的API来暂停接收消息以进行背压。

要启动WebSocket会话,您可以创建客户端的实例并使用其execute方法

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://127.0.0.1:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://127.0.0.1:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

某些客户端(例如Jetty)实现了Lifecycle,并且在使用它们之前需要停止和启动。所有客户端都具有与底层WebSocket客户端配置相关的构造函数选项。