操作符 gateway()

IntegrationFlow定义中的gateway()操作符是一个特殊的服务激活器实现,用于通过其输入通道调用其他端点或集成流并等待回复。从技术上讲,它与<chain>定义中的嵌套<gateway>组件扮演相同角色(参见从链内调用链),并允许流程更简洁、更直接。从逻辑上和业务角度来看,它是一个消息网关,允许在目标集成解决方案的不同部分之间分发和重用功能(参见消息网关)。此操作符有几个针对不同目标的重载

  • gateway(String requestChannel) 通过名称向某个端点的输入通道发送消息;

  • gateway(MessageChannel requestChannel) 通过直接注入向某个端点的输入通道发送消息;

  • gateway(IntegrationFlow flow) 向提供的IntegrationFlow的输入通道发送消息。

所有这些都有一个带有第二个Consumer<GatewayEndpointSpec>参数的变体,用于配置目标GatewayMessageHandler和相应的AbstractEndpoint。此外,基于IntegrationFlow的方法允许调用现有的IntegrationFlowbean,或者通过IntegrationFlow函数接口的内联lambda声明流为子流,或者将其提取到private方法中以获得更清晰的代码风格。

@Bean
IntegrationFlow someFlow() {
        return IntegrationFlow
                .from(...)
                .gateway(subFlow())
                .handle(...)
                .get();
}

private static IntegrationFlow subFlow() {
        return f -> f
                .scatterGather(s -> s.recipientFlow(...),
                        g -> g.outputProcessor(MessageGroup::getOne))
}
如果下游流程并不总是返回回复,则应将requestTimeout设置为0,以防止调用线程无限期挂起。在这种情况下,流程将在此处结束,并释放线程以进行进一步的工作。