批处理的领域语言
对于任何经验丰富的批处理架构师来说,Spring Batch 中使用的批处理总体概念都应该是熟悉和舒适的。存在“作业”和“步骤”,以及开发人员提供的称为 ItemReader
和 ItemWriter
的处理单元。但是,由于 Spring 模式、操作、模板、回调和习惯用法,因此存在以下机会
-
显著提高对明确关注点分离的遵守。
-
清晰界定的架构层和作为接口提供的服务。
-
简单且默认的实现,允许开箱即用地快速采用和易用性。
-
显著增强的可扩展性。
下图是批处理参考架构的简化版本,该架构已使用了几十年。它概述了构成批处理领域语言的组件。此架构框架是一个蓝图,已通过在过去几代平台(大型机上的 COBOL、Unix 上的 C,以及现在的任何地方的 Java)上的数十次实施得到验证。JCL 和 COBOL 开发人员可能与 C、C# 和 Java 开发人员一样熟悉这些概念。Spring Batch 提供了在健壮、可维护的系统中通常发现的层、组件和技术服务的物理实现,这些系统用于解决从简单到复杂的批处理应用程序的创建,以及解决非常复杂的处理需求的基础设施和扩展。
上图突出显示了构成 Spring Batch 领域语言的关键概念。一个 Job
包含一个到多个步骤,每个步骤正好有一个 ItemReader
、一个 ItemProcessor
和一个 ItemWriter
。作业需要启动(使用 JobLauncher
),并且需要存储有关当前正在运行的流程的元数据(在 JobRepository
中)。
作业
本节描述与批处理作业概念相关的原型。一个 Job
是一个封装整个批处理过程的实体。与其他 Spring 项目一样,Job
使用 XML 配置文件或基于 Java 的配置连接在一起。此配置可能被称为“作业配置”。但是,Job
只是整体层次结构的顶层,如下面的图所示
在 Spring Batch 中,Job
只是一个 Step
实例的容器。它组合了在流程中逻辑上属于一起的多个步骤,并允许配置全局适用于所有步骤的属性,例如可重启性。作业配置包含
-
作业的名称。
-
Step
实例的定义和排序。 -
作业是否可重启。
-
Java
-
XML
对于使用 Java 配置的用户,Spring Batch 以 SimpleJob
类的方式提供了 Job
接口的默认实现,该类在 Job
之上创建了一些标准功能。当使用基于 Java 的配置时,将提供一系列构建器用于实例化 Job
,如下面的示例所示
@Bean
public Job footballJob(JobRepository jobRepository) {
return new JobBuilder("footballJob", jobRepository)
.start(playerLoad())
.next(gameLoad())
.next(playerSummarization())
.build();
}
对于使用 XML 配置的用户,Spring Batch 以 SimpleJob
类的方式提供了 Job
接口的默认实现,该类在 Job
之上创建了一些标准功能。但是,批处理命名空间隐藏了直接实例化它的需要。相反,您可以使用 <job>
元素,如下面的示例所示
<job id="footballJob">
<step id="playerload" next="gameLoad"/>
<step id="gameLoad" next="playerSummarization"/>
<step id="playerSummarization"/>
</job>
JobInstance
JobInstance
代表一个逻辑作业运行的概念。考虑一个应该在每天结束时运行一次的批处理作业,例如前面图表中的 EndOfDay
Job
。只有一个 EndOfDay
作业,但 Job
的每次单独运行都必须单独跟踪。在此作业的情况下,每天有一个逻辑 JobInstance
。例如,有一个 1 月 1 日的运行,一个 1 月 2 日的运行,依此类推。如果 1 月 1 日的运行第一次失败并在第二天再次运行,它仍然是 1 月 1 日的运行。(通常,这也与它正在处理的数据相对应,这意味着 1 月 1 日的运行处理 1 月 1 日的数据)。因此,每个 JobInstance
可以有多个执行(JobExecution
在本章后面详细讨论),并且在给定时间只能运行一个 JobInstance
(对应于特定的 Job
和标识 JobParameters
)。
JobInstance
的定义与要加载的数据绝对没有关系。完全由 ItemReader
实现来决定如何加载数据。例如,在 EndOfDay
场景中,数据上可能有一列指示数据所属的 有效日期
或 计划日期
。因此,1 月 1 日的运行将只加载第 1 天的数据,而 1 月 2 日的运行将只使用第 2 天的数据。由于此确定很可能是一个业务决策,因此由 ItemReader
决定。但是,使用相同的 JobInstance
决定是否使用先前执行的“状态”(即 ExecutionContext
,将在本章后面讨论)。使用新的 JobInstance
表示“从头开始”,而使用现有实例通常表示“从上次中断的地方开始”。
JobParameters
在讨论了 JobInstance
及其与 Job
的区别之后,自然会问:“如何区分一个 JobInstance
与另一个?”答案是:JobParameters
。JobParameters
对象保存一组用于启动批处理作业的参数。它们可以用于识别,甚至可以在运行期间用作参考数据,如下面的图片所示
在前面的示例中,有两个实例,一个用于 1 月 1 日,另一个用于 1 月 2 日,实际上只有一个 Job
,但它有两个 JobParameter
对象:一个以 01-01-2017 的作业参数启动,另一个以 01-02-2017 的参数启动。因此,可以将约定定义为:JobInstance
= Job
+ 标识 JobParameters
。这允许开发人员有效地控制 JobInstance
的定义方式,因为他们控制传入的参数。
并非所有作业参数都需要有助于 JobInstance 的识别。默认情况下,它们会这样做。但是,框架也允许提交不有助于 JobInstance 身份识别的参数的 Job 。 |
JobExecution
JobExecution
指的是运行作业的单个尝试的技术概念。执行可能以失败或成功结束,但除非执行成功完成,否则与给定执行相对应的 JobInstance
不被认为已完成。以前面描述的 EndOfDay
Job
为例,考虑一个 01-01-2017 的 JobInstance
,它在第一次运行时失败了。如果使用与第一次运行相同的标识作业参数(01-01-2017)再次运行它,则会创建一个新的 JobExecution
。但是,仍然只有一个 JobInstance
。
Job
定义作业是什么以及如何执行,而 JobInstance
是一个纯粹的组织对象,用于将执行分组在一起,主要目的是启用正确的重启语义。但是,JobExecution
是实际运行过程中发生情况的主要存储机制,并且包含许多必须控制和持久化的属性,如下表所示
属性 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
|
|
一个 |
|
一个 |
|
“属性包”,包含在执行之间需要持久化的任何用户数据。 |
|
在执行 |
这些属性很重要,因为它们会持久化,并且可用于完全确定执行的状态。例如,如果 01-01 的 EndOfDay
作业在晚上 9:00 执行并在 9:30 失败,则在批处理元数据表中会创建以下条目
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
为了清晰和格式化,列名可能已被缩写或删除。 |
现在作业已失败,假设需要整个晚上才能确定问题,因此“批处理窗口”现在已关闭。进一步假设窗口从晚上 9:00 开始,作业再次为 01-01 启动,从上次中断的地方开始,并在 9:30 成功完成。因为现在是第二天,所以必须运行 01-02 作业,它紧随其后在 9:31 启动,并在其正常的一小时时间内在 10:30 完成。除非两个作业可能尝试访问相同的数据,从而导致数据库级别的锁定问题,否则不需要一个 JobInstance
在另一个之后启动。完全由调度程序决定何时运行 Job
。由于它们是单独的 JobInstances
,因此 Spring Batch 不会尝试阻止它们同时运行。(尝试在另一个 JobInstance
正在运行时运行相同的 JobInstance
会导致抛出 JobExecutionAlreadyRunningException
)。现在,JobInstance
和 JobParameters
表中应该都多了一个条目,JobExecution
表中也应该多两个条目,如下表所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
2 |
EndOfDayJob |
JOB_EXECUTION_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
IDENTIFYING |
1 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
2 |
DATE |
schedule.Date |
2017-01-01 00:00:00 |
TRUE |
3 |
DATE |
schedule.Date |
2017-01-02 00:00:00 |
TRUE |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
2 |
1 |
2017-01-02 21:00 |
2017-01-02 21:30 |
COMPLETED |
3 |
2 |
2017-01-02 21:31 |
2017-01-02 22:29 |
COMPLETED |
为了清晰和格式化,列名可能已被缩写或删除。 |
Step
Step
是一个域对象,封装了批处理作业的独立、顺序阶段。因此,每个 Job
完全由一个或多个步骤组成。Step
包含定义和控制实际批处理所需的所有信息。这是一个必要的模糊描述,因为任何给定 Step
的内容都由编写 Job
的开发人员自行决定。Step
可以像开发人员期望的那样简单或复杂。一个简单的 Step
可能将数据从文件加载到数据库中,需要很少或不需要代码(取决于使用的实现)。更复杂的 Step
可能具有作为处理的一部分应用的复杂业务规则。与 Job
一样,Step
有一个单独的 StepExecution
,它与唯一的 JobExecution
相关联,如下面的图片所示
StepExecution
StepExecution
表示执行 Step
的单个尝试。每次运行 Step
时都会创建一个新的 StepExecution
,类似于 JobExecution
。但是,如果步骤由于前面的步骤失败而无法执行,则不会为其持久化执行。仅当 Step
实际启动时才会创建 StepExecution
。
Step
执行由 StepExecution
类的对象表示。每个执行都包含对其相应步骤和 JobExecution
以及与事务相关的数据(例如提交和回滚计数以及开始和结束时间)的引用。此外,每个步骤执行都包含一个 ExecutionContext
,其中包含开发人员需要在批处理运行之间持久化的任何数据,例如统计信息或重启所需的 state 信息。下表列出了 StepExecution
的属性
属性 |
定义 |
|
一个 |
|
一个 |
|
一个 |
|
|
|
“属性包”,包含在执行之间需要持久化的任何用户数据。 |
|
已成功读取的项目数。 |
|
已成功写入的项目数。 |
|
为此执行提交的事务数。 |
|
|
|
|
|
|
|
|
|
|
ExecutionContext
ExecutionContext
代表一组键值对的集合,这些键值对由框架持久化和控制,为开发者提供了一个存储持久化状态的地方,该状态的作用域限定在 StepExecution
对象或 JobExecution
对象。(对于熟悉 Quartz 的人来说,它非常类似于 JobDataMap
。)最佳使用示例是促进重启。以平面文件输入为例,在处理单个行时,框架会在提交点定期持久化 ExecutionContext
。这样做可以让 ItemReader
存储其状态,以防在运行期间发生致命错误,甚至停电。只需要将当前读取的行数放入上下文(如下例所示),框架会完成其余工作。
executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());
以 Job
构造型部分中的 EndOfDay
示例为例,假设有一个步骤 loadData
将文件加载到数据库中。在第一次运行失败后,元数据表将如下所示
JOB_INST_ID |
JOB_NAME |
1 |
EndOfDayJob |
JOB_INST_ID |
TYPE_CD |
KEY_NAME |
DATE_VAL |
1 |
DATE |
schedule.Date |
2017-01-01 |
JOB_EXEC_ID |
JOB_INST_ID |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
JOB_EXEC_ID |
STEP_NAME |
START_TIME |
END_TIME |
STATUS |
1 |
1 |
loadData |
2017-01-01 21:00 |
2017-01-01 21:30 |
FAILED |
STEP_EXEC_ID |
SHORT_CONTEXT |
1 |
{piece.count=40321} |
在上述情况下,Step
运行了 30 分钟并处理了 40,321 个“片段”,在本例中,片段代表文件中的行。此值在每次提交前由框架更新,并且可以包含对应于 ExecutionContext
中条目的多行。在提交前收到通知需要使用各种 StepListener
实现(或 ItemStream
)之一,这些实现将在本指南的后面部分详细讨论。与前面的示例一样,假设 Job
在第二天重新启动。当它重新启动时,上次运行的 ExecutionContext
中的值将从数据库中重建。当 ItemReader
打开时,它可以检查上下文是否有任何存储的状态,并从此处初始化自身,如下例所示。
if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
log.debug("Initializing for restart. Restart data is: " + executionContext);
long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));
LineReader reader = getReader();
Object record = "";
while (reader.getPosition() < lineCount && record != null) {
record = readLine();
}
}
在这种情况下,在上述代码运行后,当前行数为 40,322,使 Step
可以从中断的地方重新开始。您还可以使用 ExecutionContext
来存储有关运行本身的需要持久化的统计信息。例如,如果一个平面文件包含跨多行的订单以供处理,则可能需要存储已处理的订单数量(这与读取的行数大不相同),以便在 Step
结束时发送一封电子邮件,邮件正文中包含已处理的订单总数。框架为开发者处理存储此信息,以将其正确地限定在单个 JobInstance
范围内。很难知道是否应该使用现有的 ExecutionContext
。例如,使用上面的 EndOfDay
示例,当 01-01 运行第二次启动时,框架会识别出它是同一个 JobInstance
,并在单个 Step
的基础上,从数据库中提取 ExecutionContext
,并将其(作为 StepExecution
的一部分)传递给 Step
本身。相反,对于 01-02 运行,框架会识别出这是一个不同的实例,因此必须将空上下文传递给 Step
。框架为开发者做出了许多此类判断,以确保在正确的时间向他们提供状态。还需要注意,在任何给定时间,每个 StepExecution
只有一个 ExecutionContext
。ExecutionContext
的客户端应该小心,因为这会创建一个共享的键空间。因此,在放入值时应注意,以确保不会覆盖任何数据。但是,Step
在上下文中绝对不存储任何数据,因此无法对框架产生不利影响。
请注意,每个 JobExecution
至少有一个 ExecutionContext
,并且每个 StepExecution
也都有一个。例如,考虑以下代码片段
ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob
如注释中所述,ecStep
不等于 ecJob
。它们是两个不同的 ExecutionContext
。限定在 Step
范围内的那个会在 Step
的每个提交点保存,而限定在 Job 范围内的那个会在每个 Step
执行之间保存。
在 ExecutionContext 中,所有非瞬态条目都必须是 Serializable 。执行上下文的正确序列化是步骤和作业重启功能的基础。如果您使用非原生可序列化的键或值,则需要采用定制的序列化方法。未能序列化执行上下文可能会危及状态持久化过程,导致失败的作业无法正确恢复。 |
JobRepository
JobRepository
是前面提到的所有构造型的持久化机制。它为 JobLauncher
、Job
和 Step
实现提供了 CRUD 操作。当 Job
首次启动时,会从存储库中获取 JobExecution
。此外,在执行过程中,StepExecution
和 JobExecution
实现会通过传递到存储库来持久化。
-
Java
-
XML
在使用 Java 配置时,@EnableBatchProcessing
注解会提供一个 JobRepository
作为自动配置的组件之一。
Spring Batch XML 命名空间支持使用 <job-repository>
标签配置 JobRepository
实例,如下例所示
<job-repository id="jobRepository"/>
JobLauncher
JobLauncher
代表一个简单的接口,用于使用给定的 JobParameters
集启动 Job
,如下例所示
public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}
预期实现从 JobRepository
获取有效的 JobExecution
并执行 Job
。
ItemReader
ItemReader
是一个抽象,表示一次读取一个项目的 Step
输入。当 ItemReader
用尽它可以提供的项目时,它会通过返回 null
来指示这一点。您可以在 读者和编写器 中找到有关 ItemReader
接口及其各种实现的更多详细信息。
ItemWriter
ItemWriter
是一个抽象,表示一次写入一批或一块项目的 Step
输出。通常,ItemWriter
不知道它应该接收哪个输入,只知道在其当前调用中传递的项目。您可以在 读者和编写器 中找到有关 ItemWriter
接口及其各种实现的更多详细信息。
ItemProcessor
ItemProcessor
是一个抽象,表示项目的业务处理。虽然 ItemReader
读取一个项目,而 ItemWriter
写入一个项目,但 ItemProcessor
提供了一个访问点来转换或应用其他业务处理。如果在处理项目时确定项目无效,则返回 null
表示不应写入该项目。您可以在 读者和编写器 中找到有关 ItemProcessor
接口的更多详细信息。
批处理命名空间
前面列出的许多领域概念都需要在 Spring ApplicationContext
中进行配置。虽然您可以使用上述接口的实现来在标准 bean 定义中使用,但为了方便配置,提供了一个命名空间,如下例所示
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/batch
https://www.springframework.org/schema/batch/spring-batch.xsd">
<job id="ioSampleJob">
<step id="step1">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2"/>
</tasklet>
</step>
</job>
</beans:beans>