读取文件

可以使用FileReadingMessageSource从文件系统读取文件。这是一个MessageSource的实现,它从文件系统目录创建消息。以下示例演示如何配置FileReadingMessageSource

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

为了防止为某些文件创建消息,您可以提供一个FileListFilter。默认情况下,我们使用以下过滤器:

  • IgnoreHiddenFileListFilter

  • AcceptOnceFileListFilter

IgnoreHiddenFileListFilter确保不处理隐藏文件。请注意,隐藏文件的精确定义取决于系统。例如,在基于UNIX的系统上,以句点字符开头的文件被认为是隐藏文件。另一方面,Microsoft Windows有一个专用的文件属性来指示隐藏文件。

4.2版本引入了IgnoreHiddenFileListFilter。在之前的版本中,包含隐藏文件。在默认配置下,IgnoreHiddenFileListFilter首先触发,然后是AcceptOnceFileListFilter

AcceptOnceFileListFilter确保仅从目录中拾取文件一次。

AcceptOnceFileListFilter将其状态存储在内存中。如果您希望状态在系统重启后仍然存在,您可以使用FileSystemPersistentAcceptOnceFileListFilter。此过滤器将已接受的文件名存储在MetadataStore实现中(参见元数据存储)。此过滤器根据文件名和修改时间进行匹配。

从4.0版本开始,此过滤器需要一个ConcurrentMetadataStore。当与共享数据存储(例如使用RedisMetadataStoreRedis)一起使用时,它允许在多个应用程序实例之间或在多个服务器使用的网络文件共享之间共享过滤器键。

从4.1.5版本开始,此过滤器新增了一个属性(flushOnUpdate),它会导致过滤器在每次更新时刷新元数据存储(如果存储实现Flushable)。

持久性文件列表过滤器现在具有一个布尔属性forRecursion。将此属性设置为true,还会设置alwaysAcceptDirectories,这意味着出站网关(lsmget)上的递归操作现在每次都会遍历整个目录树。这是为了解决目录树深处更改未被检测到的问题。此外,forRecursion=true导致使用文件的完整路径作为元数据存储键;这解决了如果在不同目录中多次出现同名文件,则过滤器无法正常工作的问题。重要:这意味着持久性元数据存储中存在的键将找不到顶级目录下的文件。因此,此属性默认为false;这可能会在将来的版本中更改。

以下示例配置具有过滤器的FileReadingMessageSource

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

读取文件的一个常见问题是,在文件准备好之前可能会检测到该文件(即,某些其他进程可能仍在写入该文件)。默认的AcceptOnceFileListFilter不会阻止这种情况。在大多数情况下,如果文件写入进程在准备好读取时立即重命名每个文件,则可以防止这种情况。一个filename-patternfilename-regex过滤器,它只接受已准备好的文件(可能基于已知的后缀),与默认的AcceptOnceFileListFilter组合使用,允许这种情况。CompositeFileListFilter启用组合,如下例所示:

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration提供了另一种替代方案。4.2版本添加了LastModifiedFileListFilter。此过滤器可以使用age属性进行配置,以便只有比此值旧的文件才能通过过滤器。年龄默认为60秒,但您应该选择一个足够大的年龄,以避免过早拾取文件(例如,由于网络故障)。以下示例演示如何配置LastModifiedFileListFilter

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

