WebSockets 支持

从 4.1 版本开始,Spring Integration 支持 WebSocket。它基于 Spring 框架的 web-socket 模块的架构、基础设施和 API。因此,许多 Spring WebSocket 的组件(例如 SubProtocolHandlerWebSocketClient)和配置选项(例如 @EnableWebSocketMessageBroker)都可以在 Spring Integration 中重复使用。有关更多信息,请参阅 Spring Framework 参考手册中的 Spring Framework WebSocket 支持 章节。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.3.5"

对于服务器端,必须显式包含 org.springframework:spring-webmvc 依赖项。

Spring Framework WebSocket 基础设施基于 Spring 消息传递基础,并提供了一个基于相同 MessageChannel 实现和 MessageHandler 实现的基本消息传递框架,Spring Integration 使用这些实现(以及一些 POJO 方法注释映射)。因此,即使没有 WebSocket 适配器,Spring Integration 也可以直接参与 WebSocket 流。为此,您可以使用适当的注释配置 Spring Integration @MessagingGateway,如下例所示

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

概述

由于 WebSocket 协议从定义上讲是流式传输的,并且我们可以同时向 WebSocket 发送和接收消息,因此无论是在客户端还是服务器端,我们都可以处理相应的 WebSocketSession。为了封装连接管理和 WebSocketSession 注册表,IntegrationWebSocketContainer 提供了 ClientWebSocketContainerServerWebSocketContainer 实现。感谢 WebSocket API 及其在 Spring Framework 中的实现(具有许多扩展),服务器端和客户端都使用相同的类(当然,从 Java 的角度来看)。因此,大多数连接和 WebSocketSession 注册表选项在两端都是相同的。这使我们能够重复使用许多配置项和基础设施挂钩,以在服务器端和客户端构建 WebSocket 应用程序。以下示例显示了组件如何服务于这两种目的

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

IntegrationWebSocketContainer 旨在实现双向消息传递,可以在入站和出站通道适配器之间共享(见下文),在使用单向(发送或接收)WebSocket 消息传递时,可以仅从其中一个适配器引用它。它可以在没有任何通道适配器的情况下使用,但在这种情况下,IntegrationWebSocketContainer 只充当 WebSocketSession 注册表的角色。

ServerWebSocketContainer 实现 WebSocketConfigurer 以将内部 IntegrationWebSocketContainer.IntegrationWebSocketHandler 注册为 Endpoint。它在提供的 paths 和其他服务器 WebSocket 选项(例如 HandshakeHandlerSockJS fallback)下在目标供应商 WebSocket 容器的 ServletWebSocketHandlerRegistry 中执行此操作。此注册是通过一个基础结构 WebSocketIntegrationConfigurationInitializer 组件实现的,该组件的作用与 @EnableWebSocket 注释相同。这意味着,通过使用 @EnableIntegration(或应用程序上下文中的任何 Spring Integration 命名空间),您可以省略 @EnableWebSocket 声明,因为 Spring Integration 基础设施会检测所有 WebSocket 端点。

从 6.1 版本开始,可以使用提供的 URI 而不是 uriTemplateuriVariables 组合来配置 ClientWebSocketContainer。这在某些 uri 部分需要自定义编码的情况下很有用。请参阅 UriComponentsBuilder API 以获得便利。

WebSocket 入站通道适配器

WebSocketInboundChannelAdapter 实现 WebSocketSession 交互的接收部分。您必须为其提供一个 IntegrationWebSocketContainer,适配器将自身注册为 WebSocketListener 以处理传入的消息和 WebSocketSession 事件。

只能在 IntegrationWebSocketContainer 中注册一个 WebSocketListener

对于 WebSocket 子协议,可以使用 SubProtocolHandlerRegistry 作为第二个构造函数参数来配置 WebSocketInboundChannelAdapter。适配器委托给 SubProtocolHandlerRegistry 以确定已接受的 WebSocketSession 的适当 SubProtocolHandler,并根据子协议实现将 WebSocketMessage 转换为 Message

默认情况下,WebSocketInboundChannelAdapter 仅依赖于原始的 PassThruSubProtocolHandler 实现,该实现将 WebSocketMessage 转换为 Message

WebSocketInboundChannelAdapter 仅接受并发送到底层集成流具有 SimpMessageType.MESSAGE 或空 simpMessageType 标头的 Message 实例。所有其他 Message 类型都通过从 SubProtocolHandler 实现(例如 StompSubProtocolHandler)发出的 ApplicationEvent 实例处理。

