SFTP 流入站通道适配器

版本 4.3 引入了流式入站通道适配器。此适配器生成有效负载类型为 InputStream 的消息,允许您获取文件而不写入本地文件系统。由于会话保持打开状态,因此在使用完文件后,消费应用程序负责关闭会话。会话在 closeableResource 标头(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)中提供。标准框架组件(例如 FileSplitterStreamTransformer)会自动关闭会话。有关这些组件的更多信息,请参阅 文件拆分器流转换器

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

您只能使用 filename-patternfilename-regexfilterfilter-expression 之一。

从版本 5.0 开始,默认情况下,SftpStreamingMessageSource 适配器通过使用基于内存中 SimpleMetadataStoreSftpPersistentAcceptOnceFileListFilter 来防止远程文件重复。默认情况下,此过滤器也会与文件名模式(或正则表达式)一起应用。如果您需要允许重复,则可以使用 AcceptAllFileListFilter。您可以通过使用 CompositeFileListFilter(或 ChainFileListFilter)来处理任何其他用例。稍后显示的 Java 配置显示了一种在处理后删除远程文件的技术,从而避免重复。

有关 SftpPersistentAcceptOnceFileListFilter 及其使用方法的更多信息,请参阅 远程持久文件列表过滤器

您可以使用 max-fetch-size 属性来限制在每次轮询时获取的文件数量(如果需要获取)。在集群环境中运行时,将其设置为 1 并使用持久过滤器。有关更多信息,请参阅 入站通道适配器:控制远程文件获取

适配器将远程目录和文件名放入标头(分别为 FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE)。从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO 标头提供了其他远程文件信息(以 JSON 格式)。如果您将 SftpStreamingMessageSource 上的 fileInfoJson 属性设置为 false,则标头包含一个 SftpFileInfo 对象。您可以通过使用 SftpFileInfo.getFileInfo() 方法访问底层 SftpClient 提供的 SftpClient.DirEntry 对象。当您使用 XML 配置时,fileInfoJson 属性不可用,但您可以通过将 SftpStreamingMessageSource 注入到您的配置类之一中来设置它。另请参阅 远程文件信息

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

请注意,在此示例中,转换器下游的消息处理程序具有一个 advice,该 advice 在处理后删除远程文件。