消息通道实现
Spring Integration 提供不同的消息通道实现。以下部分简要描述了每一个实现。
PublishSubscribeChannel
PublishSubscribeChannel
实现将发送给它的任何 Message
广播到所有已订阅的处理器。这最常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常旨在由单个处理器处理)。请注意,PublishSubscribeChannel
仅用于发送。由于它在调用其 send(Message)
方法时直接向其订阅者广播,因此使用者无法轮询消息(它没有实现 PollableChannel
,因此没有 receive()
方法)。相反,任何订阅者本身都必须是 MessageHandler
,并且依次调用订阅者的 handleMessage(Message)
方法。
在 3.0 版本之前,对没有订阅者的 PublishSubscribeChannel
调用 send
方法将返回 false
。当与 MessagingTemplate
一起使用时,会抛出 MessageDeliveryException
。从 3.0 版本开始,行为已更改,如果至少存在最小订阅者(并成功处理消息),则 send
始终被认为是成功的。可以通过设置 minSubscribers
属性来修改此行为,该属性默认为 0
。
如果使用 TaskExecutor ,则仅使用正确数量的订阅者的存在来确定此结果,因为消息的实际处理是异步执行的。 |
QueueChannel
QueueChannel
实现包装了一个队列。与 PublishSubscribeChannel
不同,QueueChannel
具有点对点语义。换句话说,即使通道有多个使用者,也只有一个使用者应该接收发送到该通道的任何 Message
。它提供了一个默认的无参数构造函数(提供基本上无界的容量 Integer.MAX_VALUE
),以及一个接受队列容量的构造函数,如下所示:
public QueueChannel(int capacity)
尚未达到其容量限制的通道将其内部队列中的消息存储起来,并且 send(Message<?>)
方法立即返回,即使没有接收器准备好处理该消息也是如此。如果队列已达到容量,则发送方将阻塞,直到队列中有可用空间。或者,如果使用具有附加超时参数的 send 方法,则队列将阻塞,直到有可用空间或超时时间过去,以先发生者为准。类似地,如果队列中有可用消息,则 receive()
调用将立即返回,但是,如果队列为空,则 receive 调用可能会阻塞,直到有可用消息或(如果提供)超时时间过去。在这两种情况下,都可以通过传递 0 的超时值来强制立即返回,而不管队列的状态如何。但是,请注意,对没有 timeout
参数的 send()
和 receive()
版本的调用会无限期阻塞。
PriorityChannel
QueueChannel
执行先进先出 (FIFO) 排序,而 PriorityChannel
是一种替代实现,它允许根据优先级对通道内的消息进行排序。默认情况下,优先级由每条消息中的 priority
头确定。但是,对于自定义优先级确定逻辑,可以向 PriorityChannel
构造函数提供类型为 Comparator<Message<?>>
的比较器。
RendezvousChannel
RendezvousChannel
支持“直接传递”方案,其中发送方会阻塞,直到另一方调用通道的 receive()
方法。另一方会阻塞,直到发送方发送消息。在内部,此实现与 QueueChannel
非常相似,只是它使用 SynchronousQueue
(BlockingQueue
的零容量实现)。这在发送方和接收方在不同的线程中运行但异步丢弃队列中的消息不合适的情况下效果很好。换句话说,使用 RendezvousChannel
,发送方知道某个接收方已接受消息,而使用 QueueChannel
,消息将存储到内部队列中,并且可能永远不会被接收。
请记住,所有这些基于队列的通道默认情况下仅将消息存储在内存中。当需要持久性时,您可以在 'queue' 元素中提供 'message-store' 属性以引用持久性 MessageStore 实现,或者您可以将本地通道替换为由持久性代理支持的通道,例如 JMS 支持的通道或通道适配器。后一种选项允许您利用任何 JMS 提供商的实现来实现消息持久性,如JMS 支持中所述。但是,当不需要在队列中缓冲时,最简单的方法是依赖于下一节中讨论的 DirectChannel 。 |
RendezvousChannel
也可用于实现请求回复操作。发送方可以创建 RendezvousChannel
的临时匿名实例,然后在构建 Message
时将其设置为 'replyChannel' 头。发送该 Message
后,发送方可以立即调用 receive
(可选地提供超时值)以在等待回复 Message
时阻塞。这与许多 Spring Integration 的请求回复组件内部使用的实现非常相似。
DirectChannel
DirectChannel
具有点对点语义,但在其他方面更类似于 PublishSubscribeChannel
,而不是前面描述的任何基于队列的通道实现。它实现 SubscribableChannel
接口而不是 PollableChannel
接口,因此它直接将消息分派给订阅者。但是,作为点对点通道,它与 PublishSubscribeChannel
的区别在于它将每个 Message
发送给单个已订阅的 MessageHandler
。
除了是最简单的点对点通道选项外,其最重要的功能之一是它允许单个线程对通道的“两侧”执行操作。例如,如果处理程序订阅 DirectChannel
,则向该通道发送 Message
将直接在发送方的线程中触发该处理程序的 handleMessage(Message)
方法的调用,然后才能返回 send()
方法调用。
提供具有此行为的通道实现的主要动机是支持必须跨通道的交易,同时仍然受益于通道提供的抽象和松散耦合。如果在事务范围内调用 send()
调用,则处理程序调用的结果(例如,更新数据库记录)将在确定该事务的最终结果(提交或回滚)中发挥作用。
由于 DirectChannel 是最简单的选项,并且不会添加轮询程序的线程调度和管理所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。总体思路是定义应用程序的通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为基于队列的 PollableChannels 。同样,如果通道需要广播消息,则它不应该是 DirectChannel ,而应该是 PublishSubscribeChannel 。稍后,我们将展示如何配置每个通道。 |
DirectChannel
在内部委托给消息分派器以调用其已订阅的消息处理程序,并且该分派器可以具有由 load-balancer
或 load-balancer-ref
属性(互斥)公开的负载均衡策略。消息分派器使用负载均衡策略来帮助确定在多个消息处理程序订阅同一通道时如何将消息分发到消息处理程序之间。为了方便起见,load-balancer
属性公开了指向现有 LoadBalancingStrategy
实现的枚举值。round-robin
(轮流跨处理程序进行负载均衡)和 none
(用于显式禁用负载均衡的情况)是唯一可用的值。将来版本中可能会添加其他策略实现。但是,从 3.0 版本开始,您可以提供自己的 LoadBalancingStrategy
实现并使用 load-balancer-ref
属性注入它,该属性应指向实现 LoadBalancingStrategy
的 bean,如下例所示:
FixedSubscriberChannel
是一个 SubscribableChannel
,它仅支持单个 MessageHandler
订阅者,并且无法取消订阅。这在没有其他订阅者参与且不需要通道拦截器的情况下,对于高吞吐量性能用例非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,load-balancer
和 load-balancer-ref
属性是互斥的。
负载均衡也与布尔型failover
属性结合使用。如果failover
值为true(默认值),则当前面的处理程序抛出异常时,调度程序会根据需要回退到任何后续的处理程序。顺序由处理程序本身定义的可选order值决定,如果不存在此类值,则由处理程序订阅的顺序决定。
如果某种情况要求调度程序始终尝试调用第一个处理程序,然后每次发生错误时都按相同的固定顺序回退,则不应提供任何负载均衡策略。换句话说,即使未启用负载均衡,调度程序仍然支持failover
布尔属性。但是,在没有负载均衡的情况下,处理程序的调用始终从第一个处理程序开始,按照它们的顺序。例如,当对主、次、三级等有明确定义时,这种方法效果很好。当使用命名空间支持时,任何端点上的order
属性决定顺序。
请记住,负载均衡和failover 仅在通道有多个已订阅的消息处理程序时才适用。当使用命名空间支持时,这意味着多个端点共享在input-channel 属性中定义的相同通道引用。 |
从5.2版本开始,当failover
为true时,当前处理程序的失败以及失败的消息将分别在配置为debug
或info
级别时记录。
ExecutorChannel
ExecutorChannel
是一个点对点通道,支持与DirectChannel
相同的调度程序配置(负载均衡策略和failover
布尔属性)。这两种调度通道类型之间的主要区别在于,ExecutorChannel
委托给TaskExecutor
的实例来执行调度。这意味着send方法通常不会阻塞,但也意味着处理程序的调用可能不会在发送者的线程中发生。因此,它不支持跨越发送者和接收处理程序的事务。
发送者有时可能会阻塞。例如,当使用具有限制客户端的拒绝策略的TaskExecutor (例如ThreadPoolExecutor.CallerRunsPolicy )时,当线程池达到最大容量且执行器的作业队列已满时,发送者的线程可以随时执行该方法。由于这种情况只会以不可预测的方式发生,因此不应依赖它进行事务处理。 |
PartitionedChannel
从6.1版本开始,提供了PartitionedChannel
实现。这是AbstractExecutorChannel
的扩展,表示点对点调度逻辑,其中实际的消费在特定线程上处理,该线程由从发送到此通道的消息中评估的分区键确定。此通道类似于上面提到的ExecutorChannel
,但区别在于具有相同分区键的消息始终在同一线程中处理,从而保持顺序。它不需要外部TaskExecutor
,但可以使用自定义ThreadFactory
进行配置(例如Thread.ofVirtual().name("partition-", 0).factory()
)。此工厂用于为每个分区填充单线程执行器到MessageDispatcher
委托中。默认情况下,使用IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头作为分区键。此通道可以配置为一个简单的bean。
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该通道将有3
个分区 - 专用线程;将使用partitionKey
头来确定在哪个分区中处理消息。有关更多信息,请参见PartitionedChannel
类的Javadoc。
FluxMessageChannel
FluxMessageChannel
是"sinking"
的org.reactivestreams.Publisher
实现,它将发送的消息放入内部的reactor.core.publisher.Flux
中,以便下游的反应式订阅者按需使用。此通道实现既不是SubscribableChannel
,也不是PollableChannel
,因此只能使用org.reactivestreams.Subscriber
实例从该通道消费,遵守反应式流的反压特性。另一方面,FluxMessageChannel
实现了ReactiveStreamsSubscribableChannel
及其subscribeTo(Publisher<Message<?>>)
契约,允许接收来自反应式源发布者的事件,将反应式流桥接到集成流中。为了实现整个集成流的完全反应式行为,必须在流中的所有端点之间放置这样的通道。
有关与反应式流交互的更多信息,请参见反应式流支持。
作用域通道
Spring Integration 1.0提供了ThreadLocalChannel
实现,但从2.0版本开始已将其移除。现在处理相同需求的更通用方法是向通道添加scope
属性。属性的值可以是上下文中可用的作用域的名称。例如,在Web环境中,某些作用域可用,任何自定义作用域实现都可以注册到上下文中。以下示例显示将线程局部作用域应用于通道,包括作用域本身的注册。
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
在前面的示例中定义的通道还在内部委托给一个队列,但通道绑定到当前线程,因此队列的内容也类似地绑定。这样,发送到通道的线程以后可以接收相同的这些消息,但其他线程将无法访问它们。虽然很少需要线程作用域通道,但在使用DirectChannel
实例强制执行单线程操作但任何回复消息都应发送到“终端”通道的情况下,它们可能很有用。如果该终端通道是线程作用域的,则原始发送线程可以从终端通道收集其回复。
现在,由于任何通道都可以具有作用域,因此除了线程局部之外,您还可以定义自己的作用域。