在服务器端,如果存在 @EnableWebSocketMessageBroker 配置,则可以使用 useBroker = true 选项配置 WebSocketInboundChannelAdapter。在这种情况下,所有 非 MESSAGE Message 类型都委托给提供的 AbstractBrokerMessageHandler。此外,如果代理中继配置了目标前缀,则与代理目标匹配的消息将路由到 AbstractBrokerMessageHandler 而不是 WebSocketInboundChannelAdapteroutputChannel

如果 useBroker = false 并且接收到的消息类型为 SimpMessageType.CONNECT,则 WebSocketInboundChannelAdapter 会立即将 SimpMessageType.CONNECT_ACK 消息发送到 WebSocketSession,而不会将其发送到通道。

Spring 的 WebSocket 支持允许仅配置一个代理中继。因此,我们不需要 AbstractBrokerMessageHandler 引用。它在应用程序上下文中检测到。

有关更多配置选项,请参阅 WebSocket 命名空间支持

WebSocket 出站通道适配器

WebSocketOutboundChannelAdapter

  1. 接受来自其 MessageChannel 的 Spring Integration 消息

  2. MessageHeaders 中确定 WebSocketSession id

  3. 从提供的 IntegrationWebSocketContainer 中检索 WebSocketSession

  4. WebSocketMessage 的转换和发送工作委托给来自提供的 SubProtocolHandlerRegistry 的相应 SubProtocolHandler

在客户端,不需要 WebSocketSession id 消息标头,因为 ClientWebSocketContainer 仅处理单个连接及其 WebSocketSession

要使用 STOMP 子协议,您应该使用 StompSubProtocolHandler 配置此适配器。然后,您可以使用 StompHeaderAccessor.create(StompCommand…​)MessageBuilder 或仅使用 HeaderEnricher(请参阅 标头增强器)将任何 STOMP 消息类型发送到此适配器。

本章的其余部分主要介绍其他配置选项。

WebSocket 命名空间支持

Spring Integration WebSocket 命名空间包含本章其余部分中描述的几个组件。要将其包含在您的配置中,请在您的应用程序上下文配置文件中使用以下命名空间声明

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container> 属性

以下列表显示了 <int-websocket:client-container> 元素可用的属性

<int-websocket:client-container
                  id=""                             (1)
                  client=""                         (2)
                  uri=""                            (3)
                  uri-variables=""                  (4)
                  origin=""                         (5)
                  send-time-limit=""                (6)
                  send-buffer-size-limit=""         (7)
                  send-buffer-overflow-strategy=""  (8)
                  auto-startup=""                   (9)
                  phase="">                        (10)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>      (11)
</int-websocket:client-container>
1 组件 bean 名称。
2 WebSocketClient bean 引用。
3 到目标 WebSocket 服务的 uriuriTemplate。如果将其用作带有 URI 变量占位符的 uriTemplate,则需要 uri-variables 属性。
4 uri 属性值中 URI 变量占位符的逗号分隔值。这些值将根据其在 uri 中的顺序替换到占位符中。请参阅 UriComponents.expand(Object…​uriVariableValues)
5 Origin 握手 HTTP 标头值。
6 WebSocket 会话“发送”超时限制。默认为 10000
7 WebSocket 会话“发送”消息大小限制。默认为 524288
8 WebSocket 会话发送缓冲区溢出策略决定会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。请参阅 ConcurrentWebSocketSessionDecorator.OverflowStrategy 以了解可能的值和更多详细信息。
9 布尔值,指示此端点是否应自动启动。默认为 false,假设此容器是从 WebSocket 入站适配器 启动的。
10 在此端点应启动和停止的生命周期阶段。值越低,此端点启动越早,停止越晚。默认值为 Integer.MAX_VALUE。值可以为负。请参阅 SmartLifeCycle
11 要与握手请求一起使用的 HttpHeadersMap

<int-websocket:server-container> 属性

以下列表显示了 <int-websocket:server-container> 元素可用的属性

<int-websocket:server-container
          id=""                             (1)
          path=""                           (2)
          handshake-handler=""              (3)
          handshake-interceptors=""         (4)
          decorator-factories=""            (5)
          send-time-limit=""                (6)
          send-buffer-size-limit=""         (7)
          send-buffer-overflow-strategy=""  (8)
          allowed-origins="">               (9)
          <int-websocket:sockjs
            client-library-url=""          (10)
            stream-bytes-limit=""          (11)
            session-cookie-needed=""       (12)
            heartbeat-time=""              (13)
            disconnect-delay=""            (14)
            message-cache-size=""          (15)
            websocket-enabled=""           (16)
            scheduler=""                   (17)
            message-codec=""               (18)
            transport-handlers=""          (19)
            suppress-cors="true" />        (20)
