轮询器
本节介绍 Spring Integration 中轮询机制的工作原理。
轮询消费者
当消息端点(通道适配器)连接到通道并实例化时,它们会生成以下实例之一
实际的实现取决于这些端点连接到的通道类型。连接到实现 org.springframework.messaging.SubscribableChannel
接口的通道的通道适配器会生成一个 EventDrivenConsumer
实例。另一方面,连接到实现 org.springframework.messaging.PollableChannel
接口(例如 QueueChannel
)的通道的通道适配器会生成一个 PollingConsumer
实例。
轮询消费者允许 Spring Integration 组件主动轮询消息,而不是以事件驱动的方式处理消息。
它们代表了许多消息传递场景中的关键横切关注点。在 Spring Integration 中,轮询消费者基于同名模式,该模式在 Gregor Hohpe 和 Bobby Woolf 合著的《企业集成模式》一书中有所描述。您可以在 该书的网站 上找到该模式的描述。
有关轮询消费者配置的更多信息,请参阅 消息端点。
可轮询消息源
Spring Integration 提供了轮询消费者模式的第二种变体。当使用入站通道适配器时,这些适配器通常会被 SourcePollingChannelAdapter
包装。例如,当从远程 FTP 服务器位置检索消息时,FTP 入站通道适配器 中描述的适配器会配置一个轮询器来定期检索消息。因此,当组件配置了轮询器时,生成的实例将是以下类型之一
这意味着轮询器在入站和出站消息传递场景中都使用。以下是一些使用轮询器的用例
-
轮询某些外部系统,例如 FTP 服务器、数据库和 Web 服务
-
轮询内部(可轮询)消息通道
-
轮询内部服务(例如重复执行 Java 类上的方法)
AOP 增强类可以应用于轮询器,在一个advice-chain 中,例如一个事务增强来启动事务。从版本 4.1 开始,提供了PollSkipAdvice 。轮询器使用触发器来确定下次轮询的时间。PollSkipAdvice 可用于抑制(跳过)轮询,可能是因为存在一些下游条件会阻止消息被处理。要使用此增强,您必须为其提供一个PollSkipStrategy 的实现。从版本 4.2.5 开始,提供了SimplePollSkipStrategy 。要使用它,您可以将一个实例作为 bean 添加到应用程序上下文,将其注入到PollSkipAdvice 中,并将该增强添加到轮询器的增强链中。要跳过轮询,请调用skipPolls() 。要恢复轮询,请调用reset() 。版本 4.2 在此领域增加了更多灵活性。请参阅条件轮询器.
|
延迟确认可轮询消息源
从版本 5.0.1 开始,某些模块提供支持延迟确认直到下游流程完成(或将消息传递给另一个线程)的MessageSource
实现。目前仅限于AmqpMessageSource
和KafkaMessageSource
。
使用这些消息源,IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
头部(参见 MessageHeaderAccessor
API)将被添加到消息中。当与可轮询消息源一起使用时,该头的值为 AcknowledgmentCallback
的实例,如下例所示
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
并非所有消息源(例如,KafkaMessageSource
)都支持 REJECT
状态。它与 ACCEPT
相同。
应用程序可以在任何时间确认消息,如下例所示
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
如果 MessageSource
连接到 SourcePollingChannelAdapter
,当轮询线程在完成下游流程后返回到适配器时,适配器会检查确认是否已被确认,如果没有,则将其状态设置为 ACCEPT
(如果流程抛出异常,则为 REJECT
)。状态值在 AcknowledgmentCallback.Status
枚举 中定义。
Spring Integration 提供 MessageSourcePollingTemplate
来执行对 MessageSource
的临时轮询。它也会在 MessageHandler
回调返回(或抛出异常)时处理对 AcknowledgmentCallback
设置 ACCEPT
或 REJECT
。以下示例展示了如何使用 MessageSourcePollingTemplate
轮询
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
在这两种情况下(SourcePollingChannelAdapter
和 MessageSourcePollingTemplate
),可以通过在回调上调用 noAutoAck()
来禁用自动确认/否定确认。如果将消息传递给另一个线程并希望稍后确认,则可能需要这样做。并非所有实现都支持此功能(例如,Apache Kafka 不支持,因为偏移量提交必须在同一个线程上执行)。
消息源的条件轮询器
本节介绍如何使用条件轮询器。
背景
轮询器上的 advice-chain
中的 Advice
对象会建议整个轮询任务(包括消息检索和处理)。这些“环绕建议”方法无法访问轮询的任何上下文,只能访问轮询本身。对于将任务设为事务性或由于某些外部条件而跳过轮询等要求,这很好,如前所述。如果我们希望根据轮询的 receive
部分的结果采取一些操作,或者如果我们希望根据条件调整轮询器,该怎么办?对于这些情况,Spring Integration 提供了“智能”轮询。
“智能”轮询
版本 5.3 引入了 ReceiveMessageAdvice
接口。advice-chain
中实现此接口的任何 Advice
对象仅应用于 receive()
操作 - MessageSource.receive()
和 PollableChannel.receive(timeout)
。因此,它们只能应用于 SourcePollingChannelAdapter
或 PollingConsumer
。这些类实现了以下方法
-
beforeReceive(Object source)
此方法在Object.receive()
方法之前调用。它允许您检查和重新配置源。返回false
会取消此轮询(类似于前面提到的PollSkipAdvice
)。 -
Message<?> afterReceive(Message<?> result, Object source)
此方法在receive()
方法调用后被调用。同样,您可以重新配置源或采取任何操作(可能取决于结果,如果源没有创建消息,则结果可能为null
)。您甚至可以返回不同的消息
线程安全
如果 |
Advice 链排序
您应该了解在初始化期间如何处理 advice 链。不实现 |
SimpleActiveIdleReceiveMessageAdvice
此 advice 是 ReceiveMessageAdvice
的简单实现。当与 DynamicPeriodicTrigger
结合使用时,它会根据之前的轮询是否产生消息来调整轮询频率。轮询器还必须引用同一个 DynamicPeriodicTrigger
。
重要:异步传递
SimpleActiveIdleReceiveMessageAdvice 根据 receive() 结果修改触发器。这仅在轮询器线程上调用 advice 时才有效。如果轮询器具有 task-executor ,则它不起作用。要使用此 advice,您希望在轮询结果之后使用异步操作,请稍后进行异步传递,也许可以使用 ExecutorChannel 。
|
CompoundTriggerAdvice
此 advice 允许根据轮询是否返回消息来选择两个触发器之一。考虑一个使用 CronTrigger
的轮询器。CronTrigger
实例是不可变的,因此一旦构造就不能更改。考虑一个用例,我们希望使用 cron 表达式每小时触发一次轮询,但如果未收到消息,则每分钟轮询一次,并且当检索到消息时,恢复使用 cron 表达式。
此 advice(和轮询器)为此目的使用 CompoundTrigger
。触发器的 primary
触发器可以是 CronTrigger
。当 advice 检测到未收到消息时,它会将辅助触发器添加到 CompoundTrigger
中。当 CompoundTrigger
实例的 nextExecutionTime
方法被调用时,它会委托给辅助触发器(如果存在)。否则,它会委托给主触发器。
轮询器也必须引用相同的CompoundTrigger
。
以下示例展示了每小时 cron 表达式配置,并回退到每分钟执行。
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要:异步传递
CompoundTriggerAdvice 根据receive() 结果修改触发器。这仅在轮询线程上调用建议时有效。如果轮询器具有task-executor ,则无效。要在您希望在轮询结果后使用异步操作的情况下使用此建议,请稍后进行异步传递,例如使用ExecutorChannel 。
|
仅限 MessageSource 的建议
某些建议可能仅适用于MessageSource.receive()
,对于PollableChannel
则没有意义。为此,MessageSourceMutator
接口(ReceiveMessageAdvice
的扩展)仍然存在。有关更多信息,请参见入站通道适配器:轮询多个服务器和目录。