高级元数据使用
到目前为止,我们已经讨论了JobLauncher
和JobRepository
接口。它们共同代表了作业的简单启动和批处理域对象的简单 CRUD 操作
JobLauncher
使用 JobRepository
创建新的 JobExecution
对象并运行它们。Job
和 Step
实现随后在运行 Job
期间使用相同的 JobRepository
对相同的执行进行基本更新。基本操作足以满足简单的场景。但是,在拥有数百个批处理作业和复杂调度要求的大型批处理环境中,需要更高级的元数据访问权限。
JobExplorer
和 JobOperator
接口(将在接下来的部分中讨论)为查询和控制元数据添加了额外的功能。
查询存储库
在任何高级功能之前,最基本的需求是能够查询存储库以查找现有执行。此功能由 JobExplorer
接口提供。
public interface JobExplorer {
List<JobInstance> getJobInstances(String jobName, int start, int count);
JobExecution getJobExecution(Long executionId);
StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);
JobInstance getJobInstance(Long instanceId);
List<JobExecution> getJobExecutions(JobInstance jobInstance);
Set<JobExecution> findRunningJobExecutions(String jobName);
}
从其方法签名可以明显看出,JobExplorer
是 JobRepository
的只读版本,并且与 JobRepository
一样,它可以通过使用工厂 Bean 轻松配置。
-
Java
-
XML
以下示例展示了如何在 Java 中配置 JobExplorer
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
return factoryBean.getObject();
}
...
以下示例展示了如何在 XML 中配置 JobExplorer
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:dataSource-ref="dataSource" />
本章前面,我们注意到您可以修改 JobRepository
的表前缀以允许不同的版本或模式。由于 JobExplorer
使用相同的表,因此它也需要设置前缀的能力。
-
Java
-
XML
以下示例展示了如何在 Java 中为 JobExplorer
设置表前缀
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
factoryBean.setDataSource(this.dataSource);
factoryBean.setTablePrefix("SYSTEM.");
return factoryBean.getObject();
}
...
以下示例展示了如何在 XML 中为 JobExplorer
设置表前缀
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
p:tablePrefix="SYSTEM."/>
JobRegistry
JobRegistry
(及其父接口 JobLocator
)不是必需的,但如果您想跟踪上下文中有哪些作业可用,它会很有用。它对于在作业在其他地方创建(例如,在子上下文)时,将作业集中收集到应用程序上下文中也很有用。您还可以使用自定义 JobRegistry
实现来操作注册的作业的名称和其他属性。框架只提供了一个实现,它基于从作业名称到作业实例的简单映射。
-
Java
-
XML
使用 @EnableBatchProcessing
时,会为您提供 JobRegistry
。以下示例展示了如何配置您自己的 JobRegistry
...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
return new MapJobRegistry();
}
...
以下示例展示了如何在 XML 中定义的作业中包含 JobRegistry
<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
您可以通过以下几种方式填充 JobRegistry
:使用 Bean 后置处理器,或使用智能初始化单例,或使用注册器生命周期组件。接下来的部分将描述这些机制。
JobRegistryBeanPostProcessor
这是一个 Bean 后置处理器,它可以在创建所有作业时注册它们。
-
Java
-
XML
以下示例展示了如何在 Java 中定义的作业中包含 JobRegistryBeanPostProcessor
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
postProcessor.setJobRegistry(jobRegistry);
return postProcessor;
}
以下示例展示了如何在 XML 中定义的作业中包含 JobRegistryBeanPostProcessor
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
<property name="jobRegistry" ref="jobRegistry"/>
</bean>
虽然这不是严格必要的,但示例中的后置处理器被赋予了一个 id
,以便它可以包含在子上下文(例如,作为父 Bean 定义)中,并导致在那里创建的所有作业也被自动注册。
从 5.1 版本开始,@EnableBatchProcessing
注解会在应用程序上下文中自动注册一个 jobRegistryBeanPostProcessor
Bean。
JobRegistrySmartInitializingSingleton
这是一个 SmartInitializingSingleton
,它在作业注册表中注册所有单例作业。
-
Java
-
XML
以下示例展示了如何在 Java 中定义 JobRegistrySmartInitializingSingleton
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
return new JobRegistrySmartInitializingSingleton(jobRegistry);
}
以下示例展示了如何在 XML 中定义 JobRegistrySmartInitializingSingleton
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
AutomaticJobRegistrar
这是一个生命周期组件,它创建子上下文并在创建时从这些上下文中注册作业。这样做的一个优点是,虽然子上下文中的作业名称在注册表中仍然必须是全局唯一的,但它们的依赖关系可以具有“自然”名称。因此,例如,您可以创建一组 XML 配置文件,每个文件只有一个作业,但它们都具有不同的 ItemReader
定义,并且具有相同的 Bean 名称,例如 reader
。如果所有这些文件都被导入到同一个上下文中,那么阅读器定义将发生冲突并相互覆盖,但是,使用自动注册器,可以避免这种情况。这使得集成来自应用程序不同模块的作业变得更容易。
-
Java
-
XML
以下示例展示了如何在 Java 中定义的作业中包含 AutomaticJobRegistrar
@Bean
public AutomaticJobRegistrar registrar() {
AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
registrar.setJobLoader(jobLoader());
registrar.setApplicationContextFactories(applicationContextFactories());
registrar.afterPropertiesSet();
return registrar;
}
以下示例展示了如何在 XML 中定义的作业中包含 AutomaticJobRegistrar
<bean class="org.spr...AutomaticJobRegistrar">
<property name="applicationContextFactories">
<bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
<property name="resources" value="classpath*:/config/job*.xml" />
</bean>
</property>
<property name="jobLoader">
<bean class="org.spr...DefaultJobLoader">
<property name="jobRegistry" ref="jobRegistry" />
</bean>
</property>
</bean>
注册器有两个必填属性:一个 ApplicationContextFactory
数组(从前面的示例中的一个方便的工厂 Bean 创建)和一个 JobLoader
。JobLoader
负责管理子上下文的生命周期并在 JobRegistry
中注册作业。
ApplicationContextFactory
负责创建子上下文。最常见的用法是(如前面的示例)使用 ClassPathXmlApplicationContextFactory
。此工厂的一个特性是,默认情况下,它会将一些配置从父上下文复制到子上下文。因此,例如,您不需要在子上下文中重新定义 PropertyPlaceholderConfigurer
或 AOP 配置,前提是它应该与父上下文相同。
您可以将AutomaticJobRegistrar
与JobRegistryBeanPostProcessor
结合使用(只要您也使用DefaultJobLoader
)。例如,如果主父上下文和子位置中都定义了作业,则可能需要这样做。
JobOperator
如前所述,JobRepository
提供元数据的CRUD操作,而JobExplorer
提供元数据的只读操作。但是,这些操作在用于执行常见的监控任务(例如停止、重新启动或汇总作业)时最有用,这通常由批处理操作员完成。Spring Batch在JobOperator
接口中提供了这些类型的操作
public interface JobOperator {
List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;
List<Long> getJobInstances(String jobName, int start, int count)
throws NoSuchJobException;
Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;
String getParameters(long executionId) throws NoSuchJobExecutionException;
Long start(String jobName, String parameters)
throws NoSuchJobException, JobInstanceAlreadyExistsException;
Long restart(long executionId)
throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
NoSuchJobException, JobRestartException;
Long startNextInstance(String jobName)
throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;
boolean stop(long executionId)
throws NoSuchJobExecutionException, JobExecutionNotRunningException;
String getSummary(long executionId) throws NoSuchJobExecutionException;
Map<Long, String> getStepExecutionSummaries(long executionId)
throws NoSuchJobExecutionException;
Set<String> getJobNames();
}
前面的操作代表来自许多不同接口的方法,例如JobLauncher
、JobRepository
、JobExplorer
和JobRegistry
。因此,提供的JobOperator
实现(SimpleJobOperator
)具有许多依赖项。
-
Java
-
XML
以下示例显示了Java中SimpleJobOperator
的典型bean定义
/**
* All injected dependencies for this bean are provided by the @EnableBatchProcessing
* infrastructure out of the box.
*/
@Bean
public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
JobRepository jobRepository,
JobRegistry jobRegistry,
JobLauncher jobLauncher) {
SimpleJobOperator jobOperator = new SimpleJobOperator();
jobOperator.setJobExplorer(jobExplorer);
jobOperator.setJobRepository(jobRepository);
jobOperator.setJobRegistry(jobRegistry);
jobOperator.setJobLauncher(jobLauncher);
return jobOperator;
}
以下示例显示了XML中SimpleJobOperator
的典型bean定义
<bean id="jobOperator" class="org.spr...SimpleJobOperator">
<property name="jobExplorer">
<bean class="org.spr...JobExplorerFactoryBean">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
<property name="jobRepository" ref="jobRepository" />
<property name="jobRegistry" ref="jobRegistry" />
<property name="jobLauncher" ref="jobLauncher" />
</bean>
从版本5.0开始,@EnableBatchProcessing
注释会自动在应用程序上下文中注册一个作业操作员bean。
如果您在作业存储库上设置了表前缀,请不要忘记在作业资源管理器上也设置它。 |
JobParametersIncrementer
JobOperator
上的大多数方法是不言自明的,您可以在接口的Javadoc中找到更详细的解释。但是,startNextInstance
方法值得注意。此方法始终启动Job
的新实例。如果JobExecution
中存在严重问题,并且需要从头开始重新启动Job
,这将非常有用。与JobLauncher
(需要一个新的JobParameters
对象来触发新的JobInstance
)不同,如果参数与任何以前的参数集不同,则startNextInstance
方法使用与Job
绑定的JobParametersIncrementer
强制Job
进入新的实例
public interface JobParametersIncrementer {
JobParameters getNext(JobParameters parameters);
}
JobParametersIncrementer
的约定是,给定一个JobParameters对象,它通过递增其可能包含的任何必要值来返回“下一个”JobParameters
对象。此策略很有用,因为框架无法知道对JobParameters
的哪些更改使其成为“下一个”实例。例如,如果JobParameters
中唯一的价值是日期,并且应该创建下一个实例,那么该价值应该增加一天还是一周(例如,如果作业是每周一次)?对于任何有助于识别Job
的数值,也可以说相同的话,如下面的示例所示
public class SampleIncrementer implements JobParametersIncrementer {
public JobParameters getNext(JobParameters parameters) {
if (parameters==null || parameters.isEmpty()) {
return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
}
long id = parameters.getLong("run.id",1L) + 1;
return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
}
}
在此示例中,具有run.id
键的值用于区分JobInstances
。如果传入的JobParameters
为null,则可以假设该Job
以前从未运行过,因此可以返回其初始状态。但是,如果不是,则获取旧值,将其递增1,然后返回。
-
Java
-
XML
对于在 Java 中定义的作业,您可以通过构建器中提供的 incrementer
方法将增量器与 Job
关联,如下所示
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.incrementer(sampleIncrementer())
...
.build();
}
对于在 XML 中定义的作业,您可以通过命名空间中的 incrementer
属性将增量器与 Job
关联,如下所示
<job id="footballJob" incrementer="sampleIncrementer">
...
</job>
停止作业
JobOperator
最常见的用例之一是优雅地停止作业
Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());
关闭不是立即的,因为没有办法强制立即关闭,尤其是在执行当前处于框架无法控制的开发人员代码中时,例如业务服务。但是,一旦控制权返回到框架,它就会将当前 StepExecution
的状态设置为 BatchStatus.STOPPED
,保存它,并在完成之前对 JobExecution
执行相同的操作。
中止作业
状态为 FAILED
的作业执行可以重新启动(如果 Job
可重新启动)。状态为 ABANDONED
的作业执行无法由框架重新启动。ABANDONED
状态也用于步骤执行,以将它们标记为在重新启动的作业执行中可跳过。如果作业正在运行并遇到在先前失败的作业执行中标记为 ABANDONED
的步骤,它将继续执行到下一步(由作业流程定义和步骤执行退出状态确定)。
如果进程死亡(kill -9
或服务器故障),作业当然不会运行,但 JobRepository
无法知道,因为在进程死亡之前没有人告诉它。您必须手动告诉它,您知道执行已失败或应被视为中止(将其状态更改为 FAILED
或 ABANDONED
)。这是一个业务决策,无法自动化。仅当作业可重新启动且您知道重新启动数据有效时,才将状态更改为 FAILED
。