FTP 入站通道适配器

FTP 入站通道适配器是一个特殊的监听器,它连接到 FTP 服务器并监听远程目录事件(例如,创建新文件),此时它会启动文件传输。以下示例显示了如何配置一个inbound-channel-adapter

<int-ftp:inbound-channel-adapter id="ftpInbound"
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    auto-create-local-directory="true"
    delete-remote-files="true"
    filename-pattern="*.txt"
    remote-directory="some/remote/path"
    remote-file-separator="/"
    preserve-timestamp="true"
    local-filename-generator-expression="#this.toUpperCase() + '.a'"
    scanner="myDirScanner"
    local-filter="myFilter"
    temporary-file-suffix=".writing"
    max-fetch-size="-1"
    local-directory=".">
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如上述配置所示,您可以使用inbound-channel-adapter元素配置 FTP 入站通道适配器,同时为各种属性提供值,例如local-directoryfilename-pattern(基于简单的模式匹配,而不是正则表达式)以及对session-factory的引用。

默认情况下,传输的文件与原始文件具有相同的名称。如果要覆盖此行为,可以设置local-filename-generator-expression属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。与出站网关和适配器不同(其中 SpEL 评估上下文的根对象是Message),此入站适配器在评估时还没有消息,因为这是它最终使用传输的文件作为其有效负载生成的。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个String)。

入站通道适配器首先为本地目录检索File对象,然后根据轮询器配置发出每个文件。从 5.0 版本开始,您现在可以限制从 FTP 服务器获取的文件数量,当需要检索新文件时。当目标文件非常大或在具有持久文件列表过滤器的集群系统中运行时,这可能非常有用,稍后将对此进行讨论。为此,请使用max-fetch-size。负值(默认值)表示没有限制,将检索所有匹配的文件。有关详细信息,请参阅入站通道适配器:控制远程文件获取。从 5.0 版本开始,您还可以通过设置scanner属性向inbound-channel-adapter提供自定义的DirectoryScanner实现。

从 Spring Integration 3.0 开始,您可以指定preserve-timestamp属性(其默认值为false)。当为true时,本地文件的修改时间戳将设置为从服务器检索的值。否则,它将设置为当前时间。

从 4.2 版本开始,您可以指定remote-directory-expression而不是remote-directory,允许您在每次轮询时动态确定目录,例如remote-directory-expression="@myBean.determineRemoteDir()"

从 4.3 版本开始,您可以省略remote-directoryremote-directory-expression属性。它们默认为null。在这种情况下,根据 FTP 协议,客户端工作目录将用作默认远程目录。

有时,使用filename-pattern属性指定的简单模式进行的文件过滤可能不够。如果是这种情况,您可以使用filename-regex属性来指定正则表达式(例如filename-regex=".*\.test$")。此外,如果您需要完全控制,您可以使用filter属性并提供对o.s.i.file.filters.FileListFilter的任何自定义实现的引用,这是一个用于过滤文件列表的策略接口。此过滤器确定检索哪些远程文件。您还可以通过使用CompositeFileListFilter将基于模式的过滤器与其他过滤器(例如AcceptOnceFileListFilter以避免同步以前已获取的文件)结合使用。

AcceptOnceFileListFilter将状态存储在内存中。如果您希望状态在系统重新启动后仍然存在,请考虑改用FtpPersistentAcceptOnceFileListFilter。此过滤器将已接受的文件名存储在MetadataStore策略的实例中(请参阅元数据存储)。此过滤器根据文件名和远程修改时间进行匹配。

从 4.0 版本开始,此过滤器需要ConcurrentMetadataStore。当与共享数据存储(例如使用RedisMetadataStoreRedis)一起使用时,它允许在多个应用程序或服务器实例之间共享过滤器密钥。

从 5.0 版本开始,默认情况下,FtpInboundFileSynchronizer应用具有内存中SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter。此过滤器也应用于 XML 配置中的regexpattern选项以及 Java DSL 中的FtpInboundChannelAdapterSpec。任何其他用例都可以使用CompositeFileListFilter(或ChainFileListFilter)进行管理。

之前的讨论指的是在检索文件之前过滤文件。检索文件后,将对文件系统上的文件应用附加过滤器。默认情况下,这是一个AcceptOnceFileListFilter,如前所述,它保留内存中的状态,并且不考虑文件的修改时间。除非您的应用程序在处理后删除文件,否则适配器默认会在应用程序重新启动后重新处理磁盘上的文件。

此外,如果您将filter配置为使用FtpPersistentAcceptOnceFileListFilter并且远程文件时间戳发生更改(导致重新获取),则默认的本地过滤器不允许处理此新文件。

有关此过滤器及其使用方法的更多信息,请参阅远程持久文件列表过滤器

您可以使用local-filter属性配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认情况下配置了FileSystemPersistentAcceptOnceFileListFilter。此过滤器将已接受的文件名和修改时间戳存储在MetadataStore策略的实例中(请参阅元数据存储)并检测本地文件修改时间的更改。默认的MetadataStoreSimpleMetadataStore,它将状态存储在内存中。

从 4.1.5 版本开始,这些过滤器具有一个新属性 (flushOnUpdate),它会导致它们在每次更新时刷新元数据存储(如果存储实现Flushable)。

此外,如果您使用分布式MetadataStore(例如Redis),您可以拥有多个相同适配器或应用程序的实例,并确保每个文件仅处理一次。

实际的本地过滤器是一个CompositeFileListFilter,它包含提供的过滤器和一个模式过滤器,该过滤器可以防止处理正在下载的文件(基于temporary-file-suffix)。使用此后缀下载文件(默认为.writing),并且在传输完成后将文件重命名为其最终名称,使其对过滤器“可见”。

remote-file-separator属性允许您配置文件分隔符字符,如果默认的“/”不适用于您的特定环境。

