读取文件
可以使用 FileReadingMessageSource
从文件系统中读取文件。这是一个 MessageSource
的实现,它从文件系统目录创建消息。以下示例展示了如何配置 FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
为了防止为某些文件创建消息,您可以提供一个 FileListFilter
。默认情况下,我们使用以下过滤器
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
确保不会处理隐藏文件。请注意,隐藏文件的定义是系统相关的。例如,在基于 UNIX 的系统上,以句点字符开头的文件被认为是隐藏的。另一方面,Microsoft Windows 具有专门的文件属性来指示隐藏文件。
版本 4.2 引入了 |
AcceptOnceFileListFilter
确保文件只从目录中读取一次。
从 4.0 版本开始,此过滤器需要一个 从 4.1.5 版本开始,此过滤器有一个新的属性( |
持久文件列表过滤器现在有一个布尔属性 forRecursion
。将此属性设置为 true
,也会设置 alwaysAcceptDirectories
,这意味着出站网关上的递归操作(ls
和 mget
)现在将始终在每次遍历完整的目录树。这是为了解决目录树深处发生的变化未被检测到的问题。此外,forRecursion=true
会导致使用文件的完整路径作为元数据存储键;这解决了过滤器在同一目录中出现多个同名文件时无法正常工作的问题。重要提示:这意味着持久元数据存储中现有的键将无法在顶层目录下的文件中找到。出于这个原因,该属性默认情况下为 false
;这可能会在将来的版本中更改。
以下示例配置了一个带有过滤器的 FileReadingMessageSource
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是,文件可能在准备就绪之前就被检测到(也就是说,其他进程可能仍在写入文件)。默认的 AcceptOnceFileListFilter
无法阻止这种情况。在大多数情况下,如果文件写入进程在文件准备好读取后立即重命名每个文件,就可以防止这种情况。一个 filename-pattern
或 filename-regex
过滤器,它只接受已准备好的文件(可能基于已知的后缀),与默认的 AcceptOnceFileListFilter
组合,允许这种情况。CompositeFileListFilter
允许组合,如下面的示例所示
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。4.2 版本添加了 LastModifiedFileListFilter
。此过滤器可以配置一个 age
属性,以便只传递比此值更旧的文件。年龄默认为 60 秒,但您应该选择一个足够大的年龄,以避免过早地获取文件(例如,由于网络故障)。以下示例展示了如何配置 LastModifiedFileListFilter
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从 4.3.7 版本开始,引入了 `ChainFileListFilter`(`CompositeFileListFilter` 的扩展),用于在后续过滤器仅需查看前一个过滤器的结果的情况下。 (在 `CompositeFileListFilter` 中,所有过滤器都能看到所有文件,但它只传递通过所有过滤器的文件)。新行为所需的示例是 `LastModifiedFileListFilter` 和 `AcceptOnceFileListFilter` 的组合,当我们不希望在经过一段时间后才接受文件时。使用 `CompositeFileListFilter`,由于 `AcceptOnceFileListFilter` 在第一次遍历时会看到所有文件,因此它在其他过滤器通过时不会再传递它。`CompositeFileListFilter` 方法在模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器组合时很有用。模式过滤器可能只传递主文件(例如 `something.txt`),但“完成”过滤器需要查看(例如)`something.done` 是否存在。
假设我们有文件 `a.txt`、`a.done` 和 `b.txt`。
模式过滤器只传递 `a.txt` 和 `b.txt`,而“完成”过滤器看到所有三个文件,只传递 `a.txt`。组合过滤器的最终结果是只释放 `a.txt`。
使用 `ChainFileListFilter`,如果链中的任何过滤器返回空列表,则不会调用剩余的过滤器。 |
5.0 版本引入了 `ExpressionFileListFilter`,用于针对文件作为上下文评估根对象执行 SpEL 表达式。为此,所有用于文件处理(本地和远程)的 XML 组件以及现有的 `filter` 属性都提供了 `filter-expression` 选项,如下例所示
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了 DiscardAwareFileListFilter
实现,这些实现对被拒绝的文件感兴趣。为此,应通过 addDiscardCallback(Consumer<File>)
为此类过滤器实现提供回调。在框架中,此功能来自 FileReadingMessageSource.WatchServiceDirectoryScanner
,与 LastModifiedFileListFilter
结合使用。与常规的 DirectoryScanner
不同,WatchService
根据目标文件系统上的事件提供要处理的文件。在轮询包含这些文件的内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为它们相对于其配置的 age
太年轻。因此,我们丢失了该文件以供将来可能的考虑。丢弃回调钩子使我们能够将文件保留在内部队列中,以便它在后续轮询中可用于检查 age
。CompositeFileListFilter
也实现了 DiscardAwareFileListFilter
,并为其所有 DiscardAwareFileListFilter
代理填充丢弃回调。
由于 CompositeFileListFilter 会根据所有代理匹配文件,因此 discardCallback 可能会对同一个文件调用多次。
|
从版本 5.1 开始,FileReadingMessageSource
不会检查目录是否存在,也不会在调用其 start()
(通常通过包装 SourcePollingChannelAdapter
)之前创建它。以前,没有简单的方法可以防止在引用目录时出现操作系统权限错误,例如来自测试,或者在以后应用权限时。
消息头
从版本 5.0 开始,FileReadingMessageSource
(除了作为轮询的 File
的 payload
之外)还会将以下头填充到出站 Message
中
-
FileHeaders.FILENAME
:要发送的文件的File.getName()
。可用于后续重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:File
对象本身。通常,此头由框架组件(例如 拆分器 或 转换器)在丢失原始File
对象时自动填充。但是,为了与任何其他自定义用例保持一致和方便,此头对于访问原始文件可能很有用。 -
FileHeaders.RELATIVE_PATH
: 一个新引入的头部,用于表示文件路径相对于扫描根目录的相对部分。当需要在其他地方恢复源目录层次结构时,此头部非常有用。为此,可以将DefaultFileNameGenerator
(参见“`生成文件名`”)配置为使用此头部。
目录扫描和轮询
FileReadingMessageSource
不会立即为目录中的文件生成消息。它使用一个内部队列来存储由scanner
返回的“合格文件”。scanEachPoll
选项用于确保在每次轮询时,内部队列都会使用最新的输入目录内容进行刷新。默认情况下(scanEachPoll = false
),FileReadingMessageSource
会在再次扫描目录之前清空其队列。这种默认行为对于减少对目录中大量文件的扫描非常有用。但是,在需要自定义排序的情况下,务必考虑将此标志设置为true
的影响。文件处理的顺序可能与预期不符。默认情况下,队列中的文件按其自然(path
)顺序处理。扫描添加的新文件,即使队列中已经存在文件,也会插入到适当的位置以保持该自然顺序。要自定义顺序,FileReadingMessageSource
可以接受一个Comparator<File>
作为构造函数参数。它由内部(PriorityBlockingQueue
)使用,根据业务需求重新排序其内容。因此,要按特定顺序处理文件,您应该向FileReadingMessageSource
提供一个比较器,而不是对自定义DirectoryScanner
生成的列表进行排序。
版本 5.0 引入了RecursiveDirectoryScanner
来执行文件树访问。该实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能。根目录(DirectoryScanner.listFiles(File)
)参数将从结果中排除。所有其他子目录的包含和排除都基于目标FileListFilter
实现。例如,SimplePatternFileListFilter
默认情况下会过滤掉目录。有关更多信息,请参见AbstractDirectoryAwareFileListFilter
及其实现。
从 5.5 版本开始,Java DSL 的 FileInboundChannelAdapterSpec 提供了一个方便的 recursive(boolean) 选项,可以在目标 FileReadingMessageSource 中使用 RecursiveDirectoryScanner ,而不是默认的扫描器。
|
命名空间支持
可以使用特定于文件的命名空间简化文件读取的配置。为此,请使用以下模板
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在这个命名空间内,可以减少 FileReadingMessageSource
的使用,并将其包装在入站通道适配器中,如下所示
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的 FileListFilter
实现
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,也可以省略 prevent-duplicates
和 ignore-hidden
属性,因为它们默认情况下为 true
。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个使用 filename-pattern
属性添加基于 AntPathMatcher
的过滤器,第四个使用 filename-regex
属性添加基于正则表达式模式的过滤器到 FileReadingMessageSource
。filename-pattern
和 filename-regex
属性是相互排斥的,不能同时使用。但是,可以使用 filter
属性引用 CompositeFileListFilter
的实例,该实例可以组合任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一个目录读取文件时,您可能希望锁定文件以防止它们被同时读取。为此,可以使用 FileLocker
。有一个基于 java.nio
的实现可用,但也可以实现自己的锁定方案。nio
锁定器可以像下面这样注入
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
可以像下面这样配置自定义锁定器
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了锁定器时,它负责在允许接收文件之前获取锁。它不负责解锁文件。如果您已经处理了文件并且锁仍然存在,就会出现内存泄漏。如果这是一个问题,您应该在适当的时候自己调用 FileLocker.unlock(File file) 。
|
当过滤和锁定文件不足以满足需求时,您可能需要完全控制文件的列出方式。为了实现这种需求,您可以使用 DirectoryScanner
的实现。这个扫描器可以让您精确地确定每次轮询中列出的文件。这也是 Spring Integration 在内部使用来连接 FileListFilter
实例和 FileLocker
到 FileReadingMessageSource
的接口。您可以将自定义的 DirectoryScanner
注入到 <int-file:inbound-channel-adapter/>
的 scanner
属性中,如下例所示
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让您自由选择排序、列出和锁定策略。
同样重要的是要理解,过滤器(包括 patterns
、regex
、prevent-duplicates
等)和 locker
实例实际上是由 scanner
使用的。适配器上设置的任何这些属性随后都会被注入到内部的 scanner
中。对于外部 scanner
的情况,FileReadingMessageSource
上禁止使用所有过滤器和锁定属性。它们必须在自定义的 DirectoryScanner
上指定(如果需要)。换句话说,如果您将 scanner
注入到 FileReadingMessageSource
中,您应该在 scanner
上提供 filter
和 locker
,而不是在 FileReadingMessageSource
上。
默认情况下,DefaultDirectoryScanner 使用 IgnoreHiddenFileListFilter 和 AcceptOnceFileListFilter 。为了防止使用它们,您可以配置自己的过滤器(例如 AcceptAllFileListFilter )甚至将其设置为 null 。
|
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
依赖于文件系统事件,当新文件被添加到目录时会触发这些事件。在初始化期间,目录被注册以生成事件。初始文件列表也在初始化期间构建。在遍历目录树时,遇到的任何子目录也会被注册以生成事件。在第一次轮询时,返回从遍历目录得到的初始文件列表。在随后的轮询中,返回来自新创建事件的文件。如果添加了新的子目录,则使用其创建事件来遍历新的子树以查找现有文件并注册找到的任何新子目录。
当 WatchKey 的内部事件 queue 没有被程序尽快清空,而目录修改事件却持续发生时,就会出现一个问题。如果队列大小超过了限制,就会发出 StandardWatchEventKinds.OVERFLOW 来指示一些文件系统事件可能丢失。在这种情况下,根目录将被完全重新扫描。为了避免重复,请考虑使用适当的 FileListFilter (例如 AcceptOnceFileListFilter )或在处理完成后删除文件。
|
可以通过FileReadingMessageSource.use-watch-service
选项启用WatchServiceDirectoryScanner
,该选项与scanner
选项互斥。将为提供的directory
填充内部FileReadingMessageSource.WatchServiceDirectoryScanner
实例。
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
。
如果您需要跟踪现有文件的修改以及新文件,则应在FileListFilter
中实现ENTRY_MODIFY
事件逻辑。否则,来自这些事件的文件将以相同的方式处理。
ResettableFileListFilter
实现会接收ENTRY_DELETE
事件。因此,它们的将为remove()
操作提供文件。启用此事件时,诸如AcceptOnceFileListFilter
之类的过滤器将删除该文件。结果,如果出现同名文件,它将通过过滤器并作为消息发送。
为此,引入了watch-events
属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)。(WatchEventType
是FileReadingMessageSource
中的公共内部枚举。)使用此选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他逻辑。以下示例展示了如何在同一目录中为创建和修改事件配置不同的逻辑
值得一提的是,ENTRY_DELETE
事件参与了监视目录的子目录的重命名操作。更具体地说,与先前目录名称相关的ENTRY_DELETE
事件先于ENTRY_CREATE
事件,该事件通知有关新(重命名)目录的信息。在某些操作系统(如 Windows)上,必须注册ENTRY_DELETE
事件才能处理这种情况。否则,在文件资源管理器中重命名监视的子目录可能会导致该子目录中的新文件未被检测到。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从 6.1 版开始,FileReadingMessageSource
公开了两个新的与 WatchService
相关的选项
-
watchMaxDepth
-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
API 的参数; -
watchDirPredicate
- 一个Predicate<Path>
,用于测试扫描树中的目录是否应该被遍历并使用WatchService
和配置的监视事件类型进行注册。
限制内存消耗
您可以使用HeadDirectoryScanner
来限制内存中保留的文件数量。这在扫描大型目录时非常有用。使用 XML 配置时,可以通过在入站通道适配器上设置queue-size
属性来启用此功能。
在 4.2 版之前,此设置与使用任何其他过滤器不兼容。任何其他过滤器(包括prevent-duplicates="true"
)都会覆盖用于限制大小的过滤器。
使用 通常,在这种情况下,您应该删除已处理的文件,以便以前过滤的文件在将来的轮询中可用,而不是使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置配置出站适配器的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器的示例
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
“尾部”跟踪文件
另一个常见的用例是从文件的末尾(或尾部)获取“行”,在添加新行时捕获它们。提供了两种实现。第一个,OSDelegatingFileTailingMessageProducer
,使用本机tail
命令(在具有该命令的操作系统上)。这通常是这些平台上最有效的实现。对于没有tail
命令的操作系统,第二个实现ApacheCommonsFileTailingMessageProducer
使用 Apache 的commons-io
Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都作为ApplicationEvent
实例发布,使用正常的 Spring 事件发布机制。此类事件的示例包括以下内容
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,当文件被轮换时,可能会发生前面示例中显示的事件序列。
从 5.0 版开始,当文件在idleEventInterval
期间没有数据时,会发出FileTailingIdleEvent
。以下示例显示了此类事件的外观
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令的平台都提供这些状态消息。
|
从这些端点发出的消息具有以下标头
-
FileHeaders.ORIGINAL_FILE
:File
对象 -
FileHeaders.FILENAME
: 文件名(File.getName()
)
在 5.0 版本之前,FileHeaders.FILENAME 标头包含文件绝对路径的字符串表示形式。现在,您可以通过调用原始文件标头的getAbsolutePath() 方法来获取该字符串表示形式。
|
以下示例使用默认选项('-F -n 0',表示从当前末尾跟踪文件名)创建一个本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例使用 '-F -n +0' 选项(表示跟踪文件名,发出所有现有行)创建一个本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,即使指定了-F
,缺少文件也会导致tail
失败),则每 10 秒重试一次命令。
默认情况下,本机适配器从标准输出捕获并以消息形式发送内容。它们还从标准错误捕获以引发事件。从 4.3.6 版本开始,您可以通过将enable-status-reader
设置为false
来丢弃标准错误事件,如下例所示
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,这意味着如果五秒钟内没有写入任何行,则每五秒钟触发一次FileTailingIdleEvent
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
这在您需要停止适配器时很有用。
以下示例创建一个 Apache commons-io
Tailer
适配器,该适配器每两秒检查文件是否有新行,并每十秒检查一次丢失的文件是否存在
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 从开头(end="false" )跟踪文件,而不是从结尾(默认值)。 |
2 | 为每个块重新打开文件(默认情况下保持文件打开)。 |
指定delay 、end 或reopen 属性会强制使用 Apache commons-io 适配器,并使native-options 属性不可用。
|