消息通道实现

Spring Integration 提供了不同的消息通道实现。以下部分简要介绍了每种实现。

PublishSubscribeChannel

PublishSubscribeChannel 实现将发送给它的任何 Message 广播给所有订阅的处理程序。这通常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常旨在由单个处理程序处理)。请注意,PublishSubscribeChannel 仅用于发送。由于它在调用其 send(Message) 方法时直接广播给其订阅者,因此消费者无法轮询消息(它没有实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身都必须是 MessageHandler,并且依次调用订阅者的 handleMessage(Message) 方法。

在 3.0 版之前,在没有订阅者的 PublishSubscribeChannel 上调用 send 方法将返回 false。当与 MessagingTemplate 结合使用时,将抛出 MessageDeliveryException。从 3.0 版开始,行为已更改,因此如果至少存在最小订阅者(并且成功处理消息),则 send 始终被认为是成功的。此行为可以通过设置 minSubscribers 属性来修改,该属性默认为 0

如果您使用 TaskExecutor,则仅使用正确数量的订阅者的存在来进行此确定,因为消息的实际处理是异步执行的。

QueueChannel

QueueChannel 实现包装了一个队列。与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。换句话说,即使通道有多个消费者,也只有一个消费者应该接收发送到该通道的任何 Message。它提供了一个默认的无参数构造函数(提供本质上无界的容量 Integer.MAX_VALUE),以及一个接受队列容量的构造函数,如下面的清单所示。

public QueueChannel(int capacity)

尚未达到容量限制的通道将其消息存储在其内部队列中,并且 send(Message<?>) 方法立即返回,即使没有接收器准备处理消息也是如此。如果队列已达到容量,则发送方将阻塞,直到队列中有空间可用。或者,如果您使用具有附加超时参数的 send 方法,则队列将阻塞,直到有空间可用或超时时间过去,以先发生者为准。类似地,如果队列中有消息可用,则 receive() 调用会立即返回,但是,如果队列为空,则接收调用可能会阻塞,直到有消息可用或超时(如果提供)过去。在这两种情况下,都可以通过传递 0 的超时值来强制立即返回,而不管队列的状态如何。但是请注意,对没有 timeout 参数的 send()receive() 版本的调用将无限期地阻塞。

PriorityChannel

QueueChannel强制先进先出 (FIFO) 顺序不同,PriorityChannel 是一种替代实现,允许根据优先级对通道内的消息进行排序。默认情况下,优先级由每个消息中的 priority 头部确定。但是,对于自定义优先级确定逻辑,可以向 PriorityChannel 构造函数提供类型为 Comparator<Message<?>> 的比较器。

RendezvousChannel

RendezvousChannel 允许“直接传递”场景,其中发送方会阻塞,直到另一方调用通道的 receive() 方法。另一方会阻塞,直到发送方发送消息。在内部,这种实现与 QueueChannel 非常相似,只是它使用 SynchronousQueueBlockingQueue 的零容量实现)。这在发送方和接收方在不同线程中运行但异步将消息丢弃到队列中不合适的情况下非常有效。换句话说,使用 RendezvousChannel,发送方知道某个接收方已接受消息,而使用 QueueChannel,消息将存储到内部队列中,并且可能永远不会被接收。

请记住,默认情况下,所有这些基于队列的通道都仅在内存中存储消息。当需要持久性时,您可以在 'queue' 元素中提供 'message-store' 属性来引用持久性 MessageStore 实现,或者您可以用由持久性代理支持的通道替换本地通道,例如 JMS 支持的通道或通道适配器。后一种选项允许您利用任何 JMS 提供程序的实现来实现消息持久性,如 JMS 支持 中所述。但是,当不需要在队列中缓冲时,最简单的方法是依赖于 DirectChannel,这将在下一节中讨论。

RendezvousChannel 也适用于实现请求-回复操作。发送方可以创建一个临时、匿名的 RendezvousChannel 实例,然后在构建 Message 时将其设置为 'replyChannel' 头部。发送该 Message 后,发送方可以立即调用 receive(可以选择提供超时值)以在等待回复 Message 时阻塞。这与许多 Spring Integration 的请求-回复组件内部使用的实现非常相似。

DirectChannel

DirectChannel 具有点对点语义,但与 PublishSubscribeChannel 更相似,而不是之前描述的任何基于队列的通道实现。它实现了 SubscribableChannel 接口而不是 PollableChannel 接口,因此它直接将消息分派给订阅者。然而,作为点对点通道,它与 PublishSubscribeChannel 不同,因为它将每个 Message 发送给单个已订阅的 MessageHandler

除了是最简单的点对点通道选项之外,它最重要的功能之一是它使单个线程能够在通道的“两侧”执行操作。例如,如果一个处理程序订阅了 DirectChannel,那么向该通道发送 Message 会直接在发送者的线程中触发该处理程序的 handleMessage(Message) 方法的调用,在 send() 方法调用返回之前。

提供具有此行为的通道实现的关键动机是支持跨通道的交易,同时仍然从通道提供的抽象和松耦合中获益。如果 send() 调用是在事务范围内调用的,则处理程序调用的结果(例如,更新数据库记录)在确定该事务的最终结果(提交或回滚)中起作用。

由于 DirectChannel 是最简单的选项,并且不会添加任何用于调度和管理轮询器线程所需的额外开销,因此它是 Spring Integration 中的默认通道类型。一般思路是为应用程序定义通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为基于队列的 PollableChannels。同样,如果通道需要广播消息,它不应该是一个 DirectChannel,而应该是一个 PublishSubscribeChannel。稍后,我们将展示如何配置这些通道。

