ZeroMQ 支持

Spring Integration 提供组件以支持应用程序中的 ZeroMQ 通信。该实现基于 JeroMQ 库得到良好支持的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期并为其内部管理线程,从而使与这些组件的交互无锁且线程安全。

您需要将此依赖项包含到您的项目中

  • Maven

  • Gradle

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>6.3.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.3.5"

ZeroMQ 代理

ZeroMqProxy 是内置 ZMQ.proxy() 函数 的 Spring 友好包装器。它封装了套接字生命周期和线程管理。此代理的客户端仍然可以使用标准 ZeroMQ 套接字连接和交互 API。与标准 ZContext 一起,它需要众所周知的 ZeroMQ 代理模式之一:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,适当的一对 ZeroMQ 套接字类型将用于代理的前端和后端。有关详细信息,请参见 ZeroMqProxy.Type

ZeroMqProxy 实现 SmartLifecycle 以创建、绑定和配置套接字,并从 Executor(如果存在)中的专用线程启动 ZMQ.proxy()。前端和后端套接字的绑定通过 tcp:// 协议在所有可用的网络接口上使用提供的端口进行。否则,它们将绑定到稍后可以通过相应的 getFrontendPort()getBackendPort() API 方法获得的随机端口。

控制套接字作为 SocketType.PAIR 公开,在 "inproc://" + beanName + ".control" 地址上进行线程间传输;可以通过 getControlAddress() 获取它。它应该与来自另一个 SocketType.PAIR 套接字的相同应用程序一起使用,以发送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。当为其生命周期调用 stop() 以终止 ZMQ.proxy() 循环并优雅地关闭所有已绑定套接字时,ZeroMqProxy 执行 ZMQ.PROXY_TERMINATE 命令。

setExposeCaptureSocket(boolean) 选项导致此组件绑定一个额外的具有 SocketType.PUB 的线程间套接字,以捕获和发布前端和后端套接字之间的所有通信,因为它与 ZMQ.proxy() 实现一起声明。此套接字绑定到 "inproc://" + beanName + ".capture" 地址,并且不期望任何特定订阅进行过滤。

可以使用其他属性(例如读/写超时或安全性)自定义前端和后端套接字。此自定义可分别通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调实现。

ZeroMqProxy 可以作为简单的 bean 提供,如下所示

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

所有客户端节点都应通过 tcp:// 连接到此代理的主机,并使用它们感兴趣的相应端口。

ZeroMQ 消息通道

ZeroMqChannel 是一个 SubscribableChannel,它使用一对 ZeroMQ 套接字来连接发布者和订阅者以进行消息交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);它也可以用作本地线程间通道(使用 PAIR 套接字) - 在这种情况下不提供 connectUrl。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与连接到同一代理的其他类似通道交换消息。连接 url 选项是具有协议和主机以及冒号分隔的前端和后端套接字端口对的标准 ZeroMQ 连接字符串。为方便起见,如果通道在与代理相同的应用程序中配置,则可以使用 ZeroMqProxy 实例代替连接字符串。

发送和接收套接字都在其各自的专用线程中管理,使此通道对并发友好。这样,我们就可以在不同的线程中从 ZeroMqChannel 发布和消费,而无需同步。

默认情况下,ZeroMqChannel 使用 EmbeddedJsonHeadersMessageMapper 使用 Jackson JSON 处理器将 Message(包括标头)从 byte[] (反)序列化。此逻辑可以通过 setMessageMapper(BytesMessageMapper) 进行配置。

可以通过相应的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调为任何选项(读/写超时、安全性等)自定义发送和接收套接字。

ZeroMqChannel 的内部逻辑基于通过 Project Reactor FluxMono 运算符的反应流。这提供了更轻松的线程控制,并允许对通道进行无锁并发发布和消费。本地 PUB/SUB 逻辑实现为 Flux.publish() 运算符,以允许此通道的所有本地订阅者接收相同的已发布消息,作为对 PUB 套接字的分布式订阅者。

以下是 ZeroMqChannel 配置的简单示例

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://127.0.0.1:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 入站通道适配器

ZeroMqMessageProducer 是具有反应式语义的 MessageProducerSupport 实现。它以非阻塞方式不断从 ZeroMQ 套接字读取数据,并将消息发布到一个无限 Flux,该 FluxFluxMessageChannel 订阅,或者如果输出通道不是反应式的,则在 start() 方法中显式订阅。当套接字上没有接收到数据时,将在下一次读取尝试之前应用 consumeDelay(默认为 1 秒)。

ZeroMqMessageProducer 只支持 SocketType.PAIRSocketType.PULLSocketType.SUB。此组件可以连接到远程套接字或使用提供的或随机端口绑定到 TCP 协议。在此组件启动并绑定 ZeroMQ 套接字后,可以通过 getBoundPort() 获取实际端口。可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调配置套接字选项(例如安全性或写入超时)。

如果 receiveRaw 选项设置为 true,则从套接字消耗的 ZMsg 将按原样发送到生成的 Message 的有效负载中:下游流负责解析和转换 ZMsg。否则,将使用 InboundMessageMapper 将消耗的数据转换为 Message。如果接收到的 ZMsg 是多帧的,则第一帧将被视为此 ZeroMQ 消息发布到的 ZeroMqHeaders.TOPIC 标头。

如果 unwrapTopic 选项设置为 false,则传入的消息将被认为包含两帧:主题和 ZeroMQ 消息。否则,默认情况下,ZMsg 被认为包含三帧:第一帧包含主题,最后一帧包含消息,中间有一帧为空。

使用 SocketType.SUB 时,ZeroMqMessageProducer 使用提供的 topics 选项进行订阅;默认为订阅所有。可以使用 subscribeToTopics()unsubscribeFromTopics() @ManagedOperation 在运行时调整订阅。

以下是 ZeroMqMessageProducer 配置示例

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 出站通道适配器

ZeroMqMessageHandler 是一个 ReactiveMessageHandler 实现,用于将发布消息发送到 ZeroMQ 套接字。仅支持 SocketType.PAIRSocketType.PUSHSocketType.PUBZeroMqMessageHandler 仅支持连接 ZeroMQ 套接字;不支持绑定。当使用 SocketType.PUB 时,将针对请求消息评估 topicExpression 以将主题帧注入 ZeroMQ 消息(如果它不为空)。订阅者端 (SocketType.SUB) 必须先接收主题帧,然后才能解析实际数据。

如果 wrapTopic 选项设置为 false,则 ZeroMQ 消息帧将在注入的主题(如果存在)之后发送。默认情况下,在主题和消息之间会发送一个额外的空帧。

当请求消息的有效负载是 ZMsg 时,不会执行转换或主题提取:ZMsg 将按原样发送到套接字,并且不会被销毁以供进一步重用。否则,将使用 OutboundMessageMapper<byte[]> 将请求消息(或仅其有效负载)转换为 ZeroMQ 帧以发布。默认情况下,使用一个 ConvertingBytesMessageMapper,并提供一个 ConfigurableCompositeMessageConverter。可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调配置套接字选项(例如安全性或写入超时)。

以下是 ZeroMqMessageHandler 配置示例

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://127.0.0.1:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ Java DSL 支持

spring-integration-zeromq 通过 ZeroMq 工厂和上述组件的 IntegrationComponentSpec 实现提供方便的 Java DSL 流畅 API。

这是 ZeroMqChannel 的 Java DSL 示例

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://127.0.0.1:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的入站通道适配器是

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://127.0.0.1:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的出站通道适配器是

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://127.0.0.1:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}