ZeroMQ 支持
Spring Integration 提供了组件来支持应用程序中的 ZeroMQ 通信。该实现基于 JeroMQ 库中受良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并为其内部管理线程,使这些组件的交互无锁且线程安全。
项目需要此依赖项
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:7.0.0"
ZeroMQ 代理
ZeroMqProxy 是内置 ZMQ.proxy() 函数 的 Spring 友好包装器。它封装了套接字生命周期和线程管理。此代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。除了标准的 ZContext,它还需要一个众所周知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,将使用一对适当的 ZeroMQ 套接字类型用于代理的前端和后端。详见 ZeroMqProxy.Type。
ZeroMqProxy 实现了 SmartLifecycle,用于创建、绑定和配置套接字,并在专用的线程中从 Executor(如果有)启动 ZMQ.proxy()。前端和后端套接字的绑定通过 tcp:// 协议在所有可用的网络接口上使用提供的端口完成。否则,它们将绑定到随机端口,以后可以通过相应的 getFrontendPort() 和 getBackendPort() API 方法获取。
控制套接字作为 SocketType.PAIR 通过 "inproc://" + beanName + ".control" 地址的线程间传输公开;可以通过 getControlAddress() 获取。它应该与同一应用程序中的另一个 SocketType.PAIR 套接字一起使用,以发送 ZMQ.PROXY_TERMINATE、ZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。当为其生命周期调用 stop() 时,ZeroMqProxy 会执行 ZMQ.PROXY_TERMINATE 命令,以终止 ZMQ.proxy() 循环并优雅地关闭所有绑定的套接字。
setExposeCaptureSocket(boolean) 选项导致此组件绑定一个额外的线程间套接字,类型为 SocketType.PUB,用于捕获和发布前端和后端套接字之间的所有通信,正如 ZMQ.proxy() 实现所述。此套接字绑定到 "inproc://" + beanName + ".capture" 地址,不期望任何特定的订阅进行过滤。
前端和后端套接字可以通过额外的属性进行自定义,例如读/写超时或安全性。此自定义分别通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) 和 setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调提供。
ZeroMqProxy 可以作为简单的 Bean 提供,如下所示
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
所有客户端节点都应该通过 tcp:// 连接到此代理的主机,并使用其感兴趣的相应端口。
ZeroMQ 消息通道
ZeroMqChannel 是一个 SubscribableChannel,它使用一对 ZeroMQ 套接字连接发布者和订阅者进行消息交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用 PAIR 套接字)——在这种情况下不提供 connectUrl。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与其他连接到同一代理的类似通道交换消息。连接 url 选项是一个标准的 ZeroMQ 连接字符串,包含协议和主机,以及一对用于 ZeroMQ 代理前端和后端套接字的端口(用冒号分隔)。为方便起见,如果通道与代理配置在同一个应用程序中,则可以提供 ZeroMqProxy 实例而不是连接字符串。
发送和接收套接字都在各自的专用线程中管理,使该通道具有并发友好性。这样,我们就可以在不同的线程中向/从 ZeroMqChannel 发布和消费,而无需同步。
默认情况下,ZeroMqChannel 使用 EmbeddedHeadersJsonMessageMapper 通过 Jackson JSON 处理器将 Message(包括头)序列化/反序列化为 byte[]。此逻辑可以通过 setMessageMapper(BytesMessageMapper) 进行配置。
发送和接收套接字可以通过各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>) 和 setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调进行任何选项(读/写超时、安全性等)的自定义。
ZeroMqChannel 的内部逻辑基于 Project Reactor Flux 和 Mono 操作符的反应式流。这提供了更简单的线程控制,并允许对通道进行无锁并发发布和消费。本地 PUB/SUB 逻辑实现为 Flux.publish() 操作符,以允许该通道的所有本地订阅者接收相同的已发布消息,就像分布式订阅者接收 PUB 套接字的消息一样。
以下是 ZeroMqChannel 配置的一个简单示例
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 入站通道适配器
ZeroMqMessageProducer 是一个具有响应式语义的 MessageProducerSupport 实现。它以非阻塞方式不断从 ZeroMQ 套接字读取数据,并将消息发布到无限的 Flux,该 Flux 由 FluxMessageChannel 订阅,或者在 start() 方法中显式订阅(如果输出通道不是响应式的)。当套接字上未收到数据时,在下一次读取尝试之前会应用一个 consumeDelay(默认为 1 秒)。
ZeroMqMessageProducer 仅支持 SocketType.PAIR、SocketType.PULL 和 SocketType.SUB。此组件可以连接到远程套接字,也可以绑定到 TCP 协议,并使用提供的或随机端口。在组件启动且 ZeroMQ 套接字绑定后,可以通过 getBoundPort() 获取实际端口。套接字选项(例如安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。
如果 receiveRaw 选项设置为 true,从套接字消费的 ZMsg 将按原样作为生成 Message 的 payload 发送:解析和转换 ZMsg 的工作将由下游流完成。否则,将使用 InboundMessageMapper 将消费的数据转换为 Message。如果收到的 ZMsg 是多帧的,则第一帧被视为此 ZeroMQ 消息发布到的 ZeroMqHeaders.TOPIC 头部。
如果 unwrapTopic 选项设置为 false,则传入消息被认为由两帧组成:主题和 ZeroMQ 消息。否则,默认情况下,ZMsg 被认为由三帧组成:第一帧包含主题,最后一帧包含消息,中间有一个空帧。
对于 SocketType.SUB,ZeroMqMessageProducer 使用提供的 topics 选项进行订阅;默认为订阅所有。订阅可以在运行时使用 subscribeToTopics() 和 unsubscribeFromTopics() @ManagedOperation 进行调整。
以下是 ZeroMqMessageProducer 配置的示例
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 出站通道适配器
ZeroMqMessageHandler 是一个 ReactiveMessageHandler 实现,用于将发布消息发送到 ZeroMQ 套接字。仅支持 SocketType.PAIR、SocketType.PUSH 和 SocketType.PUB。此组件可以连接到远程套接字,也可以绑定到 TCP 协议,并使用提供的或随机端口。在组件启动且 ZeroMQ 套接字绑定后,可以通过 getBoundPort() 获取实际端口。
当使用 SocketType.PUB 时,将根据请求消息评估 topicExpression,以将主题帧注入 ZeroMQ 消息中(如果它不为空)。订阅者端(SocketType.SUB)必须首先接收主题帧,然后才能解析实际数据。
如果 wrapTopic 选项设置为 false,则 ZeroMQ 消息帧将在注入的主题之后发送(如果存在)。默认情况下,主题和消息之间会发送一个额外的空帧。
当请求消息的有效载荷是 ZMsg 时,不执行任何转换或主题提取:ZMsg 将按原样发送到套接字,并且不会销毁以供将来可能重用。否则,将使用 OutboundMessageMapper<byte[]> 将请求消息(或仅其有效载荷)转换为要发布的 ZeroMQ 帧。默认情况下,使用附带 ConfigurableCompositeMessageConverter 的 ConvertingBytesMessageMapper。套接字选项(例如安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。
以下是连接到套接字的 ZeroMqMessageHandler 配置示例
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}
以下是绑定到指定端口的 ZeroMqMessageHandler 配置示例
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}
ZeroMQ Java DSL 支持
spring-integration-zeromq 通过 ZeroMq 工厂和上述组件的 IntegrationComponentSpec 实现提供了方便的 Java DSL 流式 API。
这是 ZeroMqChannel 的 Java DSL 示例
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的入站通道适配器是
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ Java DSL 的出站通道适配器是
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}