聚合器和重新排序器

一个Aggregator在概念上与Splitter相反。它将一系列单独的消息聚合到一条消息中,并且必然更加复杂。默认情况下,聚合器返回一条包含来自传入消息的消息体集合的消息。Resequencer也应用相同的规则。以下示例展示了拆分器-聚合器模式的典型示例

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split()方法将列表拆分为单独的消息并将其发送到ExecutorChannelresequence()方法根据消息头中找到的序列细节重新排序消息。aggregate()方法收集这些消息。

但是,您可以通过指定释放策略和关联策略等更改默认行为。请考虑以下示例

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

前面的示例关联具有myCorrelationKey头的消息,并在至少累积了十条消息后释放这些消息。

resequence()EIP 方法提供了类似的 lambda 配置。