聚合器和重排序器
Aggregator在概念上与Splitter相反。它将一系列单独的消息聚合成一个消息,并且必然更复杂。默认情况下,聚合器返回一个包含来自传入消息的有效负载集合的消息。同样的规则也适用于Resequencer。以下示例展示了分发器-聚合器模式的典型示例。
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()方法将列表拆分成单独的消息并将其发送到ExecutorChannel。resequence()方法根据消息头中找到的序列详细信息对消息进行重新排序。aggregate()方法收集这些消息。
但是,您可以通过指定发布策略和关联策略等来更改默认行为。考虑以下示例:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例关联了带有myCorrelationKey头的消息,并在至少累积十个消息后发布这些消息。
resequence() EIP 方法也提供了类似的 Lambda 配置。