FTP 入站通道适配器
FTP 入站通道适配器是一个特殊的监听器,它连接到 FTP 服务器并监听远程目录事件(例如,新文件创建),此时它会启动文件传输。以下示例展示了如何配置 inbound-channel-adapter
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前配置所示,您可以使用 `inbound-channel-adapter` 元素配置 FTP 入站通道适配器,同时为各种属性提供值,例如 `local-directory`、`filename-pattern`(基于简单模式匹配,而不是正则表达式)以及对 `session-factory` 的引用。
默认情况下,传输的文件与原始文件具有相同的名称。如果您想覆盖此行为,您可以设置 `local-filename-generator-expression` 属性,它允许您提供一个 SpEL 表达式来生成本地文件的名称。与出站网关和适配器不同,出站网关和适配器的 SpEL 评估上下文的根对象是 `Message`,此入站适配器在评估时还没有消息,因为它最终使用传输的文件作为其有效负载生成消息。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个 `String`)。
入站通道适配器首先为本地目录检索 `File` 对象,然后根据轮询器配置发出每个文件。从版本 5.0 开始,您现在可以限制从 FTP 服务器获取的文件数量,当需要新的文件检索时。当目标文件非常大或您在具有持久文件列表过滤器的集群系统中运行时,这可能很有用,将在后面讨论。为此,请使用 `max-fetch-size`。负值(默认值)表示没有限制,将检索所有匹配的文件。有关更多信息,请参阅 入站通道适配器:控制远程文件获取。从版本 5.0 开始,您还可以通过设置 `scanner` 属性,向 `inbound-channel-adapter` 提供自定义 `DirectoryScanner` 实现。
从 Spring Integration 3.0 开始,您可以指定 `preserve-timestamp` 属性(其默认值为 `false`)。当为 `true` 时,本地文件的修改时间戳将设置为从服务器检索的值。否则,它将设置为当前时间。
从版本 4.2 开始,您可以指定 `remote-directory-expression` 而不是 `remote-directory`,让您在每次轮询时动态确定目录,例如,`remote-directory-expression="@myBean.determineRemoteDir()"`。
从版本 4.3 开始,您可以省略 `remote-directory` 和 `remote-directory-expression` 属性。它们默认为 `null`。在这种情况下,根据 FTP 协议,客户端工作目录将用作默认的远程目录。
有时,基于使用 `filename-pattern` 属性指定的简单模式的文件过滤可能不够。如果是这种情况,您可以使用 `filename-regex` 属性来指定正则表达式(例如 `filename-regex=".*\.test$"`)。此外,如果您需要完全控制,您可以使用 `filter` 属性并提供对 `o.s.i.file.filters.FileListFilter` 的任何自定义实现的引用,这是一种用于过滤文件列表的策略接口。此过滤器确定检索哪些远程文件。您还可以通过使用 `CompositeFileListFilter` 将基于模式的过滤器与其他过滤器(例如 `AcceptOnceFileListFilter` 以避免同步先前已获取的文件)结合起来。
AcceptOnceFileListFilter
将其状态存储在内存中。如果您希望状态在系统重启后仍然存在,请考虑使用 FtpPersistentAcceptOnceFileListFilter
。此过滤器将接受的文件名存储在 MetadataStore
策略的实例中(参见 元数据存储)。此过滤器根据文件名和远程修改时间进行匹配。
从 4.0 版本开始,此过滤器需要一个 ConcurrentMetadataStore
。当与共享数据存储(例如使用 RedisMetadataStore
的 Redis
)一起使用时,它允许过滤器键在多个应用程序或服务器实例之间共享。
从 5.0 版本开始,FtpPersistentAcceptOnceFileListFilter
与内存中的 SimpleMetadataStore
一起默认应用于 FtpInboundFileSynchronizer
。此过滤器也应用于 XML 配置中的 regex
或 pattern
选项,以及 Java DSL 中的 FtpInboundChannelAdapterSpec
。任何其他用例都可以使用 CompositeFileListFilter
(或 ChainFileListFilter
)来管理。
以上讨论指的是在检索文件之前对文件进行过滤。一旦文件被检索,就会对文件系统上的文件应用一个额外的过滤器。默认情况下,这是一个 AcceptOnceFileListFilter
,正如前面所讨论的,它保留状态在内存中,并且不考虑文件的修改时间。除非您的应用程序在处理后删除文件,否则适配器将在应用程序重启后默认情况下重新处理磁盘上的文件。
此外,如果您将 filter
配置为使用 FtpPersistentAcceptOnceFileListFilter
,并且远程文件时间戳发生更改(导致它被重新获取),默认的本地过滤器不会允许处理此新文件。
有关此过滤器及其使用方法的更多信息,请参见 远程持久文件列表过滤器。
您可以使用 local-filter
属性来配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认情况下配置了一个 FileSystemPersistentAcceptOnceFileListFilter
。此过滤器将接受的文件名和修改时间戳存储在 MetadataStore
策略的实例中(参见 元数据存储),并检测本地文件修改时间的更改。默认的 MetadataStore
是一个 SimpleMetadataStore
,它将状态存储在内存中。
从 4.1.5 版本开始,这些过滤器有一个新的属性(flushOnUpdate
),它会导致它们在每次更新时刷新元数据存储(如果存储实现 Flushable
)。
此外,如果您使用分布式 MetadataStore (例如 Redis),您可以拥有相同适配器或应用程序的多个实例,并确保每个文件只被处理一次。
|
实际的本地过滤器是一个 CompositeFileListFilter
,它包含提供的过滤器和一个模式过滤器,该过滤器阻止处理正在下载过程中的文件(基于 temporary-file-suffix
)。文件使用此后缀(默认值为 .writing
)下载,并且文件在传输完成后重命名为其最终名称,使其对过滤器“可见”。
remote-file-separator
属性允许您配置一个文件分隔符字符,以供在默认的 '/' 不适用于您的特定环境时使用。
有关这些属性的更多详细信息,请参阅 schema。
您还应该了解,FTP 入站通道适配器是一个轮询消费者。因此,您必须配置一个轮询器(通过使用全局默认值或本地子元素)。一旦文件被传输,就会生成一个带有 java.io.File
作为其有效负载的消息,并将其发送到由 channel
属性标识的通道。
从 6.2 版本开始,您可以使用 FtpLastModifiedFileListFilter
基于最后修改策略过滤 FTP 文件。此过滤器可以配置一个 age
属性,以便只有比此值更旧的文件才能通过过滤器。年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早地获取文件(例如,由于网络故障)。查看其 Javadoc 以获取更多信息。
关于文件过滤和不完整文件的更多信息
有时,刚刚出现在监控(远程)目录中的文件并不完整。通常,这样的文件是用一个临时扩展名(例如 somefile.txt.writing
)写入的,然后在写入过程完成后重命名。在大多数情况下,您只对完整的文件感兴趣,并且希望只过滤完整的文件。为了处理这些情况,您可以使用 filename-pattern
、filename-regex
和 filter
属性提供的过滤支持。以下示例使用自定义过滤器实现
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
入站 FTP 适配器的轮询器配置说明
入站 FTP 适配器的任务包括两个部分
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一个以该文件作为有效负载的消息,并将其发送到由 'channel' 属性标识的通道。这就是为什么它们被称为 "'通道适配器'" 而不是仅仅 "'适配器'"。这种适配器的主要工作是生成一个消息发送到消息通道。本质上,第二个任务优先于第一个任务,因此,如果您的本地目录中已经存在一个或多个文件,它首先从这些文件中生成消息。只有在所有本地文件都被处理后,它才会启动远程通信以检索更多文件。
此外,在配置轮询器上的触发器时,您应该密切注意 `max-messages-per-poll` 属性。它的默认值为 `1`,适用于所有 `SourcePollingChannelAdapter` 实例(包括 FTP)。这意味着,一旦处理完一个文件,它就会等待由您的触发器配置确定的下一次执行时间。如果您碰巧在 `local-directory` 中有一个或多个文件,它将在启动与远程 FTP 服务器的通信之前处理这些文件。此外,如果 `max-messages-per-poll` 设置为 `1`(默认值),它一次只处理一个文件,间隔由您的触发器定义,本质上是“一次轮询 === 一个文件”。
对于典型的文件传输用例,您很可能希望相反的行为:在每次轮询中处理所有可以处理的文件,然后才等待下一次轮询。如果是这种情况,请将 `max-messages-per-poll` 设置为 -1。然后,在每次轮询中,适配器都会尝试生成尽可能多的消息。换句话说,它会处理本地目录中的所有内容,然后连接到远程目录,将所有可供本地处理的文件传输到本地。只有在完成轮询操作后,轮询器才会等待下一次执行时间。
您也可以将 'max-messages-per-poll' 值设置为一个正值,该值表示每次轮询从文件中创建的消息的上限。例如,值为 `10` 表示每次轮询尝试处理的文件不超过十个。
从故障中恢复
了解适配器的架构非常重要。有一个文件同步器,它获取文件,以及一个 `FileReadingMessageSource`,它为每个同步的文件发出一个消息。如前所述,涉及两个过滤器。`filter` 属性(和模式)指的是远程(FTP)文件列表,以避免获取已经获取的文件。`local-filter` 由 `FileReadingMessageSource` 使用,以确定哪些文件将作为消息发送。
同步器列出远程文件并查询其过滤器。然后传输文件。如果在文件传输过程中发生 IO 错误,则会删除已添加到过滤器的任何文件,以便它们在下次轮询时有资格重新获取。这仅适用于过滤器实现 `ReversibleFileListFilter`(例如 `AcceptOnceFileListFilter`)。
如果在同步文件后,下游流在处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果希望在失败后重新处理此类文件,可以使用类似以下配置来帮助从过滤器中删除失败的文件
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter
。
从 5.0 版本开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。这也可以是远程子路径。为了能够根据层次结构支持递归地读取本地目录以进行修改,您现在可以为内部 FileReadingMessageSource
提供一个基于 Files.walk()
算法的新 RecursiveDirectoryScanner
。有关更多信息,请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()
。此外,您现在可以使用 setUseWatchService()
选项将 AbstractInboundFileSynchronizingMessageSource
切换到基于 WatchService
的 DirectoryScanner
。它还配置为所有 WatchEventType
实例,以对本地目录中的任何修改做出反应。前面显示的重新处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner
的内置功能,当文件从本地目录中删除(StandardWatchEventKinds.ENTRY_DELETE
)时执行 ResettableFileListFilter.remove()
。有关更多信息,请参见 WatchServiceDirectoryScanner
。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置配置入站适配器的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}