STOMP 客户端
Spring 提供了一个基于 WebSocket 的 STOMP 客户端和一个基于 TCP 的 STOMP 客户端。
首先,您可以创建和配置 WebSocketStompClient
,如下例所示
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats
在前面的示例中,您可以用 SockJsClient
替换 StandardWebSocketClient
,因为它也是 WebSocketClient
的实现。SockJsClient
可以使用 WebSocket 或基于 HTTP 的传输作为回退。有关更多详细信息,请参阅 SockJsClient
。
接下来,您可以建立连接并为 STOMP 会话提供一个处理程序,如下例所示
String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);
当会话准备好使用时,会通知处理程序,如下例所示
public class MyStompSessionHandler extends StompSessionHandlerAdapter {
@Override
public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
// ...
}
}
建立会话后,可以发送任何有效负载,并使用配置的 MessageConverter
进行序列化,如下例所示
session.send("/topic/something", "payload");
您还可以订阅目标。subscribe
方法需要一个订阅消息的处理程序,并返回一个 Subscription
句柄,您可以使用它来取消订阅。对于每条接收到的消息,处理程序可以指定有效负载应反序列化的目标 Object
类型,如下例所示
session.subscribe("/topic/something", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
// ...
}
});
要启用 STOMP 心跳,您可以使用 TaskScheduler
配置 WebSocketStompClient
,并可以选择自定义心跳间隔(写入空闲 10 秒,这会导致发送心跳,读取空闲 10 秒,这会关闭连接)。
WebSocketStompClient
仅在空闲时发送心跳,即当没有发送其他消息时。当使用外部代理时,这可能会带来挑战,因为具有非代理目标的消息表示活动,但实际上不会转发到代理。在这种情况下,您可以在初始化 外部代理 时配置 TaskScheduler
,这可以确保即使仅发送具有非代理目标的消息时,心跳也会转发到代理。
当您使用 WebSocketStompClient 进行性能测试以模拟来自同一台机器的数千个客户端时,请考虑关闭心跳,因为每个连接都会安排其自己的心跳任务,这对于在同一台机器上运行的大量客户端来说并不是最佳的。 |
STOMP 协议还支持回执,其中客户端必须添加一个 receipt
标头,服务器在处理发送或订阅后使用 RECEIPT 帧进行响应。为了支持这一点,StompSession
提供了 setAutoReceipt(boolean)
,它会在每个后续发送或订阅事件中添加 receipt
标头。或者,您也可以手动将回执标头添加到 StompHeaders
中。发送和订阅都返回 Receiptable
的实例,您可以使用它来注册回执成功和失败回调。对于此功能,您必须使用 TaskScheduler
配置客户端,以及回执过期之前的时间(默认为 15 秒)。
请注意,StompSessionHandler
本身是 StompFrameHandler
,这使得它除了处理消息处理异常的 handleException
回调和处理传输级错误(包括 ConnectionLostException
)的 handleTransportError
回调之外,还可以处理 ERROR 帧。
您可以使用 WebSocketStompClient
的 inboundMessageSizeLimit
和 outboundMessageSizeLimit
属性来限制入站和出站 WebSocket 消息的最大大小。当出站 STOMP 消息超过限制时,它会被拆分为部分帧,接收方必须重新组装这些帧。默认情况下,出站消息没有大小限制。当入站 STOMP 消息大小超过配置的限制时,会抛出 StompConversionException
。入站消息的默认大小限制为 64KB
。
WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setInboundMessageSizeLimit(64 * 1024); // 64KB
stompClient.setOutboundMessageSizeLimit(64 * 1024); // 64KB