文件拆分器

FileSplitter 在 4.1.2 版本中添加,其命名空间支持在 4.2 版本中添加。FileSplitter 根据 BufferedReader.readLine() 将文本文件拆分为单独的行。默认情况下,拆分器使用 Iterator 从文件中读取行时一次发出一个行。将 iterator 属性设置为 false 会导致它在将所有行作为消息发出之前将所有行读入内存。这可能的一种用例是,如果您想在发送包含行的任何消息之前检测文件的 I/O 错误。但是,它只适用于相对较短的文件。

输入负载可以是 FileString(一个 File 路径)、InputStreamReader。其他类型的负载将保持不变。

以下列表显示了配置 FileSplitter 的可能方法

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 分隔器的 Bean 名称。
2 设置为 true(默认值)以使用迭代器,或设置为 false 以将文件加载到内存中,然后再发送行。
3 设置为 true 以在文件数据之前和之后发出文件开头和文件结尾的标记消息。标记是带有 FileSplitter.FileMarker 负载的消息(在 mark 属性中具有 STARTEND 值)。您可以在下游流中顺序处理文件时使用标记,其中某些行被过滤。它们使下游处理能够知道何时已完全处理文件。此外,一个包含 STARTENDfile_marker 标头将添加到这些消息中。END 标记包含行数。如果文件为空,则仅发出 STARTEND 标记,lineCount0。默认值为 false。当为 true 时,apply-sequence 默认情况下为 false。另请参阅 markers-json(下一个属性)。
4 markers 为 true 时,将此设置为 true 以将 FileMarker 对象转换为 JSON 字符串。(在内部使用 SimpleJsonSerializer)。
5 设置为 false 以禁用在消息中包含 sequenceSizesequenceNumber 标头。默认值为 true,除非 markerstrue。当 truemarkerstrue 时,标记将包含在排序中。当 trueiteratortrue 时,sequenceSize 标头将设置为 0,因为大小未知。
6 设置为 true 以在文件中没有行时引发 RequiresReplyException。默认值为 false
7 设置用于将文本数据读入 String 负载时的字符集名称。默认值为平台字符集。
8 用于第一行的标头名称,作为为剩余行发出的消息中的标头携带。自版本 5.0 起。
9 设置用于将消息发送到分隔器的输入通道。
10 设置用于发送消息的输出通道。
11 设置发送超时时间。仅当output-channel可以阻塞时适用,例如,一个已满的QueueChannel
12 设置为false以禁用在上下文刷新时自动启动拆分器。默认值为true
13 如果input-channel<publish-subscribe-channel/>,则设置此端点的顺序。
14 设置拆分器的启动阶段(在auto-startuptrue时使用)。

FileSplitter还会将任何基于文本的InputStream拆分为行。从版本 4.3 开始,当与 FTP 或 SFTP 流式入站通道适配器或使用stream选项检索文件的 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全使用后自动关闭支持流的会话。有关这些功能的更多信息,请参阅FTP 流式入站通道适配器SFTP 流式入站通道适配器,以及FTP 出站网关SFTP 出站网关

使用 Java 配置时,可以使用额外的构造函数,如下例所示

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson为 true 时,标记将表示为 JSON 字符串(使用SimpleJsonSerializer)。

版本 5.0 引入了firstLineAsHeader选项,用于指定内容的第一行是标题(例如,CSV 文件中的列名)。传递给此属性的参数是标题名称,在该名称下,第一行作为标题在为剩余行发出的消息中传递。此行不包含在序列标题中(如果applySequence为 true)也不包含在与FileMarker.END关联的lineCount中。注意:从版本 5.5 开始,lineCount也作为FileHeaders.LINE_COUNT包含在FileMarker.END消息的标题中,因为FileMarker可以序列化为 JSON。如果文件仅包含标题行,则该文件将被视为为空,因此,在拆分期间仅发出FileMarker实例(如果启用了标记,否则不会发出任何消息)。默认情况下(如果未设置标题名称),第一行被视为数据,并成为第一个发出的消息的有效负载。

如果您需要从文件内容中提取标题的更复杂逻辑(不是第一行,不是整行内容,不是一个特定的标题,等等),请考虑使用FileSplitter之前的标题丰富器。请注意,已移至标题的行可能会从正常内容处理中被过滤掉。

对拆分文件进行幂等的下游处理

apply-sequence为 true 时,拆分器会在SEQUENCE_NUMBER标题中添加行号(当markers为 true 时,标记被计为行)。行号可以与幂等接收器一起使用,以避免在重启后重新处理行。

例如

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}