线程屏障

有时,我们需要暂停消息流线程,直到发生其他异步事件。例如,考虑一个将消息发布到RabbitMQ的HTTP请求。我们可能希望在RabbitMQ代理发出消息已收到的确认之前,不回复用户。

在版本4.2中,Spring Integration为此目的引入了<barrier/>组件。底层的MessageHandlerBarrierMessageHandler。该类还实现了MessageTriggerAction,其中传递给trigger()方法的消息会释放handleRequestMessage()方法中相应的线程(如果存在)。

通过对消息调用CorrelationStrategy来关联挂起线程和触发线程。当消息发送到input-channel时,线程将挂起长达requestTimeout毫秒,等待相应的触发消息。默认的关联策略使用IntegrationMessageHeaderAccessor.CORRELATION_ID头。当具有相同关联的触发消息到达时,线程被释放。释放后发送到output-channel的消息是使用MessageGroupProcessor构建的。默认情况下,该消息是一个包含两个有效载荷的Collection<?>,并且头文件通过DefaultAggregatingMessageGroupProcessor进行合并。

如果trigger()方法首先被调用(或在主线程超时后),它将被挂起长达triggerTimeout,等待挂起消息到达。如果您不想挂起触发线程,请考虑将其交给TaskExecutor,以便其线程被挂起。
在5.4版本之前,请求和触发消息只有一个timeout选项,但在某些用例中,为这些操作设置不同的超时时间会更好。因此,引入了requestTimeouttriggerTimeout选项。

requires-reply属性确定在触发消息到达之前,如果挂起线程超时,要采取的操作。默认情况下,它为false,这意味着端点返回null,流程结束,线程返回给调用者。当为true时,会抛出ReplyRequiredException

您可以编程方式调用trigger()方法(使用名称barrier.handler获取bean引用——其中barrier是屏障端点的bean名称)。或者,您可以配置一个<outbound-channel-adapter/>来触发释放。

只有一个线程可以与相同的关联一起挂起。相同的关联可以多次使用,但不能同时使用。如果第二个线程以相同的关联到达,则会抛出异常。

以下示例演示了如何使用自定义头进行关联

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

根据哪个消息先到达,发送消息到in的线程或发送消息到release的线程将等待长达十秒,直到另一个消息到达。当消息被释放时,out通道会发送一个消息,该消息结合了调用自定义MessageGroupProcessor bean(名为myOutputProcessor)的结果。如果主线程超时,并且触发器稍后到达,您可以配置一个丢弃通道,将延迟触发器发送到该通道。如果请求消息未能及时到达,触发消息也会被丢弃。

有关此组件的示例,请参阅屏障示例应用程序

© . This site is unofficial and not affiliated with VMware.