RSocket

本节介绍 Spring 框架对 RSocket 协议的支持。

概述

RSocket 是一种用于通过 TCP、WebSocket 和其他字节流传输进行多路复用、双工通信的应用程序协议,使用以下交互模型之一

  • 请求-响应 — 发送一条消息并接收一条回复。

  • 请求-流 — 发送一条消息并接收一系列消息作为回复。

  • 通道 — 在两个方向上发送消息流。

  • 发送并忘记 — 发送单向消息。

建立初始连接后,“客户端”与“服务器”之间的区别将消失,因为双方都变得对称,并且每一侧都可以启动上述交互之一。这就是为什么在协议调用中参与方为“请求方”和“响应方”,而上述交互被称为“请求流”或简称为“请求”。

以下是 RSocket 协议的关键特性和优势

  • 跨网络边界的响应式流语义 — 对于诸如请求-流通道之类的流式请求,背压信号在请求方和响应方之间传递,允许请求方在源处降低响应方的速度,从而减少对网络层拥塞控制的依赖,以及对网络级别或任何级别的缓冲的需求。

  • 请求节流 — 此功能以LEASE帧(可以从每一端发送以限制另一端在给定时间内允许的请求总数)之后的“租赁”命名。租赁会定期续期。

  • 会话恢复 — 这是为连接丢失而设计的,需要维护一些状态。状态管理对应用程序是透明的,并且与背压结合使用效果很好,背压可以在可能的情况下停止生产者并减少所需的状态量。

  • 大型消息的分片和重新组装。

  • 保持活动(心跳)。

RSocket 在多种语言中都有实现Java 库构建在Project ReactorReactor Netty上用于传输。这意味着来自应用程序中响应式流发布者的信号将通过 RSocket 透明地跨网络传播。

协议

RSocket 的优势之一是它在网络上具有明确定义的行为,并且有一个易于阅读的规范以及一些协议扩展。因此,建议阅读规范,而无需依赖语言实现和更高级别的框架 API。本节提供了一个简洁的概述,以建立一些上下文。

连接

最初,客户端通过某种低级流传输(例如 TCP 或 WebSocket)连接到服务器,并向服务器发送SETUP帧以设置连接的参数。

服务器可能会拒绝SETUP帧,但通常在发送(对于客户端)和接收(对于服务器)后,双方都可以开始发出请求,除非SETUP指示使用租赁语义来限制请求数量,在这种情况下,双方都必须等待来自另一端的LEASE帧才能允许发出请求。

发出请求

建立连接后,双方可以通过以下帧之一启动请求:REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF。这些帧中的每一个都承载了从请求方到响应方的一条消息。

然后,响应方可能会返回带有响应消息的PAYLOAD帧,并且在REQUEST_CHANNEL的情况下,请求方也可能会发送带有更多请求消息的PAYLOAD帧。

当请求涉及消息流(例如请求-流通道)时,响应方必须尊重请求方的需求信号。需求表示为消息数量。初始需求在REQUEST_STREAMREQUEST_CHANNEL帧中指定。后续需求通过REQUEST_N帧发出信号。

每一侧还可以通过METADATA_PUSH帧发送元数据通知,这些通知与任何单个请求无关,而是与整个连接有关。

消息格式

RSocket 消息包含数据和元数据。元数据可用于发送路由、安全令牌等。数据和元数据可以采用不同的格式。每种格式的 MIME 类型在SETUP帧中声明,并应用于给定连接上的所有请求。

虽然所有消息都可以有元数据,但通常元数据(例如路由)是每个请求的,因此仅包含在请求的第一个消息中,即使用以下帧之一:REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNELREQUEST_FNF

协议扩展定义了应用程序中使用的通用元数据格式

Java 实现

RSocket 的Java 实现构建在Project Reactor上。TCP 和 WebSocket 的传输构建在Reactor Netty上。作为响应式流库,Reactor 简化了协议实现的工作。对于应用程序来说,使用FluxMono以及声明式运算符和透明背压支持是自然的选择。