有关这些属性的更多详细信息,请参阅模式

您还应该了解,FTP 入站通道适配器是一个轮询使用者。因此,您必须配置一个轮询器(使用全局默认值或本地子元素)。传输文件后,将生成一个以java.io.File作为其有效负载的消息,并将其发送到由channel属性标识的通道。

从 6.2 版本开始,您可以使用FtpLastModifiedFileListFilter根据上次修改策略过滤 FTP 文件。此过滤器可以使用age属性进行配置,以便只有比此值旧的文件才能通过过滤器。年龄默认为 60 秒,但您应该选择一个足够大的年龄以避免过早拾取文件(例如,由于网络故障)。查看其Javadoc以获取更多信息。

关于文件过滤和不完整文件的更多信息

有时,监控的(远程)目录中新出现的文件可能不完整。通常,此类文件使用临时扩展名写入(例如somefile.txt.writing),然后在写入过程完成后重命名。在大多数情况下,您只对完整的文件感兴趣,并且希望仅筛选完整的文件。为了处理这些场景,您可以使用filename-patternfilename-regexfilter属性提供的筛选支持。以下示例使用自定义过滤器实现。

<int-ftp:inbound-channel-adapter
    channel="ftpChannel"
    session-factory="ftpSessionFactory"
    filter="customFilter"
    local-directory="file:/my_transfers">
    remote-directory="some/remote/path"
    <int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

入站 FTP 适配器的轮询器配置说明

入站 FTP 适配器的任务包括两项:

  1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。

  2. 对于每个传输的文件,生成一条消息,该消息以该文件作为有效负载,并将其发送到由“channel”属性标识的通道。这就是为什么它们被称为“通道适配器”,而不是仅仅被称为“适配器”。此类适配器的主要工作是生成消息以发送到消息通道。本质上,第二个任务优先,因此,如果您的本地目录已经有一个或多个文件,它首先会从这些文件中生成消息。只有在处理完所有本地文件后,它才会启动远程通信以检索更多文件。

此外,在轮询器上配置触发器时,应密切注意max-messages-per-poll属性。对于所有SourcePollingChannelAdapter实例(包括 FTP),其默认值为1。这意味着,一旦处理了一个文件,它就会等待根据触发器配置确定的下一个执行时间。如果您碰巧在local-directory中有一个或多个文件,它会在启动与远程 FTP 服务器的通信之前处理这些文件。此外,如果max-messages-per-poll设置为1(默认值),它一次只处理一个文件,间隔由您的触发器定义,基本上工作方式为“一次轮询===一个文件”。

对于典型的文件传输用例,您很可能需要相反的行为:为每次轮询处理所有可以处理的文件,然后才等待下一次轮询。如果是这种情况,请将max-messages-per-poll设置为 -1。然后,在每次轮询中,适配器都会尝试生成尽可能多的消息。换句话说,它处理本地目录中的所有内容,然后连接到远程目录以传输所有可在此处本地处理的内容。只有完成轮询操作后,轮询器才会等待下一个执行时间。

或者,您可以将“max-messages-per-poll”值设置为一个正值,该值指示每次轮询从文件中创建的消息的上限。例如,值为10表示每次轮询最多尝试处理十个文件。

从故障中恢复

了解适配器的架构非常重要。有一个文件同步器来获取文件,以及一个FileReadingMessageSource,它为每个同步的文件发出消息。如前所述,涉及两个过滤器。filter属性(和模式)引用远程(FTP)文件列表,以避免获取已经获取的文件。local-filterFileReadingMessageSource使用,以确定哪些文件将作为消息发送。

同步器列出远程文件并查阅其过滤器。然后传输文件。如果在文件传输过程中发生 IO 错误,则会删除已添加到过滤器的任何文件,以便它们在下一次轮询时可以重新获取。这仅适用于过滤器实现ReversibleFileListFilter(例如AcceptOnceFileListFilter)的情况。

如果在同步文件后,下游流处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。

如果您希望在失败后重新处理此类文件,可以使用类似于以下配置来促进从过滤器中删除失败的文件。

<int-ftp:inbound-channel-adapter id="ftpAdapter"
        session-factory="ftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/ftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任何ResettableFileListFilter

从 5.0 版本开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。这也可以是远程子路径。为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以使用基于Files.walk()算法的新RecursiveDirectoryScanner提供内部FileReadingMessageSource。有关更多信息,请参阅AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您现在可以使用setUseWatchService()选项将AbstractInboundFileSynchronizingMessageSource切换到基于WatchServiceDirectoryScanner。它也针对所有WatchEventType实例进行配置,以对本地目录中的任何修改做出反应。前面显示的重新处理示例基于FileReadingMessageSource.WatchServiceDirectoryScanner的内置功能,当文件从本地目录中删除(StandardWatchEventKinds.ENTRY_DELETE)时执行ResettableFileListFilter.remove()。有关更多信息,请参阅WatchServiceDirectoryScanner

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置配置入站适配器的示例。

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<FTPFile> ftpSessionFactory() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        sf.setTestSession(true);
        return new CachingSessionFactory<FTPFile>(sf);
    }

    @Bean
    public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
        FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> ftpMessageSource() {
        FtpInboundFileSynchronizingMessageSource source =
                new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("ftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "ftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例。

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow ftpInboundFlow() {
        return IntegrationFlow
            .from(Ftp.inboundAdapter(this.ftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilename(f -> f.toUpperCase() + ".a")
                    .localDirectory(new File("d:\\ftp_files")),
                e -> e.id("ftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

处理不完整的数据

提供FtpSystemMarkerFilePresentFileListFilter用于过滤远程系统上没有对应标记文件的远程文件。有关配置信息,请参阅Javadoc(并浏览父类)。