FTP 流式入站通道适配器
4.3 版本引入了流入站通道适配器。此适配器生成有效负载类型为 InputStream
的消息,让文件可以在不写入本地文件系统的情况下获取。由于会话保持打开,因此当文件已使用时,使用应用程序负责关闭会话。会话在 closeableResource
标头(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
)中提供。标准框架组件(例如 FileSplitter
和 StreamTransformer
)会自动关闭会话。请参阅 文件拆分器 和 流转换器 以了解有关这些组件的更多信息。以下示例展示如何配置 inbound-streaming-channel-adapter
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
仅允许 filename-pattern
、filename-regex
、filter
或 filter-expression
中的一个。
从 5.0 版本开始,默认情况下,FtpStreamingMessageSource 适配器会根据基于内存中的 SimpleMetadataStore 的 FtpPersistentAcceptOnceFileListFilter ,防止远程文件出现重复。默认情况下,此过滤器也会与文件名模式(或正则表达式)一起应用。如果您需要允许重复,则可以使用 AcceptAllFileListFilter 。任何其他用例都可以通过 CompositeFileListFilter (或 ChainFileListFilter )来处理。Java 配置(本文档后面)展示了一种在处理后删除远程文件以避免重复的技术。
|
有关 FtpPersistentAcceptOnceFileListFilter
及其使用方式的更多信息,请参阅 远程持久文件列表过滤器。
使用 max-fetch-size
属性来限制在需要获取时每次轮询获取的文件数。在群集环境中运行时,将其设置为 1
并使用持久过滤器。有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名分别放入 FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
头中。从 5.0 版本开始,FileHeaders.REMOTE_FILE_INFO
头提供了其他远程文件信息(默认情况下以 JSON 表示)。如果您将 FtpStreamingMessageSource
上的 fileInfoJson
属性设置为 false
,则头将包含一个 FtpFileInfo
对象。可以通过使用 FtpFileInfo.getFileInfo()
方法来访问底层 Apache Net 库提供的 FTPFile
对象。当使用 XML 配置时,fileInfoJson
属性不可用,但您可以通过将 FtpStreamingMessageSource
注入到您的某个配置类中来设置它。另请参阅 远程文件信息。
从 5.1 版本开始,comparator
的泛型类型是 FTPFile
。以前,它是 AbstractFileInfo<FTPFile>
。这是因为现在在处理、筛选和应用 maxFetch
之前,会更早地执行排序。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置对入站适配器进行配置的示例
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序有一个 advice
,它会在处理后删除远程文件。