RSocket Java 中的 API 故意保持最简和基本。它专注于协议功能,并将应用程序编程模型(例如,RPC 代码生成与其他)作为更高级别的独立问题。

主契约io.rsocket.RSocket使用Mono表示单个消息的承诺,Flux表示一系列消息,io.rsocket.Payload表示带有对数据和元数据(作为字节缓冲区)访问权限的实际消息,来建模四种请求交互类型。RSocket契约以对称方式使用。对于请求,应用程序会获得一个RSocket来执行请求。对于响应,应用程序实现RSocket来处理请求。

这并非旨在作为彻底的介绍。在大多数情况下,Spring 应用程序无需直接使用其 API。但是,独立于 Spring 查看或试验 RSocket 可能是很重要的。RSocket Java 存储库包含许多示例应用程序,这些应用程序演示了其 API 和协议功能。

Spring 支持

spring-messaging 模块包含以下内容

  • RSocketRequester — 用于通过io.rsocket.RSocket发出请求(包括数据和元数据编码/解码)的流畅 API。

  • 带注解的响应器 — 用于响应的@MessageMapping@RSocketExchange带注解的处理程序方法。

  • RSocket 接口 — RSocket 服务声明为 Java 接口,其中包含@RSocketExchange方法,可用于请求方或响应方。

spring-web 模块包含 RSocket 应用程序可能需要的EncoderDecoder实现,例如 Jackson CBOR/JSON 和 Protobuf。它还包含可用于高效路由匹配的PathPatternParser

Spring Boot 2.2 支持通过 TCP 或 WebSocket 启动 RSocket 服务器,包括在 WebFlux 服务器中通过 WebSocket 公开 RSocket 的选项。还支持客户端和 RSocketRequester.BuilderRSocketStrategies的自动配置。有关更多详细信息,请参阅 Spring Boot 参考中的RSocket 部分

Spring Security 5.2 提供了 RSocket 支持。

Spring Integration 5.2 提供了入站和出站网关,以与 RSocket 客户端和服务器交互。有关更多详细信息,请参阅 Spring Integration 参考手册。

Spring Cloud Gateway 支持 RSocket 连接。

RSocketRequester

RSocketRequester 提供了一个流畅的 API 来执行 RSocket 请求,接受和返回对象以表示数据和元数据,而不是底层的数据缓冲区。它可以对称地使用,用于从客户端发出请求以及从服务器发出请求。

客户端请求器

在客户端获取 RSocketRequester 需要连接到服务器,这涉及发送带有连接设置的 RSocket SETUP 帧。RSocketRequester 提供了一个构建器,用于帮助准备 io.rsocket.core.RSocketConnector,其中包括 SETUP 帧的连接设置。

这是使用默认设置连接的最基本方法

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

以上代码不会立即连接。当发出请求时,会透明地建立一个共享连接并使用它。

连接设置

RSocketRequester.Builder 提供以下选项来自定义初始 SETUP

  • dataMimeType(MimeType) — 设置连接上数据的 MIME 类型。

  • metadataMimeType(MimeType) — 设置连接上元数据的 MIME 类型。

  • setupData(Object) — 包含在 SETUP 中的数据。

  • setupRoute(String, Object…​) — 包含在 SETUP 中的元数据中的路由。

  • setupMetadata(Object, MimeType) — 包含在 SETUP 中的其他元数据。

对于数据,默认的 MIME 类型是从第一个配置的 Decoder 中派生的。对于元数据,默认的 MIME 类型是 复合元数据,它允许每个请求包含多个元数据值和 MIME 类型对。通常不需要更改两者。

SETUP 帧中的数据和元数据是可选的。在服务器端,可以使用 @ConnectMapping 方法来处理连接的开始和 SETUP 帧的内容。元数据可用于连接级安全性。

策略

RSocketRequester.Builder 接受 RSocketStrategies 来配置请求器。您需要使用它来提供用于数据和元数据值的(反)序列化编解码器。默认情况下,仅注册来自 spring-coreStringbyte[]ByteBuffer 的基本编解码器。添加 spring-web 可以访问更多编解码器,可以按如下方式注册

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies 旨在重复使用。在某些情况下,例如同一个应用程序中的客户端和服务器,最好在 Spring 配置中声明它。

