文件聚合器

从 5.5 版本开始,引入了 FileAggregator 来处理启用 START/END 标记时 FileSplitter 使用场景的另一面。为方便起见,FileAggregator 实现了所有三种序列细节策略。

  • HeaderAttributeCorrelationStrategy 使用 FileHeaders.FILENAME 属性来计算关联键。当在 FileSplitter 上启用标记时,它不会填充序列细节头,因为 START/END 标记消息也包含在序列大小中。FileHeaders.FILENAME 仍然会为每个发出的行填充,包括 START/END 标记消息。

  • FileMarkerReleaseStrategy - 检查组中是否存在 FileSplitter.FileMarker.Mark.END 消息,然后将 FileHeaders.LINE_COUNT 头的值与组大小减去 2 - FileSplitter.FileMarker 实例进行比较。它还为 conditionSupplier 函数实现了一个方便的 GroupConditionProvider 联系,该函数将在 AbstractCorrelatingMessageHandler 中使用。有关更多信息,请参阅 消息组条件

  • FileAggregatingMessageGroupProcessor 只从组中删除 FileSplitter.FileMarker 消息,并将其余消息收集到一个列表有效负载中以生成。

以下列表显示了配置 FileAggregator 的可能方法。

@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}

如果 FileAggregator 的默认行为不能满足目标逻辑,建议使用单独的策略配置聚合器端点。有关更多信息,请参阅 FileAggregator JavaDocs。