从4.3.7版本开始,引入了ChainFileListFilterCompositeFileListFilter的扩展),允许在后续过滤器只应查看先前过滤器的结果的情况下使用。(使用CompositeFileListFilter,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。需要新行为的一个示例是LastModifiedFileListFilterAcceptOnceFileListFilter的组合,当我们不希望在经过一段时间后才接受文件时。使用CompositeFileListFilter,由于AcceptOnceFileListFilter在第一次传递时会看到所有文件,因此当其他过滤器这样做时,它不会在稍后传递它。当模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器组合使用时,CompositeFileListFilter方法很有用。模式过滤器可能只传递主文件(例如something.txt),但“完成”过滤器需要查看(例如)something.done是否存在。

假设我们有文件a.txta.doneb.txt

模式过滤器只传递a.txtb.txt,“完成”过滤器查看所有三个文件,只传递a.txt。组合过滤器的最终结果是只有a.txt被释放。

使用ChainFileListFilter,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。

5.0版本引入了ExpressionFileListFilter,用于针对文件作为上下文评估根对象执行SpEL表达式。为此,所有用于文件处理(本地和远程)的XML组件以及现有的filter属性都已提供filter-expression选项,如下例所示:

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

5.0.5版本引入了DiscardAwareFileListFilter实现,这些实现对被拒绝的文件感兴趣。为此,应通过addDiscardCallback(Consumer<File>)为这样的过滤器实现提供回调。在框架中,此功能与LastModifiedFileListFilter结合使用,来自FileReadingMessageSource.WatchServiceDirectoryScanner。与常规的DirectoryScanner不同,WatchService根据目标文件系统上的事件提供要处理的文件。在轮询包含这些文件的内部队列时,LastModifiedFileListFilter可能会丢弃它们,因为相对于其配置的age而言,它们太年轻了。因此,我们丢失了将来可能考虑的文件。丢弃回调钩子允许我们将文件保留在内部队列中,以便在后续轮询中检查ageCompositeFileListFilter也实现了一个DiscardAwareFileListFilter,并向其所有DiscardAwareFileListFilter委托填充丢弃回调。

由于CompositeFileListFilter根据所有委托匹配文件,因此discardCallback可能会对同一文件调用多次。

从5.1版本开始,FileReadingMessageSource不会检查目录是否存在,也不会在调用其start()(通常通过包装SourcePollingChannelAdapter)之前创建目录。以前,当引用目录时(例如来自测试,或者当稍后应用权限时),没有简单的方法可以防止操作系统权限错误。

消息头

从5.0版本开始,FileReadingMessageSource(除了将轮询的File作为payload之外)还会将以下头填充到出站Message中:

  • FileHeaders.FILENAME:要发送文件的File.getName()。可用于后续重命名或复制逻辑。

  • FileHeaders.ORIGINAL_FILEFile对象本身。通常,当我们丢失原始File对象时,框架组件(例如拆分器转换器)会自动填充此标头。但是,为了与任何其他自定义用例保持一致性和方便性,此标头对于访问原始文件很有用。

  • FileHeaders.RELATIVE_PATH:引入的新标头,用于表示相对于扫描的根目录的文件路径的一部分。当需要在其他地方还原源目录层次结构时,此标头非常有用。为此,可以配置DefaultFileNameGenerator(参见“`生成文件名”)以使用此标头。

目录扫描和轮询

FileReadingMessageSource不会立即生成目录中文件的的消息。它使用内部队列来存储由scanner返回的“合格文件”。scanEachPoll选项用于确保在每次轮询时,内部队列都会使用最新的输入目录内容进行刷新。默认情况下(scanEachPoll = false),FileReadingMessageSource会在再次扫描目录之前清空其队列。此默认行为对于减少对目录中大量文件的扫描特别有用。但是,在需要自定义排序的情况下,务必考虑将此标志设置为true的影响。文件的处理顺序可能与预期不符。默认情况下,队列中的文件按其自然(path)顺序处理。即使队列中已经有文件,扫描添加的新文件也会插入到适当的位置以保持该自然顺序。要自定义顺序,FileReadingMessageSource可以接受Comparator<File>作为构造函数参数。内部(PriorityBlockingQueue)使用它根据业务需求重新排序其内容。因此,要按特定顺序处理文件,应向FileReadingMessageSource提供比较器,而不是对自定义DirectoryScanner生成的列表进行排序。

5.0 版本引入了RecursiveDirectoryScanner来执行文件树遍历。该实现基于Files.walk(Path start, int maxDepth, FileVisitOption…​ options)功能。根目录(DirectoryScanner.listFiles(File))参数不包含在结果中。所有其他子目录的包含和排除都基于目标FileListFilter实现。例如,SimplePatternFileListFilter默认会过滤掉目录。有关更多信息,请参见AbstractDirectoryAwareFileListFilter及其实现。

从 5.5 版本开始,Java DSL 的FileInboundChannelAdapterSpec具有方便的recursive(boolean)选项,可在目标FileReadingMessageSource中使用RecursiveDirectoryScanner,而不是默认的扫描器。

命名空间支持

可以使用特定于文件的命名空间简化文件读取的配置。为此,请使用以下模板

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

在此命名空间内,您可以减少FileReadingMessageSource并将其包装在入站通道适配器中,如下所示

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

第一个通道适配器示例依赖于默认的FileListFilter实现

  • IgnoreHiddenFileListFilter(不处理隐藏文件)

  • AcceptOnceFileListFilter(防止重复)

因此,您也可以省略prevent-duplicatesignore-hidden属性,因为它们默认为true

Spring Integration 4.2 引入了ignore-hidden属性。在早期版本中,包含隐藏文件。

第二个通道适配器示例使用自定义过滤器,第三个使用filename-pattern属性添加基于AntPathMatcher的过滤器,第四个使用filename-regex属性向FileReadingMessageSource添加基于正则表达式的模式过滤器。filename-patternfilename-regex属性与常规filter引用属性互斥。但是,您可以使用filter属性引用CompositeFileListFilter的实例,该实例可以组合任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。

当多个进程从同一目录读取时,您可能希望锁定文件以防止它们被同时获取。为此,您可以使用FileLocker。有一个基于java.nio的实现可用,但也可以实现您自己的锁定方案。可以按如下方式注入nio锁:

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

您可以按如下方式配置自定义锁:

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了锁时,它负责在允许接收文件之前获取锁。它不承担解锁文件的责任。如果您已处理文件并让锁保持挂起状态,则会出现内存泄漏。如果这是一个问题,则应在适当的时候自行调用FileLocker.unlock(File file)

当过滤和锁定文件不足时,您可能需要完全控制文件的列出方式。要实现此类需求,可以使用DirectoryScanner的实现。此扫描器允许您准确确定每次轮询中列出的文件。这也是 Spring Integration 用于将FileListFilter实例和FileLocker连接到FileReadingMessageSource的内部接口。您可以将自定义DirectoryScanner注入到<int-file:inbound-channel-adapter/>scanner属性中,如下例所示

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

这样做可以让您完全自由地选择排序、列出和锁定策略。

还必须了解,过滤器(包括patternsregexprevent-duplicates等)和locker实例实际上是由scanner使用的。适配器上设置的任何这些属性随后都会注入到内部scanner中。对于外部scanner,在FileReadingMessageSource上禁止所有过滤器和锁属性。必须在自定义DirectoryScanner上指定(如果需要)它们。换句话说,如果您将scanner注入到FileReadingMessageSource中,则应在该scanner上提供filterlocker,而不是在FileReadingMessageSource上。

默认情况下,DefaultDirectoryScanner使用IgnoreHiddenFileListFilterAcceptOnceFileListFilter。要阻止使用它们,您可以配置您自己的过滤器(例如AcceptAllFileListFilter)甚至将其设置为null

WatchServiceDirectoryScanner

FileReadingMessageSource.WatchServiceDirectoryScanner依赖于将新文件添加到目录时的文件系统事件。在初始化期间,注册目录以生成事件。初始文件列表也在初始化期间构建。遍历目录树时,遇到的任何子目录也会注册以生成事件。在第一次轮询中,返回遍历目录生成的初始文件列表。在后续轮询中,返回来自新创建事件的文件。如果添加了新的子目录,则使用其创建事件遍历新的子树以查找现有文件并注册找到的任何新的子目录。

当程序无法像目录修改事件发生那样快速地清空其内部事件queue时,WatchKey存在问题。如果队列大小超过限制,则会发出StandardWatchEventKinds.OVERFLOW以指示某些文件系统事件可能丢失。在这种情况下,将完全重新扫描根目录。为了避免重复,请考虑使用合适的FileListFilter(例如AcceptOnceFileListFilter)或在处理完成后删除文件。

可以通过FileReadingMessageSource.use-watch-service选项启用WatchServiceDirectoryScanner,该选项与scanner选项互斥。为提供的directory填充内部FileReadingMessageSource.WatchServiceDirectoryScanner实例。

此外,现在WatchService轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFYStandardWatchEventKinds.ENTRY_DELETE

如果您需要跟踪现有文件的修改以及新文件,则应在FileListFilter中实现ENTRY_MODIFY事件逻辑。否则,这些事件的文件将以相同的方式处理。

ResettableFileListFilter实现拾取ENTRY_DELETE事件。因此,它们的文件将用于remove()操作。启用此事件时,AcceptOnceFileListFilter等过滤器会删除该文件。结果,如果出现同名文件,它将通过过滤器并作为消息发送。

为此,引入了watch-events属性(FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents))。(WatchEventTypeFileReadingMessageSource中的公共内部枚举。)使用此选项,我们可以对新文件使用一个下游流逻辑,对修改的文件使用其他逻辑。以下示例显示了如何在同一目录中为创建和修改事件配置不同的逻辑

值得一提的是,ENTRY_DELETE事件参与到被监视目录的子目录的重命名操作中。更具体地说,与先前目录名称相关的ENTRY_DELETE事件先于ENTRY_CREATE事件,后者通知新的(已重命名)目录。在某些操作系统(如 Windows)上,必须注册ENTRY_DELETE事件才能处理这种情况。否则,在文件资源管理器中重命名受监视的子目录可能会导致该子目录中的新文件未被检测到。

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

从 6.1 版本开始,FileReadingMessageSource公开了两个新的与WatchService相关的选项

  • watchMaxDepth - Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor) API 的参数;

  • watchDirPredicate - 一个Predicate<Path>,用于测试是否应遍历扫描树中的目录并将其注册到WatchService和配置的监视事件类型。

限制内存消耗

您可以使用HeadDirectoryScanner来限制内存中保留的文件数量。这在扫描大型目录时非常有用。使用 XML 配置,可以通过在入站通道适配器上设置queue-size属性来启用此功能。

在 4.2 版本之前,此设置与使用任何其他过滤器不兼容。任何其他过滤器(包括prevent-duplicates="true")都会覆盖用于限制大小的过滤器。

使用HeadDirectoryScannerAcceptOnceFileListFilter不兼容。由于在轮询决策期间会咨询所有过滤器,因此AcceptOnceFileListFilter不知道其他过滤器可能暂时过滤文件。即使以前被HeadDirectoryScanner.HeadFilter过滤的文件现在可用,AcceptOnceFileListFilter也会过滤它们。

通常,在这种情况下,不要使用AcceptOnceFileListFilter,而应删除已处理的文件,以便以前被过滤的文件在将来的轮询中可用。

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

跟踪文件

另一个常用的案例是从文件的末尾(或尾部)获取“行”,并在添加新行时捕获它们。提供了两种实现方式。第一种,OSDelegatingFileTailingMessageProducer,使用原生tail命令(在拥有该命令的操作系统上)。这通常是这些平台上效率最高的实现方式。对于没有tail命令的操作系统,第二个实现ApacheCommonsFileTailingMessageProducer使用Apache commons-ioTailer类。

在这两种情况下,文件系统事件(例如文件不可用和其他事件)都将作为ApplicationEvent实例发布,方法是使用正常的Spring事件发布机制。此类事件的示例包括:

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

例如,当文件被轮换时,可能会发生前面示例中所示的事件序列。

从5.0版本开始,当在idleEventInterval期间文件中没有数据时,会发出FileTailingIdleEvent。以下示例显示了此类事件的外观:

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail命令的平台都提供这些状态消息。

从这些端点发出的消息具有以下头信息:

  • FileHeaders.ORIGINAL_FILEFile对象

  • FileHeaders.FILENAME:文件名 (File.getName())

在5.0版本之前,FileHeaders.FILENAME头包含文件的绝对路径的字符串表示形式。现在,您可以通过调用原始文件头的getAbsolutePath()方法来获取该字符串表示形式。

以下示例使用默认选项('-F -n 0',表示从当前末尾跟踪文件名)创建一个原生适配器。

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

以下示例使用'-F -n +0'选项(表示跟踪文件名,发出所有现有行)创建一个原生适配器。

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

如果tail命令失败(在某些平台上,即使指定了-F,缺少文件也会导致tail失败),则每10秒重试一次该命令。

默认情况下,原生适配器从标准输出捕获内容并将其作为消息发送。它们还从标准错误捕获内容以引发事件。从4.3.6版本开始,您可以通过将enable-status-reader设置为false来丢弃标准错误事件,如下例所示:

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

在以下示例中,IdleEventInterval设置为5000,这意味着如果五秒钟内没有写入任何行,则每五秒钟触发一次FileTailingIdleEvent

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

当您需要停止适配器时,这非常有用。

以下示例创建一个Apache commons-io Tailer适配器,该适配器每两秒检查文件是否有新行,并每十秒检查是否存在缺少的文件。

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 文件从开头(end="false")而不是从结尾(这是默认值)进行跟踪。
2 每个块都会重新打开文件(默认情况下保持文件打开)。
指定delayendreopen属性会强制使用Apache commons-io适配器,并使native-options属性不可用。

处理不完整的数据

文件传输场景中的一个常见问题是如何确定传输是否完成,以便您不会开始读取不完整的文件。解决此问题的常用技术是使用临时名称写入文件,然后将其原子地重命名为最终名称。此技术与屏蔽消费者无法拾取临时文件的过滤器一起,提供了一个可靠的解决方案。Spring Integration组件(本地或远程)写入文件时会使用此技术。默认情况下,它们会在文件名后附加.writing,并在传输完成后将其删除。

另一种常用技术是写入第二个“标记”文件以指示文件传输已完成。在这种情况下,您不应认为somefile.txt(例如)可用,除非somefile.txt.complete也存在。Spring Integration 5.0版引入了新的过滤器来支持此机制。提供了文件系统 (FileSystemMarkerFilePresentFileListFilter)、FTPSFTP的实现。它们是可配置的,因此标记文件可以具有任何名称,尽管它通常与正在传输的文件相关。有关更多信息,请参见Javadoc