通过消息启动批处理作业
通过核心 Spring Batch API 启动批处理作业时,您基本上有两种选择
-
从命令行,使用
CommandLineJobOperator -
通过编程方式,使用
JobOperator.start()
例如,当通过 shell 脚本调用批处理作业时,您可能希望使用 CommandLineJobOperator。或者,您可以直接使用 JobOperator(例如,当 Spring Batch 作为 Web 应用程序的一部分使用时)。但是,更复杂的用例呢?也许您需要轮询远程 (S)FTP 服务器以检索批处理作业的数据,或者您的应用程序必须同时支持多个不同的数据源。例如,您可能不仅从 Web 接收数据文件,还从 FTP 和其他来源接收数据文件。在调用 Spring Batch 之前,可能还需要对输入文件进行额外的转换。
因此,使用 Spring Integration 及其众多适配器执行批处理作业会更加强大。例如,您可以使用文件入站通道适配器来监控文件系统中的目录,并在输入文件到达时立即启动批处理作业。此外,您可以创建使用多个不同适配器的 Spring Integration 流,仅通过配置即可轻松地同时从多个源获取批处理作业的数据。使用 Spring Integration 实现所有这些场景非常容易,因为它允许解耦、事件驱动地执行 JobOperator。
Spring Batch Integration 提供了 JobLaunchingMessageHandler 类,您可以使用它来启动批处理作业。JobLaunchingMessageHandler 的输入由 Spring Integration 消息提供,该消息的有效负载类型为 JobLaunchRequest。此类是对要启动的 Job 和启动批处理作业所需的 JobParameters 的包装。
下图显示了启动批处理作业所需的典型 Spring Integration 消息流。EIP (Enterprise Integration Patterns) 网站 提供了消息图标及其描述的完整概述。
将文件转换为 JobLaunchRequest
以下示例将文件转换为 JobLaunchRequest
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution 响应
当批处理作业正在执行时,会返回一个 JobExecution 实例。您可以使用此实例来确定执行的状态。如果 JobExecution 能够成功创建,无论实际执行是否成功,它总是会返回。
返回 JobExecution 实例的确切行为取决于所提供的 TaskExecutor。如果使用 同步(单线程)TaskExecutor 实现,则 JobExecution 响应仅在作业完成后返回。当使用 异步 TaskExecutor 时,JobExecution 实例会立即返回。然后,您可以获取 JobExecution 实例的 id(使用 JobExecution.getJobInstanceId()),并使用 JobExplorer 查询 JobRepository 以获取作业的更新状态。有关更多信息,请参阅 查询 Repository。
Spring Batch Integration 配置
考虑这样一种情况:需要创建文件 inbound-channel-adapter 以监听指定目录中的 CSV 文件,将其交给转换器(FileMessageToJobRequest),通过作业启动网关启动作业,并使用 logging-channel-adapter 记录 JobExecution 的输出。
-
Java
-
XML
以下示例显示了如何在 Java 中配置这种常见情况
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
以下示例显示了如何在 XML 中配置这种常见情况
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 配置示例
现在我们正在轮询文件并启动作业,我们需要配置我们的 Spring Batch ItemReader(例如)以使用作业参数“input.file.name”定义的位置找到的文件,如以下 bean 配置所示
-
Java
-
XML
以下 Java 示例显示了必要的 bean 配置
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
以下 XML 示例显示了必要的 bean 配置
<bean id="itemReader" class="org.springframework.batch.infrastructure.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前面示例中主要值得注意的地方是将 #{jobParameters['input.file.name']} 的值注入为 Resource 属性值,并将 ItemReader bean 设置为 step 作用域。将 bean 设置为 step 作用域利用了后期绑定支持,允许访问 jobParameters 变量。