STOMP 客户端

Spring 提供了 STOMP over WebSocket 客户端和 STOMP over TCP 客户端。

首先,您可以创建和配置 WebSocketStompClient,如下例所示

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

在上面的示例中,您可以用 SockJsClient 替换 StandardWebSocketClient,因为它也是 WebSocketClient 的实现。SockJsClient 可以使用 WebSocket 或 HTTP 作为回退传输。有关更多详细信息,请参阅 SockJsClient

接下来,您可以建立连接并为 STOMP 会话提供处理程序,如下例所示

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
stompClient.connect(url, sessionHandler);

当会话准备就绪可以使用时,会通知处理程序,如下例所示

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

	@Override
	public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
		// ...
	}
}

会话建立后,可以发送任何有效负载,并使用配置的MessageConverter进行序列化,如下例所示

session.send("/topic/something", "payload");

您也可以订阅目标。subscribe方法需要一个订阅消息的处理程序,并返回一个Subscription句柄,您可以使用它来取消订阅。对于每个接收到的消息,处理程序可以指定要将有效负载反序列化的目标Object类型,如下例所示

session.subscribe("/topic/something", new StompFrameHandler() {

	@Override
	public Type getPayloadType(StompHeaders headers) {
		return String.class;
	}

	@Override
	public void handleFrame(StompHeaders headers, Object payload) {
		// ...
	}

});

要启用 STOMP 心跳,您可以使用TaskScheduler配置WebSocketStompClient,并可选地自定义心跳间隔(写入不活动 10 秒,这会导致发送心跳,以及读取不活动 10 秒,这会导致关闭连接)。

WebSocketStompClient 仅在不活动的情况下发送心跳,即当没有发送其他消息时。当使用外部代理时,这可能会带来挑战,因为具有非代理目标的消息代表活动,但实际上并未转发到代理。在这种情况下,您可以在初始化外部代理时配置TaskScheduler,这将确保即使仅发送具有非代理目标的消息,心跳也会转发到代理。

当您使用WebSocketStompClient 进行性能测试以模拟来自同一台机器的数千个客户端时,请考虑关闭心跳,因为每个连接都会安排自己的心跳任务,这对于在同一台机器上运行的大量客户端来说不是最佳的。

STOMP 协议还支持回执,其中客户端必须添加一个receipt标头,服务器在处理发送或订阅后使用 RECEIPT 帧进行响应。为了支持这一点,StompSession 提供了setAutoReceipt(boolean),它会导致在每个后续发送或订阅事件中添加receipt标头。或者,您也可以手动将回执标头添加到StompHeaders 中。发送和订阅都返回一个Receiptable实例,您可以使用它来注册回执成功和失败回调。对于此功能,您必须使用TaskScheduler 和回执过期前的时间量(默认情况下为 15 秒)来配置客户端。

请注意,StompSessionHandler 本身是一个StompFrameHandler,它允许它除了处理消息处理的异常的handleException回调和传输级错误(包括ConnectionLostException)的handleTransportError回调之外,还可以处理 ERROR 帧。