常见批处理模式
一些批处理作业可以完全由 Spring Batch 中的现成组件组装而成。例如,可以配置ItemReader
和ItemWriter
实现来涵盖各种场景。但是,在大多数情况下,必须编写自定义代码。应用程序开发人员的主要 API 入口点是Tasklet
、ItemReader
、ItemWriter
和各种监听器接口。大多数简单的批处理作业可以使用 Spring Batch ItemReader
的现成输入,但通常情况下,处理和写入中存在自定义问题,需要开发人员实现ItemWriter
或ItemProcessor
。
在本章中,我们提供了一些自定义业务逻辑中常见模式的示例。这些示例主要包含监听器接口。需要注意的是,如果合适,ItemReader
或ItemWriter
也可以实现监听器接口。
记录项目处理和失败
一个常见的用例是需要逐项对步骤中的错误进行特殊处理,也许是记录到特殊通道或将记录插入数据库。面向块的Step
(由步骤工厂 Bean 创建)允许用户使用简单的ItemReadListener
来处理read
操作中的错误,使用ItemWriteListener
来处理write
操作中的错误。以下代码片段演示了一个记录读取和写入失败的监听器。
public class ItemFailureLoggerListener extends ItemListenerSupport {
private static Log logger = LogFactory.getLog("item.error");
public void onReadError(Exception ex) {
logger.error("Encountered error on read", e);
}
public void onWriteError(Exception ex, List<? extends Object> items) {
logger.error("Encountered error on write", ex);
}
}
实现此监听器后,必须将其注册到步骤。
-
Java
-
XML
以下示例演示如何使用 Java 将监听器注册到步骤
@Bean
public Step simpleStep(JobRepository jobRepository) {
return new StepBuilder("simpleStep", jobRepository)
...
.listener(new ItemFailureLoggerListener())
.build();
}
以下示例演示如何使用 XML 将监听器注册到步骤
<step id="simpleStep">
...
<listeners>
<listener>
<bean class="org.example...ItemFailureLoggerListener"/>
</listener>
</listeners>
</step>
如果你的监听器在onError() 方法中执行任何操作,它必须位于将要回滚的事务中。如果你需要在onError() 方法中使用事务性资源(例如数据库),请考虑为此方法添加声明式事务(详情请参阅 Spring Core 参考指南),并将它的传播属性值设置为REQUIRES_NEW 。 |
出于业务原因手动停止作业
Spring Batch 通过JobOperator
接口提供了一个stop()
方法,但这实际上是供操作员而非应用程序程序员使用。有时,在业务逻辑中停止作业执行更方便或更有意义。
最简单的方法是抛出一个RuntimeException
(既不会无限重试也不会跳过)。例如,可以使用自定义异常类型,如下例所示。
public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {
@Override
public T process(T item) throws Exception {
if (isPoisonPill(item)) {
throw new PoisonPillException("Poison pill detected: " + item);
}
return item;
}
}
另一种简单停止步骤执行的方法是从ItemReader
返回null
,如下例所示。
public class EarlyCompletionItemReader implements ItemReader<T> {
private ItemReader<T> delegate;
public void setDelegate(ItemReader<T> delegate) { ... }
public T read() throws Exception {
T item = delegate.read();
if (isEndItem(item)) {
return null; // end the step here
}
return item;
}
}
前面的示例实际上依赖于这样一个事实:存在一个CompletionPolicy
策略的默认实现,当要处理的项目为null
时,它会发出完成批处理的信号。可以实现更复杂的完成策略,并通过SimpleStepFactoryBean
将其注入到Step
中。
-
Java
-
XML
以下示例演示如何使用 Java 将完成策略注入到步骤中
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("simpleStep", jobRepository)
.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
.reader(reader())
.writer(writer())
.build();
}
以下示例演示如何使用 XML 将完成策略注入到步骤中
<step id="simpleStep">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"
chunk-completion-policy="completionPolicy"/>
</tasklet>
</step>
<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>
另一种方法是在StepExecution
中设置一个标志,框架中的Step
实现会在项目处理之间检查此标志。要实现此替代方案,我们需要访问当前的StepExecution
,这可以通过实现StepListener
并将其注册到Step
来实现。以下示例显示了一个设置标志的监听器。
public class CustomItemWriter extends ItemListenerSupport implements StepListener {
private StepExecution stepExecution;
public void beforeStep(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
public void afterRead(Object item) {
if (isPoisonPill(item)) {
stepExecution.setTerminateOnly();
}
}
}
当设置标志时,默认行为是步骤抛出JobInterruptedException
。此行为可以通过StepInterruptionPolicy
控制。但是,唯一的选择是抛出或不抛出异常,因此这始终是作业的异常结束。
添加页脚记录
通常,当写入平面文件时,必须在所有处理完成后将“页脚”记录附加到文件的末尾。这可以使用Spring Batch提供的FlatFileFooterCallback
接口实现。FlatFileFooterCallback
(及其对应的FlatFileHeaderCallback
)是FlatFileItemWriter
的可选属性,可以添加到项目写入器中。
-
Java
-
XML
以下示例演示如何在Java中使用FlatFileHeaderCallback
和FlatFileFooterCallback
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.headerCallback(headerCallback())
.footerCallback(footerCallback())
.build();
}
以下示例演示如何在XML中使用FlatFileHeaderCallback
和FlatFileFooterCallback
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="headerCallback" ref="headerCallback" />
<property name="footerCallback" ref="footerCallback" />
</bean>
页脚回调接口只有一个方法,该方法在必须写入页脚时被调用,如下面的接口定义所示
public interface FlatFileFooterCallback {
void writeFooter(Writer writer) throws IOException;
}
写入摘要页脚
涉及页脚记录的常见需求是在输出过程中聚合信息并将此信息附加到文件的末尾。此页脚通常用作文件的摘要或提供校验和。
例如,如果批处理作业正在将Trade
记录写入平面文件,并且需要将所有Trades
的总金额放在页脚中,则可以使用以下ItemWriter
实现
public class TradeItemWriter implements ItemWriter<Trade>,
FlatFileFooterCallback {
private ItemWriter<Trade> delegate;
private BigDecimal totalAmount = BigDecimal.ZERO;
public void write(Chunk<? extends Trade> items) throws Exception {
BigDecimal chunkTotal = BigDecimal.ZERO;
for (Trade trade : items) {
chunkTotal = chunkTotal.add(trade.getAmount());
}
delegate.write(items);
// After successfully writing all items
totalAmount = totalAmount.add(chunkTotal);
}
public void writeFooter(Writer writer) throws IOException {
writer.write("Total Amount Processed: " + totalAmount);
}
public void setDelegate(ItemWriter delegate) {...}
}
此TradeItemWriter
存储一个totalAmount
值,该值随着写入的每个Trade
项目的amount
而增加。处理完最后一个Trade
后,框架会调用writeFooter
,该方法将totalAmount
写入文件。请注意,write
方法使用了临时变量chunkTotal
,该变量存储块中Trade
金额的总和。这样做是为了确保如果在write
方法中发生跳过,则totalAmount
保持不变。只有在write
方法结束时,一旦我们保证不会抛出异常,我们才会更新totalAmount
。
为了调用writeFooter
方法,必须将实现FlatFileFooterCallback
的TradeItemWriter
作为footerCallback
连接到FlatFileItemWriter
中。
-
Java
-
XML
以下示例演示如何在Java中连接TradeItemWriter
@Bean
public TradeItemWriter tradeItemWriter() {
TradeItemWriter itemWriter = new TradeItemWriter();
itemWriter.setDelegate(flatFileItemWriter(null));
return itemWriter;
}
@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
return new FlatFileItemWriterBuilder<String>()
.name("itemWriter")
.resource(outputResource)
.lineAggregator(lineAggregator())
.footerCallback(tradeItemWriter())
.build();
}
以下示例演示如何在XML中连接TradeItemWriter
<bean id="tradeItemWriter" class="..TradeItemWriter">
<property name="delegate" ref="flatFileItemWriter" />
</bean>
<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
<property name="resource" ref="outputResource" />
<property name="lineAggregator" ref="lineAggregator"/>
<property name="footerCallback" ref="tradeItemWriter" />
</bean>
到目前为止,TradeItemWriter
的编写方式只有在Step
不可重启时才能正确运行。这是因为该类是有状态的(因为它存储totalAmount
),但totalAmount
不会持久保存到数据库中。因此,在重启事件中无法检索它。为了使此类可重启,应实现ItemStream
接口以及open
和update
方法,如下例所示
public void open(ExecutionContext executionContext) {
if (executionContext.containsKey("total.amount") {
totalAmount = (BigDecimal) executionContext.get("total.amount");
}
}
public void update(ExecutionContext executionContext) {
executionContext.put("total.amount", totalAmount);
}
update
方法在将该对象持久保存到数据库之前,将totalAmount
的最新版本存储到ExecutionContext
中。open
方法从ExecutionContext
检索任何现有的totalAmount
并将其用作处理的起点,允许TradeItemWriter
在重启时从上次运行Step
的地方继续。
驱动基于查询的ItemReaders
在关于读取器和写入器的章节中,讨论了使用分页的数据库输入。许多数据库供应商(例如DB2)具有极其悲观的锁定策略,如果正在读取的表也需要被在线应用程序的其他部分使用,则可能会导致问题。此外,在某些供应商的数据库上打开对超大型数据集的光标可能会导致问题。因此,许多项目更喜欢使用“驱动查询”方法来读取数据。这种方法通过迭代键而不是需要返回的整个对象来工作,如下图所示
如您所见,上图所示的示例使用与基于游标的示例中相同的“FOO”表。但是,它不是选择整行,而是在SQL语句中只选择了ID。因此,从read
返回的不是FOO
对象,而是一个Integer
。然后,可以使用此数字查询“详细信息”,这是一个完整的Foo
对象,如下图所示
应使用ItemProcessor
将从驱动查询获得的键转换为完整的Foo
对象。可以使用现有的DAO根据键查询完整的对象。
多行记录
虽然平面文件的每条记录通常都限制在一行内,但文件可能有跨越多行且具有多种格式的记录的情况很常见。以下是文件摘录,其中显示了此类安排的示例
HEA;0013100345;2007-02-15 NCU;Smith;Peter;;T;20014539;F BAD;;Oak Street 31/A;;Small Town;00235;IL;US FOT;2;2;267.34
从以“HEA”开头的行到以“FOT”开头的行之间的所有内容都被视为一条记录。为了正确处理这种情况,必须考虑一些因素
-
ItemReader
不应一次读取一条记录,而应将多行记录的每一行作为一组读取,以便可以完整地将其传递给ItemWriter
。 -
每种行类型可能需要以不同的方式进行标记。
因为单个记录跨越多行,并且我们可能不知道有多少行,所以ItemReader
必须小心始终读取整个记录。为此,应实现一个自定义ItemReader
作为FlatFileItemReader
的包装器。
-
Java
-
XML
以下示例演示如何在Java中实现自定义ItemReader
@Bean
public MultiLineTradeItemReader itemReader() {
MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();
itemReader.setDelegate(flatFileItemReader());
return itemReader;
}
@Bean
public FlatFileItemReader flatFileItemReader() {
FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
.name("flatFileItemReader")
.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
.lineTokenizer(orderFileTokenizer())
.fieldSetMapper(orderFieldSetMapper())
.build();
return reader;
}
以下示例演示如何在XML中实现自定义ItemReader
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
<property name="delegate">
<bean class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="data/iosample/input/multiLine.txt" />
<property name="lineMapper">
<bean class="org.spr...DefaultLineMapper">
<property name="lineTokenizer" ref="orderFileTokenizer"/>
<property name="fieldSetMapper" ref="orderFieldSetMapper"/>
</bean>
</property>
</bean>
</property>
</bean>
为了确保每行都正确地进行了标记,这对于定长输入尤其重要,可以在委托FlatFileItemReader
上使用PatternMatchingCompositeLineTokenizer
。有关更多详细信息,请参阅读取器和写入器章节中的FlatFileItemReader
。然后,委托读取器使用PassThroughFieldSetMapper
为每一行返回一个FieldSet
给包装的ItemReader
。
-
Java
-
XML
以下示例演示如何在Java中确保每行都正确标记
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
PatternMatchingCompositeLineTokenizer tokenizer =
new PatternMatchingCompositeLineTokenizer();
Map<String, LineTokenizer> tokenizers = new HashMap<>(4);
tokenizers.put("HEA*", headerRecordTokenizer());
tokenizers.put("FOT*", footerRecordTokenizer());
tokenizers.put("NCU*", customerLineTokenizer());
tokenizers.put("BAD*", billingAddressLineTokenizer());
tokenizer.setTokenizers(tokenizers);
return tokenizer;
}
以下示例演示如何在XML中确保每行都正确标记
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
<property name="tokenizers">
<map>
<entry key="HEA*" value-ref="headerRecordTokenizer" />
<entry key="FOT*" value-ref="footerRecordTokenizer" />
<entry key="NCU*" value-ref="customerLineTokenizer" />
<entry key="BAD*" value-ref="billingAddressLineTokenizer" />
</map>
</property>
</bean>
此包装器必须能够识别记录的结尾,以便它可以连续地在其委托上调用read()
,直到到达结尾为止。对于读取的每一行,包装器都应该构建要返回的项目。一旦到达页脚,就可以返回该项目以将其传递给ItemProcessor
和ItemWriter
,如下例所示
private FlatFileItemReader<FieldSet> delegate;
public Trade read() throws Exception {
Trade t = null;
for (FieldSet line = null; (line = this.delegate.read()) != null;) {
String prefix = line.readString(0);
if (prefix.equals("HEA")) {
t = new Trade(); // Record must start with header
}
else if (prefix.equals("NCU")) {
Assert.notNull(t, "No header was found.");
t.setLast(line.readString(1));
t.setFirst(line.readString(2));
...
}
else if (prefix.equals("BAD")) {
Assert.notNull(t, "No header was found.");
t.setCity(line.readString(4));
t.setState(line.readString(6));
...
}
else if (prefix.equals("FOT")) {
return t; // Record must end with footer
}
}
Assert.isNull(t, "No 'END' was found.");
return null;
}
执行系统命令
许多批处理作业都需要从批处理作业内部调用外部命令。此类流程可以由调度程序单独启动,但是会丢失关于运行的公共元数据的优势。此外,多步骤作业也需要拆分为多个作业。
由于需求如此普遍,Spring Batch提供了一个用于调用系统命令的Tasklet
实现。
-
Java
-
XML
以下示例演示如何在Java中调用外部命令
@Bean
public SystemCommandTasklet tasklet() {
SystemCommandTasklet tasklet = new SystemCommandTasklet();
tasklet.setCommand("echo hello");
tasklet.setTimeout(5000);
return tasklet;
}
以下示例演示如何在XML中调用外部命令
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
<property name="command" value="echo hello" />
<!-- 5 second timeout for the command to complete -->
<property name="timeout" value="5000" />
</bean>
处理未找到输入时的步骤完成
在许多批处理场景中,在数据库或文件中找不到要处理的行并非异常情况。Step
被简单地认为没有找到任何工作,并以读取的项目数为0完成。Spring Batch中提供的开箱即用的所有ItemReader
实现都默认使用这种方法。如果即使存在输入也没有写入任何内容(通常是由于文件名错误或出现类似问题导致的),这可能会导致一些混淆。因此,应检查元数据本身以确定框架发现了多少工作要处理。但是,如果找不到输入被认为是异常情况怎么办?在这种情况下,以编程方式检查元数据中是否有未处理的项目并导致失败是最佳解决方案。由于这是一个常见的用例,Spring Batch提供了一个具有此功能的侦听器,如下所示NoWorkFoundStepExecutionListener
类的定义
public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {
public ExitStatus afterStep(StepExecution stepExecution) {
if (stepExecution.getReadCount() == 0) {
return ExitStatus.FAILED;
}
return null;
}
}
前面的StepExecutionListener
在“afterStep”阶段检查StepExecution
的readCount
属性,以确定是否未读取任何项目。如果是这种情况,则返回退出代码FAILED
,表示Step
应该失败。否则,返回null
,这不会影响Step
的状态。
将数据传递给未来的步骤
将信息从一个步骤传递到另一个步骤通常很有用。这可以通过ExecutionContext
完成。问题是存在两个ExecutionContexts
:一个在Step
级别,一个在Job
级别。Step
ExecutionContext
仅在步骤期间存在,而Job
ExecutionContext
在整个Job
期间都存在。另一方面,每次Step
提交一个块时都会更新Step
ExecutionContext
,而只有在每个Step
结束时才会更新Job
ExecutionContext
。
这种分离的结果是所有数据都必须在Step
执行期间放在Step
ExecutionContext
中。这样做可以确保在Step
运行时数据被正确存储。如果数据存储到Job
ExecutionContext
中,则在Step
执行期间不会将其持久化。如果Step
失败,则该数据会丢失。
public class SavingItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
为了使数据可用于将来的Steps
,必须在步骤完成后将其“提升”到Job
ExecutionContext
。Spring Batch为此目的提供了ExecutionContextPromotionListener
。必须使用与必须提升的ExecutionContext
中的数据相关的键配置侦听器。此外,还可以选择性地使用应进行提升的退出代码模式列表进行配置(默认为COMPLETED
)。与所有侦听器一样,它必须注册在Step
上。
-
Java
-
XML
以下示例演示如何在Java中将步骤提升到Job
ExecutionContext
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
return new JobBuilder("job1", jobRepository)
.start(step1)
.next(step2)
.build();
}
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10, transactionManager)
.reader(reader())
.writer(savingWriter())
.listener(promotionListener())
.build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
return listener;
}
以下示例演示如何在XML中将步骤提升到Job
ExecutionContext
<job id="job1">
<step id="step1">
<tasklet>
<chunk reader="reader" writer="savingWriter" commit-interval="10"/>
</tasklet>
<listeners>
<listener ref="promotionListener"/>
</listeners>
</step>
<step id="step2">
...
</step>
</job>
<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
<beans:property name="keys">
<list>
<value>someKey</value>
</list>
</beans:property>
</beans:bean>
最后,必须从Job
ExecutionContext
检索保存的值,如下例所示
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(Chunk<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}