DirectChannel 在内部委托给消息调度器来调用其订阅的消息处理程序,并且该调度器可以具有由 load-balancerload-balancer-ref 属性(互斥)公开的负载均衡策略。负载均衡策略由消息调度器使用,以帮助确定在多个消息处理程序订阅同一通道时如何将消息分配到消息处理程序之间。为了方便起见,load-balancer 属性公开了一个枚举值,指向预先存在的 LoadBalancingStrategy 实现。round-robin(在处理程序之间轮流进行负载均衡)和 none(用于显式禁用负载均衡的情况)是唯一可用的值。其他策略实现可能会在将来的版本中添加。但是,从 3.0 版本开始,您可以提供自己的 LoadBalancingStrategy 实现,并使用 load-balancer-ref 属性注入它,该属性应该指向实现 LoadBalancingStrategy 的 Bean,如下面的示例所示

FixedSubscriberChannel 是一个 SubscribableChannel,它只支持一个无法取消订阅的 MessageHandler 订阅者。这对于没有其他订阅者参与且不需要通道拦截器的高吞吐量性能用例很有用。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

请注意,load-balancerload-balancer-ref 属性是互斥的。

负载均衡也与布尔 failover 属性一起使用。如果 failover 值为 true(默认值),则调度器在前面的处理程序抛出异常时回退到任何后续处理程序(根据需要)。顺序由处理程序本身定义的可选顺序值确定,或者如果不存在此类值,则由处理程序订阅的顺序确定。

如果某种情况要求调度器始终尝试调用第一个处理程序,然后在每次发生错误时以相同的固定顺序序列回退,则不应提供任何负载均衡策略。换句话说,即使没有启用负载均衡,调度器仍然支持 failover 布尔属性。但是,在没有负载均衡的情况下,处理程序的调用始终从第一个开始,根据它们的顺序。例如,当对主、次、三级等有明确定义时,这种方法很有效。使用命名空间支持时,任何端点上的 order 属性决定顺序。

请记住,负载均衡和 failover 仅在通道有多个订阅的消息处理程序时才适用。使用命名空间支持时,这意味着多个端点共享在 input-channel 属性中定义的相同通道引用。

从 5.2 版本开始,当 failover 为 true 时,当前处理程序的故障以及失败的消息将分别在配置的 debuginfo 下记录。

ExecutorChannel

ExecutorChannel 是一种点对点通道,支持与 DirectChannel 相同的调度程序配置(负载均衡策略和 failover 布尔属性)。这两种调度通道类型之间的主要区别在于 ExecutorChannel 将委托给 TaskExecutor 的实例来执行调度。这意味着发送方法通常不会阻塞,但也意味着处理程序调用可能不会在发送者的线程中发生。因此,它不支持跨越发送者和接收处理程序的事务。

发送者有时可能会阻塞。例如,当使用具有拒绝策略的 TaskExecutor 来限制客户端(例如 ThreadPoolExecutor.CallerRunsPolicy)时,发送者的线程可以在线程池达到最大容量且执行程序的工作队列已满时随时执行方法。由于这种情况只会以不可预测的方式发生,因此您不应依赖它进行事务。

PartitionedChannel

从 6.1 版本开始,提供了 PartitionedChannel 实现。这是 AbstractExecutorChannel 的扩展,表示点对点调度逻辑,其中实际的消费是在特定线程上处理的,该线程由从发送到该通道的消息中评估的分区键确定。该通道类似于上面提到的 ExecutorChannel,但不同之处在于具有相同分区键的消息始终在同一线程中处理,从而保留了顺序。它不需要外部 TaskExecutor,但可以使用自定义 ThreadFactory 配置(例如 Thread.ofVirtual().name("partition-", 0).factory())。此工厂用于将单线程执行程序填充到每个分区的 MessageDispatcher 委托中。默认情况下,IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头用作分区键。该通道可以配置为一个简单的 bean

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

该通道将有 3 个分区 - 专用线程;将使用 partitionKey 头来确定消息将在哪个分区中处理。有关更多信息,请参阅 PartitionedChannel 类 Javadoc。

FluxMessageChannel

FluxMessageChannelorg.reactivestreams.Publisher 的实现,用于将发送的消息“下沉”到内部 reactor.core.publisher.Flux 中,以便下游的响应式订阅者按需消费。此通道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只有 org.reactivestreams.Subscriber 实例才能用于从该通道消费,并遵守响应式流的背压特性。另一方面,FluxMessageChannel 实现了一个 ReactiveStreamsSubscribableChannel,其 subscribeTo(Publisher<Message<?>>) 合同允许从响应式源发布者接收事件,将响应式流桥接到集成流中。为了实现整个集成流的完全响应式行为,必须将此类通道放置在流中的所有端点之间。

有关与响应式流交互的更多信息,请参见 响应式流支持

作用域通道

Spring Integration 1.0 提供了 ThreadLocalChannel 实现,但从 2.0 开始已将其删除。现在,处理相同需求的更通用方法是向通道添加 scope 属性。属性的值可以是上下文中可用的作用域的名称。例如,在 Web 环境中,某些作用域是可用的,任何自定义作用域实现都可以注册到上下文中。以下示例显示将线程局部作用域应用于通道,包括作用域本身的注册

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

在前面的示例中定义的通道也内部委托给队列,但通道绑定到当前线程,因此队列的内容也类似地绑定。这样,发送到通道的线程以后可以接收相同的消息,但其他线程将无法访问它们。虽然线程作用域通道很少需要,但它们在以下情况下可能很有用:DirectChannel 实例用于强制执行单线程操作,但任何回复消息都应发送到“终端”通道。如果该终端通道是线程作用域的,则原始发送线程可以从终端通道收集其回复。

现在,由于任何通道都可以作用域化,因此除了线程局部之外,您还可以定义自己的作用域。