高级元数据使用

到目前为止,我们已经讨论了 JobLauncherJobRepository 接口。它们一起表示作业的简单启动和批处理领域对象的 CRUD 基本操作。

Job Repository
图 1. 作业存储库

JobLauncher 使用 JobRepository 创建新的 JobExecution 对象并运行它们。JobStep 实现随后在运行 Job 期间使用相同的 JobRepository 对相同的执行进行基本更新。对于简单的场景,基本操作就足够了。但是,在拥有数百个批处理作业和复杂调度需求的大型批处理环境中,需要更高级的元数据访问。

Job Repository Advanced
图 2. 高级作业存储库访问

接下来几节将讨论 JobExplorerJobOperator 接口,它们增加了查询和控制元数据的附加功能。

查询存储库

在任何高级功能之前,最基本的需求是能够查询存储库中现有的执行。此功能由 JobExplorer 接口提供。

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

从其方法签名可以看出,JobExplorerJobRepository 的只读版本,并且与 JobRepository 一样,它可以通过使用工厂 bean 来轻松配置。

  • Java

  • XML

以下示例显示如何在 Java 中配置 JobExplorer

Java 配置
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...

以下示例显示如何在 XML 中配置 JobExplorer

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
      p:dataSource-ref="dataSource" />

本章前面,我们注意到可以修改 JobRepository 的表前缀以允许不同的版本或模式。由于 JobExplorer 使用相同的表,因此它也需要设置前缀的能力。

  • Java

  • XML

以下示例显示如何在 Java 中设置 JobExplorer 的表前缀

Java 配置
...
// This would reside in your DefaultBatchConfiguration extension
@Bean
public JobExplorer jobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...

以下示例显示如何在 XML 中设置 JobExplorer 的表前缀

XML 配置
<bean id="jobExplorer" class="org.spr...JobExplorerFactoryBean"
		p:tablePrefix="SYSTEM."/>

JobRegistry

JobRegistry(及其父接口 JobLocator)不是强制性的,但如果您想跟踪上下文中有哪些作业可用,它会很有用。当作业在其他地方创建(例如,在子上下文中)时,它也可用于集中收集应用程序上下文中的作业。您还可以使用自定义 JobRegistry 实现来操作已注册作业的名称和其他属性。框架只提供了一个实现,它基于从作业名称到作业实例的简单映射。

  • Java

  • XML

使用 @EnableBatchProcessing 时,会为您提供一个 JobRegistry。以下示例显示如何配置您自己的 JobRegistry

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the bean in the DefaultBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...

以下示例显示如何为在 XML 中定义的作业包含 JobRegistry

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

您可以通过以下方式之一填充 JobRegistry:使用 bean 后处理器,或使用智能初始化单例,或使用注册器生命周期组件。接下来的部分将描述这些机制。

JobRegistryBeanPostProcessor

这是一个 bean 后处理器,可以在创建所有作业时注册它们。

  • Java

  • XML

以下示例显示如何为在 Java 中定义的作业包含 JobRegistryBeanPostProcessor

Java 配置
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry);
    return postProcessor;
}

以下示例显示如何为在 XML 中定义的作业包含 JobRegistryBeanPostProcessor

XML 配置
<bean id="jobRegistryBeanPostProcessor" class="org.spr...JobRegistryBeanPostProcessor">
    <property name="jobRegistry" ref="jobRegistry"/>
</bean>

虽然不是严格必需的,但示例中的后处理器已赋予 id,以便它可以包含在子上下文(例如,作为父 bean 定义)中,并导致在那里创建的所有作业也自动注册。

从 5.1 版本开始,@EnableBatchProcessing 注解会在应用程序上下文中自动注册 jobRegistryBeanPostProcessor bean。

JobRegistrySmartInitializingSingleton

这是一个 SmartInitializingSingleton,它在作业注册表中注册所有单例作业。

  • Java

  • XML

以下示例显示如何在 Java 中定义 JobRegistrySmartInitializingSingleton

Java 配置
@Bean
public JobRegistrySmartInitializingSingleton jobRegistrySmartInitializingSingleton(JobRegistry jobRegistry) {
    return new JobRegistrySmartInitializingSingleton(jobRegistry);
}

以下示例显示如何在 XML 中定义 JobRegistrySmartInitializingSingleton

XML 配置
<bean class="org.springframework.batch.core.configuration.support.JobRegistrySmartInitializingSingleton">
    <property name="jobRegistry" ref="jobRegistry" />
</bean>

AutomaticJobRegistrar

这是一个生命周期组件,它创建子上下文并在创建这些上下文中的作业时注册这些作业。这样做的一个优点是,虽然子上下文中的作业名称仍然必须在注册表中全局唯一,但它们的依赖项可以具有“自然”名称。因此,例如,您可以创建一组 XML 配置文件,每个文件只有一个作业,但所有文件都对具有相同 bean 名称(例如 reader)的 ItemReader 具有不同的定义。如果所有这些文件都被导入到同一个上下文中,读取器定义将会冲突并相互覆盖,但是,使用自动注册器,可以避免这种情况。这使得集成来自应用程序不同模块的作业更容易。

  • Java

  • XML

以下示例显示如何为在 Java 中定义的作业包含 AutomaticJobRegistrar

Java 配置
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

以下示例显示如何为在 XML 中定义的作业包含 AutomaticJobRegistrar