客户端响应器

RSocketRequester.Builder 可用于配置对服务器请求的响应器。

您可以使用带注释的处理程序进行客户端响应,它基于服务器上使用的相同基础设施,但以编程方式注册,如下所示

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1 如果存在 spring-web,则使用 PathPatternRouteMatcher 进行高效的路由匹配。
2 从具有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。
3 注册响应器。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1 如果存在 spring-web,则使用 PathPatternRouteMatcher 进行高效的路由匹配。
2 从具有 @MessageMapping 和/或 @ConnectMapping 方法的类创建响应器。
3 注册响应器。

请注意,以上仅是为客户端响应器的编程注册设计的快捷方式。对于客户端响应器位于 Spring 配置中的其他场景,您仍然可以将 RSocketMessageHandler 声明为 Spring bean,然后按如下方式应用

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

对于以上情况,您可能还需要在 RSocketMessageHandler 中使用 setHandlerPredicate 切换到不同的策略来检测客户端响应器,例如,基于自定义注解(如 @RSocketClientResponder)而不是默认的 @Controller。这在同一个应用程序中存在客户端和服务器或多个客户端的场景中是必要的。

另请参阅 带注释的响应器,以了解有关编程模型的更多信息。

高级

RSocketRequesterBuilder 提供了一个回调来公开底层的 io.rsocket.core.RSocketConnector,以进一步配置保持活动间隔、会话恢复、拦截器等选项。您可以按如下方式在该级别配置选项

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

服务器请求器

从服务器向已连接的客户端发出请求,只需从服务器获取已连接客户端的请求器即可。

带注释的响应器 中,@ConnectMapping@MessageMapping 方法支持 RSocketRequester 参数。使用它来访问连接的请求器。请记住,@ConnectMapping 方法本质上是 SETUP 帧的处理程序,必须在请求开始之前进行处理。因此,一开始的请求必须与处理分离。例如

  • Java

  • Kotlin

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 异步启动请求,独立于处理。
2 执行处理并返回完成 Mono<Void>
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 异步启动请求,独立于处理。
2 在挂起函数中执行处理。

请求

一旦您拥有 客户端服务器 请求器,您可以按如下方式发出请求

  • Java

  • Kotlin

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 指定要包含在请求消息元数据中的路由。
2 提供请求消息的数据。
3 声明预期的响应。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 指定要包含在请求消息元数据中的路由。
2 提供请求消息的数据。
3 声明预期的响应。

交互类型根据输入和输出的基数隐式确定。以上示例是 Request-Stream,因为发送了一个值并接收了一系列值。在大多数情况下,您不需要考虑这一点,只要输入和输出的选择与 RSocket 交互类型匹配,以及响应器期望的输入和输出类型即可。唯一一个无效组合的示例是多对一。

data(Object) 方法还接受任何 Reactive Streams Publisher,包括 FluxMono,以及在 ReactiveAdapterRegistry 中注册的任何其他值(或值集)生产者。对于产生相同类型值的多分值 Publisher(如 Flux),请考虑使用重载的 data 方法之一,以避免对每个元素进行类型检查和 Encoder 查找

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) 步骤是可选的。对于不发送数据的请求,请跳过此步骤

  • Java

  • Kotlin

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

如果使用 复合元数据(默认值)并且如果值受注册的 Encoder 支持,则可以添加额外的元数据值。例如

  • Java

  • Kotlin

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

对于 Fire-and-Forget,请使用返回 Mono<Void>send() 方法。请注意,Mono 仅表示消息已成功发送,而不是已处理。

对于 Metadata-Push,请使用 sendMetadata() 方法,其返回值为 Mono<Void>

带注释的响应器

RSocket 响应器可以实现为 @MessageMapping@ConnectMapping 方法。@MessageMapping 方法处理单个请求,而 @ConnectMapping 方法处理连接级事件(设置和元数据推送)。带注释的响应器对称地支持,用于从服务器端响应以及从客户端端响应。