</int-websocket:server-container>
1 组件 bean 名称。
2 将特定请求映射到 WebSocketHandler 的路径(或逗号分隔的路径)。支持精确路径映射 URI(例如 /myPath)和 ant 风格的路径模式(例如 /myPath/**)。
3 HandshakeHandler bean 引用。默认为 DefaultHandshakeHandler
4 HandshakeInterceptor bean 引用的列表。
5 一个或多个工厂(WebSocketHandlerDecoratorFactory)的列表,这些工厂装饰用于处理 WebSocket 消息的处理程序。这可能对某些高级用例有用(例如,允许 Spring Security 在相应的 HTTP 会话过期时强制关闭 WebSocket 会话)。有关更多信息,请参阅 Spring Session 项目
6 请参阅 <int-websocket:client-container> 上的相同选项。
7 请参阅 <int-websocket:client-container> 上的相同选项。
8 WebSocket 会话发送缓冲区溢出策略决定会话的出站消息缓冲区达到 send-buffer-size-limit 时的行为。请参阅 ConcurrentWebSocketSessionDecorator.OverflowStrategy 以了解可能的值和更多详细信息。
9 允许的来源标头值。您可以将多个来源指定为逗号分隔的列表。此检查主要设计用于浏览器客户端。没有任何东西可以阻止其他类型的客户端修改来源标头值。当启用 SockJS 并限制允许的来源时,不使用来源标头进行跨源请求的传输类型(jsonp-pollingiframe-xhr-pollingiframe-eventsourceiframe-htmlfile)将被禁用。因此,不支持 IE6 和 IE7,并且仅在没有 cookie 的情况下支持 IE8 和 IE9。默认情况下,允许所有来源。

10 对于没有原生跨域通信功能的传输(例如 eventsourcehtmlfile),必须从“外部”域获取一个简单的页面,并将其嵌入到一个不可见的 iframe 中,以便 iframe 中的代码可以从 SockJS 服务器本地域运行。由于 iframe 需要加载 SockJS JavaScript 客户端库,因此此属性允许您指定加载库的位置。默认情况下,它指向 d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js。但是,您也可以将其设置为指向应用程序提供的 URL。请注意,可以指定相对 URL,在这种情况下,URL 必须相对于 iframe URL。例如,假设 SockJS 端点映射到 /sockjs,并且生成的 iframe URL 为 /sockjs/iframe.html,则相对 URL 必须以 "../../" 开头,以便向上遍历到 SockJS 映射上方的位置。对于基于前缀的 Servlet 映射,您可能需要再向上遍历一次。
11 通过单个 HTTP 流请求可以发送的最小字节数,超过此字节数后请求将关闭。默认为 128K(即 128*1024 或 131072 字节)。
12 来自 SockJs /info 端点的响应中的 cookie_needed 值。此属性指示应用程序是否需要 JSESSIONID cookie 才能正常运行(例如,用于负载均衡或在 Java Servlet 容器中使用 HTTP 会话)。
13 服务器在没有发送任何消息的情况下,经过多长时间(以毫秒为单位)后应该向客户端发送心跳帧,以防止连接断开。默认值为 25,000(25 秒)。
14 在客户端没有接收连接(即服务器可以向客户端发送数据的活动连接)后,经过多长时间(以毫秒为单位)才将其视为断开连接。默认值为 5000
15 会话在等待客户端的下一个 HTTP 轮询请求时可以缓存多少个服务器到客户端的消息。默认大小为 100
16 某些负载均衡器不支持 WebSockets。将此选项设置为 false 以禁用服务器端 WebSocket 传输。默认值为 true
17 TaskScheduler bean 的引用。如果未提供值,则会创建一个新的 ThreadPoolTaskScheduler 实例。此调度程序实例用于调度心跳消息。
18 用于编码和解码 SockJS 消息的 SockJsMessageCodec bean 的引用。默认情况下,使用 Jackson2SockJsMessageCodec,这需要在类路径上存在 Jackson 库。
19 TransportHandler bean 引用的列表。
20 是否禁用为 SockJS 请求自动添加 CORS 标头。默认值为 false

<int-websocket:outbound-channel-adapter> 属性

以下列表显示了 <int-websocket:outbound-channel-adapter> 元素可用的属性

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 组件 bean 名称。如果您未提供 channel 属性,则会创建一个 DirectChannel,并使用此 id 属性作为 bean 名称在应用程序上下文中注册。在这种情况下,端点将使用 bean 名称 id 加上 .adapter 注册。并且 MessageHandler 将使用 bean 别名 id 加上 .handler 注册。
2 标识附加到此适配器的通道。
3 IntegrationWebSocketContainer bean 的引用,该 bean 封装了低级连接和 WebSocketSession 处理操作。必需。
4 SubProtocolHandler 实例的可选引用。当客户端未请求子协议或它是一个单一协议处理程序时使用。如果没有提供此引用或 protocol-handlers 列表,则默认使用 PassThruSubProtocolHandler
5 此通道适配器的 SubProtocolHandler bean 引用的列表。如果您只提供单个 bean 引用并且未提供 default-protocol-handler,则该单个 SubProtocolHandler 将用作 default-protocol-handler。如果您未设置此属性或 default-protocol-handler,则默认使用 PassThruSubProtocolHandler
6 此通道适配器的 MessageConverter bean 引用的列表。
7 布尔值,指示是否应在任何自定义转换器之后注册默认转换器。仅当提供 message-converters 时才使用此标志。否则,将注册所有默认转换器。默认为 false。默认转换器(按顺序)为:StringMessageConverterByteArrayMessageConverterMappingJackson2MessageConverter(如果类路径上存在 Jackson 库)。
8 布尔值,指示此端点是否应自动启动。默认为 true
9 此端点应启动和停止的生命周期阶段。值越低,此端点启动越早,停止越晚。默认为 Integer.MIN_VALUE。值可以为负数。请参阅 SmartLifeCycle

<int-websocket:inbound-channel-adapter> 属性

以下列表显示了 <int-websocket:outbound-channel-adapter> 元素可用的属性

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 组件 bean 名称。如果您未设置 channel 属性,则会创建一个 DirectChannel,并使用此 id 属性作为 bean 名称在应用程序上下文中注册。在这种情况下,端点将使用 bean 名称 id 加上 .adapter 注册。
2 标识附加到此适配器的通道。
3 MessageChannel bean 的引用,应将 ErrorMessage 实例发送到该引用。
4 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。
5 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。
6 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。
7 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。
8 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。
9 如果通道可以阻塞,则在将消息发送到通道时要等待的最长时间(以毫秒为单位)。例如,如果 QueueChannel 的最大容量已达到,则它可能会阻塞,直到有可用空间为止。
10 要从传入的 WebSocketMessage 转换的目标 payload 的 Java 类型的完全限定名称。默认为 java.lang.String
11 指示此适配器是否将 non-MESSAGE WebSocketMessage 实例和具有代理目标的消息发送到应用程序上下文中的 AbstractBrokerMessageHandler。当此属性为 true 时,需要 Broker Relay 配置。此属性仅在服务器端使用。在客户端上,它会被忽略。默认为 false
12 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。
13 请参阅 <int-websocket:outbound-channel-adapter> 上的相同选项。

使用 ClientStompEncoder

从 4.3.13 版本开始,Spring Integration 提供了 ClientStompEncoder(作为标准 StompEncoder 的扩展),用于 WebSocket 通道适配器的客户端。为了正确准备客户端消息,必须将 ClientStompEncoder 的实例注入 StompSubProtocolHandler。默认 StompSubProtocolHandler 的一个问题是它设计用于服务器端,因此它将 SEND stompCommand 标头更新为 MESSAGE(如 STOMP 协议对服务器端的要求)。如果客户端未在其适当的 SEND web socket 帧中发送其消息,则某些 STOMP 代理不会接受它们。在这种情况下,ClientStompEncoder 的目的是覆盖 stompCommand 标头,并在将消息编码为 byte[] 之前将其设置为 SEND 值。

动态 WebSocket 端点注册

从 5.5 版本开始,WebSocket 服务器端点(基于 ServerWebSocketContainer 的通道适配器)现在可以在运行时注册(和删除) - ServerWebSocketContainer 映射的 paths 通过 HandlerMapping 公开到 DispatcherServlet 中,并且 WebSocket 客户端可以访问。 动态和运行时集成流 支持有助于以透明的方式注册这些端点

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlow.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
在动态流注册上调用 .addBean(serverWebSocketContainer) 以将 ServerWebSocketContainer 的实例添加到 ApplicationContext 中以进行端点注册非常重要。当动态流注册被销毁时,关联的 ServerWebSocketContainer 实例也会被销毁,以及相应的端点注册,包括 URL 路径映射。
动态 Websocket 端点只能通过 Spring Integration 机制注册:当使用常规 Spring @EnableWebsocket 时,Spring Integration 配置会退避,并且不会注册动态端点的基础结构。