XML 配置
<bean class="org.spr...AutomaticJobRegistrar">
   <property name="applicationContextFactories">
      <bean class="org.spr...ClasspathXmlApplicationContextsFactoryBean">
         <property name="resources" value="classpath*:/config/job*.xml" />
      </bean>
   </property>
   <property name="jobLoader">
      <bean class="org.spr...DefaultJobLoader">
         <property name="jobRegistry" ref="jobRegistry" />
      </bean>
   </property>
</bean>

注册器有两个强制属性:一个 ApplicationContextFactory 数组(在前面的示例中由方便的工厂 bean 创建)和一个 JobLoaderJobLoader 负责管理子上下文的生命周期并在 JobRegistry 中注册作业。

ApplicationContextFactory 负责创建子上下文。最常见的用法(如前面的示例所示)是使用ClassPathXmlApplicationContextFactory。此工厂的一个特性是,默认情况下,它会将一些配置从父上下文复制到子上下文。因此,例如,如果子上下文中的配置应该与父上下文相同,则无需在子上下文中重新定义PropertyPlaceholderConfigurer或AOP配置。

您可以将AutomaticJobRegistrarJobRegistryBeanPostProcessor结合使用(只要您也使用DefaultJobLoader)。例如,如果主父上下文和子位置中都定义了作业,则可能需要这样做。

JobOperator

如前所述,JobRepository提供元数据的CRUD操作,JobExplorer提供元数据的只读操作。但是,这些操作在组合使用以执行常见的监控任务(例如停止、重启或汇总作业,这通常由批处理操作员完成)时最为有用。Spring Batch 在JobOperator接口中提供了这些类型的操作。

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

前面的操作表示来自许多不同接口的方法,例如JobLauncherJobRepositoryJobExplorerJobRegistry。因此,提供的JobOperator实现(SimpleJobOperator)具有许多依赖项。

  • Java

  • XML

以下示例显示了Java中SimpleJobOperator的典型bean定义。

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry,
                                JobLauncher jobLauncher) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();
	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }

以下示例显示了XML中SimpleJobOperator的典型bean定义。

<bean id="jobOperator" class="org.spr...SimpleJobOperator">
    <property name="jobExplorer">
        <bean class="org.spr...JobExplorerFactoryBean">
            <property name="dataSource" ref="dataSource" />
        </bean>
    </property>
    <property name="jobRepository" ref="jobRepository" />
    <property name="jobRegistry" ref="jobRegistry" />
    <property name="jobLauncher" ref="jobLauncher" />
</bean>

从5.0版本开始,@EnableBatchProcessing注解会在应用程序上下文中自动注册一个作业操作符bean。

如果在作业存储库上设置了表前缀,请不要忘记在作业资源管理器上也设置它。

JobParametersIncrementer

JobOperator上的大多数方法是不言自明的,您可以在接口的Javadoc中找到更详细的解释。但是,startNextInstance方法值得注意。此方法始终启动Job的新实例。如果JobExecution中存在严重问题并且需要从头开始重新启动Job,这将非常有用。与JobLauncher(需要一个触发新JobInstance的新JobParameters对象)不同,如果参数与任何以前的参数集不同,则startNextInstance方法使用与Job绑定的JobParametersIncrementer强制Job进入新实例。

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

JobParametersIncrementer的约定是,给定一个JobParameters对象,它通过递增可能包含的任何必要值来返回“下一个”JobParameters对象。此策略很有用,因为框架无法知道对JobParameters的哪些更改使其成为“下一个”实例。例如,如果JobParameters中唯一的值得是日期,并且应该创建下一个实例,那么该值应该增加一天还是一周(如果作业是每周一次)?对于任何有助于标识Job的数值,也可以这么说,如下例所示。

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

在此示例中,键为run.id的值用于区分JobInstances。如果传入的JobParameters为空,则可以假设Job以前从未运行过,因此可以返回其初始状态。但是,如果不是,则获取旧值,将其加1,然后返回。

  • Java

  • XML

对于在Java中定义的作业,您可以通过构建器中提供的incrementer方法将递增器与Job关联,如下所示。

@Bean
public Job footballJob(JobRepository jobRepository) {
    return new JobBuilder("footballJob", jobRepository)
    				 .incrementer(sampleIncrementer())
    				 ...
                     .build();
}

对于在XML中定义的作业,您可以通过命名空间中的incrementer属性将递增器与Job关联,如下所示。

<job id="footballJob" incrementer="sampleIncrementer">
    ...
</job>

停止作业

JobOperator最常见的用例之一是优雅地停止作业。

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即的,因为没有办法强制立即关闭,尤其是在当前执行在框架无法控制的开发人员代码中(例如业务服务)。但是,一旦控制权返回到框架,它就会将当前StepExecution的状态设置为BatchStatus.STOPPED,保存它,并在完成前对JobExecution执行相同的操作。

中止作业

可以重新启动状态为FAILED的作业执行(如果Job是可重新启动的)。状态为ABANDONED的作业执行无法由框架重新启动。ABANDONED状态也用于步骤执行,以将其标记为在重新启动的作业执行中可跳过。如果作业正在运行并遇到在之前的失败作业执行中已标记为ABANDONED的步骤,它将继续执行下一步(由作业流程定义和步骤执行退出状态确定)。

如果进程死亡(kill -9或服务器故障),作业当然没有运行,但JobRepository无法知道,因为在进程死亡之前没有人告诉它。您必须手动告诉它您知道执行已失败或应被视为已中止(将其状态更改为FAILEDABANDONED)。这是一个业务决策,无法自动化。只有当它是可重新启动的并且您知道重新启动数据有效时,才将其状态更改为FAILED