服务器响应器

要在服务器端使用带注释的响应器,请将 RSocketMessageHandler 添加到您的 Spring 配置中,以检测具有 @MessageMapping@ConnectMapping 方法的 @Controller bean

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

然后通过 Java RSocket API 启动 RSocket 服务器,并将 RSocketMessageHandler 作为响应器插入,如下所示

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler 默认支持 复合路由 元数据。如果您需要切换到不同的 MIME 类型或注册其他元数据 MIME 类型,可以设置其 MetadataExtractor

您需要设置支持的元数据和数据格式所需的 EncoderDecoder 实例。您可能需要 spring-web 模块来实现编解码器。

默认情况下,SimpleRouteMatcher 用于通过 AntPathMatcher 匹配路由。我们建议插入来自 spring-webPathPatternRouteMatcher 以进行高效的路由匹配。RSocket 路由可以是分层的,但不是 URL 路径。两个路由匹配器都配置为默认使用 "." 作为分隔符,并且与 HTTP URL 不同,没有 URL 解码。

RSocketMessageHandler 可以通过 RSocketStrategies 进行配置,如果您需要在同一进程中的客户端和服务器之间共享配置,这将很有用

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

客户端响应器

客户端上的带注释的响应器需要在 RSocketRequester.Builder 中进行配置。有关详细信息,请参阅 客户端响应器

@MessageMapping

一旦 服务器客户端 响应器配置就绪,就可以按如下方式使用 @MessageMapping 方法

  • Java

  • Kotlin

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

以上 @MessageMapping 方法响应具有路由 "locate.radars.within" 的 Request-Stream 交互。它支持灵活的方法签名,可以选择使用以下方法参数

方法参数 描述

@Payload

请求的有效负载。这可以是具体值或异步类型(如 MonoFlux)。

注意:使用该注解是可选的。如果一个方法参数不是简单类型,也不是任何其他支持的参数,则假定它是预期的有效负载。

RSocketRequester

用于向远程端发出请求的请求器。

@DestinationVariable

根据映射模式中的变量从路由中提取的值,例如 @MessageMapping("find.radar.{id}")

@Header

为提取注册的元数据值,如 MetadataExtractor 中所述。

@Headers Map<String, Object>

为提取注册的所有元数据值,如 MetadataExtractor 中所述。

返回值应为一个或多个要序列化为响应有效负载的对象。这可以是异步类型(如 MonoFlux)、具体值或 void 或无值异步类型(如 Mono<Void>)。

@MessageMapping 方法支持的 RSocket 交互类型由输入(即 @Payload 参数)和输出的基数确定,其中基数表示以下含义

基数 描述

1

具体值或单值异步类型(如 Mono<T>)。

多值异步类型(如 Flux<T>)。

0

对于输入,这意味着该方法没有 @Payload 参数。

对于输出,它是 void 或无值异步类型(如 Mono<Void>)。

下表显示了所有输入和输出基数组合以及相应的交互类型

输入基数 输出基数 交互类型

0, 1

0

Fire-and-Forget、Request-Response

0, 1

1

Request-Response

0, 1

Request-Stream

0、1、多

Request-Channel

@RSocketExchange

作为@MessageMapping的替代方案,您还可以使用@RSocketExchange方法处理请求。此类方法在RSocket 接口上声明,并且可以通过RSocketServiceProxyFactory用作请求方,或由响应方实现。

例如,要作为响应方处理请求

  • Java

  • Kotlin

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

@RSocketExhange@MessageMapping之间存在一些差异,因为前者需要保持适用于请求方和响应方。例如,虽然可以声明@MessageMapping来处理任意数量的路由,并且每个路由都可以是模式,但@RSocketExchange必须使用单个具体路由进行声明。在支持的方法参数中也存在细微差别,这些参数与元数据相关,请参阅@MessageMappingRSocket 接口以获取支持参数的列表。

可以在类型级别使用@RSocketExchange来指定给定 RSocket 服务接口的所有路由的通用前缀。

@ConnectMapping

