分散-聚合
从 4.1 版本开始,Spring Integration 提供了 分散-聚合 (scatter-gather) 企业集成模式的实现。它是一个复合端点,目标是将消息发送给多个接收者并聚合结果。正如 企业集成模式 中所述,它适用于“最优报价”等场景,在此类场景中,我们需要向多个供应商请求信息,并决定哪一个能为所请求的商品提供最佳条款。
以前,该模式可以通过使用离散组件来配置。此项增强带来了更方便的配置。
ScatterGatherHandler 是一个请求-回复端点,它结合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。请求消息被发送到 scatter 通道,ScatterGatherHandler 等待聚合器发送到 outputChannel 的回复。
功能
分散-聚合 模式提出了两种场景:“拍卖”和“分发”。在这两种情况下,聚合 功能是相同的,并提供了 AggregatingMessageHandler 可用的所有选项。(实际上,ScatterGatherHandler 仅需要 AggregatingMessageHandler 作为构造函数参数。)有关更多信息,请参阅 聚合器。
拍卖
拍卖 分散-聚合 变体使用“发布-订阅”逻辑处理请求消息,其中“分散”通道是带有 apply-sequence="true" 的 PublishSubscribeChannel。然而,此通道可以是任何 MessageChannel 实现(就像 ContentEnricher 中的 request-channel 一样 — 请参阅 内容增强器)。但是,在这种情况下,您应该为 聚合 功能创建自己的自定义 correlationStrategy。
分发
分发 分散-聚合 变体基于 RecipientListRouter(请参阅 RecipientListRouter),并具有 RecipientListRouter 的所有可用选项。这是第二个 ScatterGatherHandler 构造函数参数。如果您只想依赖 recipient-list-router 和 聚合器 的默认 correlationStrategy,则应指定 apply-sequence="true"。否则,您应该为 聚合器 提供自定义 correlationStrategy。与 PublishSubscribeChannel 变体(拍卖变体)不同,拥有 recipient-list-router 的 selector 选项允许根据消息过滤目标供应商。使用 apply-sequence="true" 时,会提供默认的 sequenceSize,并且 聚合器 可以正确地释放组。分发选项与拍卖选项互斥。
applySequence=true 仅在基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的纯 Java 配置中才需要,因为框架无法修改外部提供的组件。为了方便起见,从 6.0 版本开始,分散-聚合 的 XML 和 Java DSL 将 applySequence 设置为 true。 |
对于拍卖和分发变体,请求(分散)消息会通过 gatherResultChannel 标头进行丰富,以等待来自 聚合器 的回复消息。
默认情况下,所有供应商都应将其结果发送到 replyChannel 标头(通常通过省略最终端点的 output-channel)。但是,也提供了 gatherChannel 选项,允许供应商将其回复发送到该通道进行聚合。
配置分散-聚合端点
以下示例显示了 分散-聚合 Bean 定义的 Java 配置
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们配置了 RecipientListRouter 的 distributor bean,其中 applySequence="true" 和接收者通道列表。下一个 bean 是 AggregatingMessageHandler。最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator 以将分散-聚合组件连接到集成流中。
以下示例展示了如何使用 XML 命名空间配置 <scatter-gather> 端点
<scatter-gather
id="" (1)
auto-startup="" (2)
input-channel="" (3)
output-channel="" (4)
scatter-channel="" (5)
gather-channel="" (6)
order="" (7)
phase="" (8)
send-timeout="" (9)
gather-timeout="" (10)
requires-reply="" > (11)
<scatterer/> (12)
<gatherer/> (13)
</scatter-gather>
| 1 | 端点的 ID。ScatterGatherHandler bean 使用 id + '.handler' 的别名注册。RecipientListRouter bean 使用 id + '.scatterer' 的别名注册。AggregatingMessageHandler bean 使用 id + '.gatherer' 的别名注册。可选。(BeanFactory 生成默认的 id 值。) |
| 2 | 生命周期属性,表示端点是否应在应用程序上下文初始化期间启动。此外,ScatterGatherHandler 也实现了 Lifecycle,如果提供了 gather-channel,它将启动和停止内部创建的 gatherEndpoint。可选。(默认值为 true。) |
| 3 | 用于在 ScatterGatherHandler 中接收请求消息以进行处理的通道。必填。 |
| 4 | ScatterGatherHandler 发送聚合结果的通道。可选。(传入消息可以在 replyChannel 消息头中自行指定回复通道)。 |
| 5 | 用于拍卖场景中发送分散消息的通道。可选。与 <scatterer> 子元素互斥。 |
| 6 | 用于接收每个供应商的回复以进行聚合的通道。它用作分散消息中的 replyChannel 标头。可选。默认情况下,会创建 FixedSubscriberChannel。 |
| 7 | 当多个处理程序订阅到同一个 DirectChannel 时,此组件的顺序(用于负载平衡目的)。可选。 |
| 8 | 指定端点应启动和停止的阶段。启动顺序从最低到最高,关闭顺序从最高到最低。默认情况下,此值为 Integer.MAX_VALUE,这意味着此容器尽可能晚地启动,尽可能早地停止。可选。 |
| 9 | 发送回复 Message 到 output-channel 时的等待超时时间。默认情况下,send() 会阻塞一秒钟。它仅在输出通道存在某些“发送”限制时适用——例如,一个容量固定的 QueueChannel 已满。在这种情况下,会抛出 MessageDeliveryException。对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。对于 group-timeout(-expression),来自调度过期任务的 MessageDeliveryException 会导致此任务被重新调度。可选。 |
| 10 | 允许您指定分散-聚合在返回之前等待回复消息的时间。默认情况下,它会等待 30 秒。如果回复超时,则返回 'null'。可选。 |
| 11 | 指定分散-聚合是否必须返回非空值。此值默认为 true。因此,当底层聚合器在 gather-timeout 后返回 null 值时,会抛出 ReplyRequiredException。请注意,如果可能为 null,则应指定 gather-timeout 以避免无限期等待。 |
| 12 | <recipient-list-router> 选项。可选。与 scatter-channel 属性互斥。 |
| 13 | <aggregator> 选项。必填。 |
从版本 6.5.3 开始,当 ScatterGatherHandler 配置为 async = true 选项时,请求消息处理线程不再阻塞等待内部 ((PollableChannel) gatherResultChannel).receive(this.gatherTimeout) 操作上的聚合结果。相反,基于最终从 gatherResultChannel 生成的聚合结果,会返回一个 reactor.core.publisher.Mono 作为回复对象。然后,该 Mono 将根据框架中的 Reactive Streams 支持 进行处理。 |
错误处理
由于分散-聚合是一个多请求-回复组件,错误处理会变得更加复杂。在某些情况下,如果 ReleaseStrategy 允许进程以少于请求的回复完成,最好只捕获并忽略下游异常。在其他情况下,当发生错误时,应考虑使用“补偿消息”从子流返回。
每个异步子流都应该配置 errorChannel 标头,以便 MessagePublishingErrorHandler 正确发送错误消息。否则,错误将被发送到具有通用错误处理逻辑的全局 errorChannel。有关异步错误处理的更多信息,请参阅 错误处理。
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 来忽略异常或返回补偿消息。当其中一个子流抛出异常到 ScatterGatherHandler 时,它会被重新抛出到上游。这样,所有其他子流将白费功夫,并且它们的回复将在 ScatterGatherHandler 中被忽略。这有时可能是预期的行为,但在大多数情况下,最好在特定的子流中处理错误,而不会影响所有其他子流和聚合器中的预期。
从 5.1.3 版本开始,ScatterGatherHandler 提供了 errorChannelName 选项。它会被填充到分散消息的 errorChannel 标头中,并在发生异步错误时使用,或者可以在常规同步子流中用于直接发送错误消息。
以下示例配置演示了通过返回补偿消息进行异步错误处理
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成正确的回复,我们必须从 MessagingException 的 failedMessage 复制头部(包括 replyChannel 和 errorChannel),该异常已由 MessagePublishingErrorHandler 发送到 scatterGatherErrorChannel。这样,目标异常将返回到 ScatterGatherHandler 的聚合器,以完成回复消息组。这样的异常 payload 可以在聚合器的 MessageGroupProcessor 中过滤掉,或者在分散-聚合端点之后以其他方式向下游处理。
在将分散结果发送给聚合器之前,ScatterGatherHandler 会恢复请求消息头,包括回复通道和错误通道(如果有)。这样,来自 AggregatingMessageHandler 的错误将传播给调用者,即使在分散接收者子流中应用了异步传递。为了成功操作,gatherResultChannel、originalReplyChannel 和 originalErrorChannel 头必须传回分散接收者子流的回复中。在这种情况下,必须为 ScatterGatherHandler 配置一个合理的、有限的 gatherTimeout。否则,默认情况下,它将无限期地阻塞等待来自聚合器的回复。 |