消息桥接

消息桥是一个相对简单的端点,用于连接两个消息通道或通道适配器。例如,您可能希望将一个PollableChannel连接到一个SubscribableChannel,以便订阅端点不必担心任何轮询配置。相反,消息桥提供了轮询配置。

通过在两个通道之间提供一个中间轮询器,您可以使用消息桥来限制入站消息。轮询器的触发器决定消息到达第二个通道的速率,并且轮询器的maxMessagesPerPoll属性强制执行吞吐量限制。

消息桥的另一个有效用途是连接两个不同的系统。在这种情况下,Spring 集成的作用仅限于在这些系统之间建立连接并在必要时管理轮询器。在两个系统之间至少有一个转换器来转换它们的格式可能更常见。在这种情况下,通道可以作为转换器端点的“输入通道”和“输出通道”提供。如果不需要数据格式转换,消息桥可能确实足够了。

使用 XML 配置桥

您可以使用<bridge>元素在两个消息通道或通道适配器之间创建消息桥。为此,请提供input-channeloutput-channel属性,如下例所示

<int:bridge input-channel="input" output-channel="output"/>

如上所述,消息桥的一个常见用例是将PollableChannel连接到SubscribableChannel。在执行此角色时,消息桥也可以用作节流器

<int:bridge input-channel="pollable" output-channel="subscribable">
     <int:poller max-messages-per-poll="10" fixed-rate="5000"/>
 </int:bridge>

您可以使用类似的机制来连接通道适配器。以下示例显示了Spring 集成的stream命名空间中stdinstdout适配器之间的一个简单的“回显”。

<int-stream:stdin-channel-adapter id="stdin"/>

 <int-stream:stdout-channel-adapter id="stdout"/>

 <int:bridge id="echo" input-channel="stdin" output-channel="stdout"/>

类似的配置适用于其他(可能更有用)的通道适配器桥,例如文件到JMS或邮件到文件。后续章节将介绍各种通道适配器。

如果在桥上未定义“输出通道”,则使用入站消息提供的回复通道(如果可用)。如果既没有输出通道也没有回复通道,则会抛出异常。

使用 Java 配置配置桥

以下示例显示了如何使用@BridgeFrom注释在 Java 中配置桥

@Bean
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
@BridgeFrom(value = "polled", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public SubscribableChannel direct() {
    return new DirectChannel();
}

以下示例显示了如何使用@BridgeTo注释在 Java 中配置桥

@Bean
@BridgeTo(value = "direct", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "10"))
public PollableChannel polled() {
    return new QueueChannel();
}

@Bean
public SubscribableChannel direct() {
    return new DirectChannel();
}

或者,您可以使用BridgeHandler,如下例所示

@Bean
@ServiceActivator(inputChannel = "polled",
        poller = @Poller(fixedRate = "5000", maxMessagesPerPoll = "10"))
public BridgeHandler bridge() {
    BridgeHandler bridge = new BridgeHandler();
    bridge.setOutputChannelName("direct");
    return bridge;
}

使用 Java DSL 配置桥

您可以使用 Java 领域特定语言 (DSL) 配置桥,如下例所示

@Bean
public IntegrationFlow bridgeFlow() {
    return IntegrationFlow.from("polled")
            .bridge(e -> e.poller(Pollers.fixedDelay(5000).maxMessagesPerPoll(10)))
            .channel("direct")
            .get();
}