@ConnectMapping处理 RSocket 连接开始时的SETUP帧,以及通过METADATA_PUSH帧进行的任何后续元数据推送通知,即io.rsocket.RSocket中的metadataPush(Payload)

@ConnectMapping方法支持与@MessageMapping相同的参数,但基于来自SETUPMETADATA_PUSH帧的元数据和数据。@ConnectMapping可以具有模式以将处理范围缩小到元数据中具有路由的特定连接,或者如果未声明任何模式,则所有连接都匹配。

@ConnectMapping方法不能返回数据,并且必须声明为voidMono<Void>作为返回值。如果处理对新连接返回错误,则会拒绝该连接。处理不得被阻止以对连接的RSocketRequester发出请求。有关详细信息,请参阅服务器请求方

MetadataExtractor

响应方必须解释元数据。复合元数据允许独立格式化的元数据值(例如,用于路由、安全、跟踪),每个值都有自己的 MIME 类型。应用程序需要一种方法来配置要支持的元数据 MIME 类型,以及一种访问提取值的方法。

MetadataExtractor是一个契约,用于获取序列化后的元数据并返回解码后的名称-值对,然后可以通过名称(例如,通过带注释的处理程序方法中的@Header)访问这些对。

可以向DefaultMetadataExtractor提供Decoder实例来解码元数据。它开箱即用地支持"message/x.rsocket.routing.v0",它将其解码为String并保存在“route”键下。对于任何其他 MIME 类型,您都需要提供一个Decoder并按如下方式注册 MIME 类型

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

复合元数据非常适合组合独立的元数据值。但是,请求方可能不支持复合元数据,或者可能选择不使用它。为此,DefaultMetadataExtractor可能需要自定义逻辑将解码后的值映射到输出映射。以下是一个使用 JSON 作为元数据的示例

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

通过RSocketStrategies配置MetadataExtractor时,您可以让RSocketStrategies.Builder使用配置的解码器创建提取器,并简单地使用回调自定义注册,如下所示

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket 接口

Spring 框架允许您使用带有@RSocketExchange方法的 Java 接口定义 RSocket 服务。您可以将此类接口传递给RSocketServiceProxyFactory以创建代理,该代理通过RSocketRequester执行请求。您还可以实现该接口作为处理请求的响应方。

首先使用@RSocketExchange方法创建接口

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

现在,您可以创建一个代理,当调用方法时执行请求

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

您还可以实现该接口以作为响应方处理请求。请参阅带注释的响应方

方法参数

带注释的 RSocket 交换方法支持具有以下方法参数的灵活方法签名

方法参数 描述

@DestinationVariable

添加一个路由变量以传递给RSocketRequester以及来自@RSocketExchange注释的路由,以便扩展路由中的模板占位符。此变量可以是 String 或任何 Object,然后通过toString()进行格式化。

@Payload

设置请求的输入有效负载。这可以是具体值,也可以是任何值的生产者,这些值可以通过ReactiveAdapterRegistry适配到 Reactive Streams Publisher。除非required属性设置为false,或者参数根据MethodParameter#isOptional被标记为可选,否则必须提供有效负载。

Object,后跟MimeType

输入有效负载中元数据条目的值。只要下一个参数是元数据条目的MimeType,这就可以是任何Object。该值可以是具体值,也可以是任何可以适配到 Reactive Streams Publisher(通过ReactiveAdapterRegistry)的单个值的生产者。

MimeType

元数据条目的MimeType。预期前面的方法参数是元数据值。

返回值

带注释的 RSocket 交换方法支持作为具体值或任何可以适配到 Reactive Streams Publisher(通过ReactiveAdapterRegistry)的值的生产者的返回值。

默认情况下,具有同步(阻塞)方法签名的 RSocket 服务方法的行为取决于底层 RSocket ClientTransport的响应超时设置以及 RSocket 保活设置。RSocketServiceProxyFactory.Builder确实公开了blockTimeout选项,该选项还允许您配置对响应阻塞的最长时间,但我们建议在 RSocket 级别配置超时值以获得更多控制。