RSocket 支持
RSocket Spring 集成模块 (spring-integration-rsocket
) 允许执行 RSocket 应用程序协议。
您需要将此依赖项包含到您的项目中
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:6.3.5"
此模块从版本 5.2 开始可用,并且基于 Spring 消息传递基础及其 RSocket 组件实现,例如 RSocketRequester
、RSocketMessageHandler
和 RSocketStrategies
。有关 RSocket 协议、术语和组件的更多信息,请参阅 Spring 框架 RSocket 支持。
在通过通道适配器开始集成流处理之前,我们需要在服务器和客户端之间建立 RSocket 连接。为此,Spring 集成 RSocket 支持提供了 ServerRSocketConnector
和 ClientRSocketConnector
的 AbstractRSocketConnector
实现。
ServerRSocketConnector
根据提供的 io.rsocket.transport.ServerTransport
在主机和端口上公开侦听器,以接受来自客户端的连接。内部 RSocketServer
实例可以使用 setServerConfigurer()
进行自定义,以及可以配置的其他选项,例如 RSocketStrategies
和 MimeType
用于有效负载数据和标头元数据。当从客户端请求者提供 setupRoute
时(请参阅下面的 ClientRSocketConnector
),已连接的客户端将作为 RSocketRequester
存储在由 clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
确定的键下。默认情况下,连接数据用作键,作为转换为字符串的值,使用 UTF-8 字符集。此类 RSocketRequester
注册表可用于应用程序逻辑中,以确定特定客户端连接以与其交互,或将相同消息发布到所有连接的客户端。当从客户端建立连接时,ServerRSocketConnector
会发出 RSocketConnectedEvent
。这类似于 Spring 消息传递模块中 @ConnectMapping
注解提供的内容。映射模式 *
表示接受所有客户端路由。RSocketConnectedEvent
可用于通过 DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
标头区分不同的路由。
典型的服务器配置可能如下所示
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
所有选项(包括 RSocketStrategies
bean 和 RSocketConnectedEvent
的 @EventListener
)都是可选的。有关更多信息,请参阅 ServerRSocketConnector
JavaDocs。
从版本 5.2.1 开始,ServerRSocketMessageHandler
被提取到一个公共的顶级类中,以便可能与现有的 RSocket 服务器连接。当 ServerRSocketConnector
使用 ServerRSocketMessageHandler
的外部实例提供时,它不会在内部创建 RSocket 服务器,而只是将所有处理逻辑委托给提供的实例。此外,ServerRSocketMessageHandler
可以使用 messageMappingCompatible
标志进行配置,以处理 RSocket 控制器的 @MessageMapping
,完全替换标准 RSocketMessageHandler
提供的功能。这在混合配置中非常有用,当经典的 @MessageMapping
方法与 RSocket 通道适配器一起出现在同一个应用程序中,并且应用程序中存在外部配置的 RSocket 服务器时。
ClientRSocketConnector
充当基于通过提供的 ClientTransport
连接的 RSocket
的 RSocketRequester
的持有者。RSocketConnector
可以使用提供的 RSocketConnectorConfigurer
进行自定义。setupRoute
(带可选模板变量)和带元数据的 setupData
也可以在此组件上配置。
典型的客户端配置可能如下所示
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
大多数这些选项(包括 RSocketStrategies
bean)都是可选的。请注意我们如何连接到在任意端口上本地启动的 RSocket 服务器。有关 setupData
用例,请参阅 ServerRSocketConnector.clientRSocketKeyStrategy
。有关更多信息,另请参阅 ClientRSocketConnector
及其 AbstractRSocketConnector
超类 JavaDocs。
ClientRSocketConnector
和 ServerRSocketConnector
都负责将其入站通道适配器映射到其 path
配置以路由传入的 RSocket 请求。有关更多信息,请参阅下一节。
RSocket 入站网关
RSocketInboundGateway
负责接收 RSocket 请求并生成响应(如果有)。它需要一个 path
映射数组,该数组可以作为类似于 MVC 请求映射或 @MessageMapping
语义的模式。此外,(从版本 5.2.2 开始),可以在 RSocketInboundGateway
上配置一组交互模型(请参阅 RSocketInteractionModel
),以通过特定的帧类型将 RSocket 请求限制到此端点。默认情况下,支持所有交互模型。此类 bean 根据其 IntegrationRSocketEndpoint
实现(ReactiveMessageHandler
的扩展)由 ServerRSocketConnector
或 ClientRSocketConnector
自动检测,用于传入请求的内部 IntegrationRSocketMessageHandler
中的路由逻辑。可以将 AbstractRSocketConnector
提供给 RSocketInboundGateway
以进行显式端点注册。这样,该 AbstractRSocketConnector
上的自动检测选项将被禁用。RSocketStrategies
也可以注入到 RSocketInboundGateway
中,或者从提供的 AbstractRSocketConnector
中获取,覆盖任何显式注入。解码器用于从这些 RSocketStrategies
中解码请求有效负载,根据提供的 requestElementType
进行解码。如果在传入的 Message
中未提供 RSocketPayloadReturnValueHandler.RESPONSE_HEADER
标头,则 RSocketInboundGateway
将请求视为 fireAndForget
RSocket 交互模型。在这种情况下,RSocketInboundGateway
会对 outputChannel
执行简单的 send
操作。否则,RSocketPayloadReturnValueHandler.RESPONSE_HEADER
标头中的 MonoProcessor
值用于将回复发送到 RSocket。为此,RSocketInboundGateway
对 outputChannel
执行 sendAndReceiveMessageReactive
操作。要发送到下游的消息的 payload
始终是根据 MessagingRSocket
逻辑的 Flux
。在 fireAndForget
RSocket 交互模型中,消息具有简单的转换后的 payload
。回复 payload
可以是普通对象或 Publisher
- RSocketInboundGateway
会根据 RSocketStrategies
中提供的编码器将两者正确地转换为 RSocket 响应。
从版本 5.3 开始,decodeFluxAsUnit
选项(默认值为 false
)已添加到 RSocketInboundGateway
中。默认情况下,传入的 Flux
会以以下方式转换:其每个事件都会单独解码。这是当前 @MessageMapping
语义中存在的精确行为。要恢复以前的行为或根据应用程序要求将整个 Flux
解码为单个单元,必须将 decodeFluxAsUnit
设置为 true
。但是,目标解码逻辑取决于所选的 Decoder
,例如,StringDecoder
需要流中存在换行符(默认情况下)以指示字节缓冲区的结束。
有关如何配置 RSocketInboundGateway
端点并在下游处理有效负载的示例,请参阅 使用 Java 配置 RSocket 端点。
RSocket 出站网关
RSocketOutboundGateway
是一个 AbstractReplyProducingMessageHandler
,用于对 RSocket 执行请求并根据 RSocket 回复(如果有)生成回复。低级 RSocket 协议交互被委托给从提供的 ClientRSocketConnector
或服务器端请求消息中的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
标头解析的 RSocketRequester
。服务器端的目标 RSocketRequester
可以从 RSocketConnectedEvent
解析,或者使用 ServerRSocketConnector.getClientRSocketRequester()
API 根据通过 ServerRSocketConnector.setClientRSocketKeyStrategy()
为连接请求映射选择的某些业务密钥进行解析。有关更多信息,请参阅 ServerRSocketConnector
JavaDocs。
要发送请求的 route
必须显式配置(以及路径变量)或通过针对请求消息计算的 SpEL 表达式进行配置。
可以通过 RSocketInteractionModel
选项或相应的表达式设置提供 RSocket 交互模型。默认情况下,requestResponse
用于常见的网关用例。
当请求消息有效负载为 Publisher
时,可以提供 publisherElementType
选项以根据目标 RSocketRequester
中提供的 RSocketStrategies
对其元素进行编码。此选项的表达式可以计算为 ParameterizedTypeReference
。有关数据及其类型的更多信息,请参阅 RSocketRequester.RequestSpec.data()
JavaDocs。
RSocket 请求还可以使用 metadata
进行增强。为此,可以在 RSocketOutboundGateway
上配置针对请求消息的 metadataExpression
。此类表达式必须计算为 Map<Object, MimeType>
。
当 interactionModel
不是 fireAndForget
时,必须提供 expectedResponseType
。默认为 String.class
。此选项的表达式可以计算为 ParameterizedTypeReference
。有关回复数据及其类型的更多信息,请参阅 RSocketRequester.RetrieveSpec.retrieveMono()
和 RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs。
来自 RSocketOutboundGateway
的回复 payload
始终是 Mono
(即使对于 fireAndForget
交互模型,它也是 Mono<Void>
),从而使此组件成为 async
。在生成到常规通道之前,会订阅此类 Mono
,或者按需由 FluxMessageChannel
处理。requestStream
或 requestChannel
交互模型的 Flux
响应也包装在回复 Mono
中。它可以通过 FluxMessageChannel
与直通服务激活器一起在下游展平
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
或在目标应用程序逻辑中显式订阅。
可以将预期响应类型配置(或通过表达式计算)为 void
,将此网关视为出站通道适配器。但是,仍然必须配置 outputChannel
(即使它只是一个 NullChannel
),以启动对返回的 Mono
的订阅。
有关如何配置 RSocketOutboundGateway
端点并在下游处理有效负载的示例,请参阅 使用 Java 配置 RSocket 端点。
RSocket 命名空间支持
Spring 集成提供了一个 rsocket
命名空间和相应的架构定义。要将其包含在您的配置中,请在您的应用程序上下文配置文件中添加以下命名空间声明
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
入站
要使用 XML 配置 Spring Integration RSocket 入站通道适配器,您需要使用 int-rsocket
命名空间中的适当 inbound-gateway
组件。以下示例展示了如何配置它。
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector
和 ServerRSocketConnector
应配置为通用的 <bean>
定义。
出站
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
请参阅 spring-integration-rsocket.xsd
以了解所有这些 XML 属性的描述。
使用 Java 配置 RSocket 端点
以下示例展示了如何使用 Java 配置 RSocket 入站端点。
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
此配置中假设使用 ClientRSocketConnector
或 ServerRSocketConnector
,表示在“echo”路径上自动检测此类端点。请注意 @Transformer
签名及其对 RSocket 请求的完全响应式处理以及生成响应式回复。
以下示例展示了如何使用 Java DSL 配置 RSocket 入站网关。
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
此配置中假设使用 ClientRSocketConnector
或 ServerRSocketConnector
,表示在“/uppercase”路径上自动检测此类端点,并且预期交互模型为“请求通道”。
以下示例展示了如何使用 Java 配置 RSocket 出站网关。
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
仅在客户端需要。在服务器端,必须在请求消息中提供包含 RSocketRequester
值的 RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
标头。
以下示例展示了如何使用 Java DSL 配置 RSocket 出站网关。
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
有关如何在上述流的开头使用提到的 Function
接口的更多信息,请参阅 IntegrationFlow
作为网关。