拦截 Step 执行
与 Job 一样,在 Step 执行期间有许多事件,用户可能需要在这些事件中执行某些功能。例如,要写入需要页脚的平面文件,当 Step 完成时需要通知 ItemWriter,以便可以写入页脚。这可以通过许多 Step 作用域的监听器之一来实现。
您可以通过 listeners 元素将实现 StepListener 扩展接口之一(但不包括该接口本身,因为它为空)的任何类应用于 step。listeners 元素在 step、tasklet 或 chunk 声明中有效。我们建议您在其功能适用的级别声明监听器,或者如果它具有多项功能(例如 StepExecutionListener 和 ItemReadListener),则在它适用的最细粒度级别声明它。
- 
Java 
- 
XML 
以下示例展示了在 Java 中应用于 chunk 级别的监听器
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}以下示例展示了在 XML 中应用于 chunk 级别的监听器
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>如果使用命名空间 <step> 元素或其中一个 *StepFactoryBean 工厂,实现 StepListener 接口之一的 ItemReader、ItemWriter 或 ItemProcessor 会自动注册到 Step。这仅适用于直接注入到 Step 中的组件。如果监听器嵌套在另一个组件内部,则需要显式注册它(如之前在将 ItemStream 注册到 Step 中所述)。
除了 StepListener 接口,还提供了注解来解决相同的问题。普通的 Java 对象可以拥有带有这些注解的方法,然后这些方法会被转换为相应的 StepListener 类型。同时,对 chunk 组件的自定义实现(如 ItemReader、ItemWriter 或 Tasklet)进行注解也很常见。XML 解析器和构建器中的 listener 方法都会分析 <listener/> 元素和这些注解,因此您只需使用 XML 命名空间或构建器将监听器注册到 step 即可。
StepExecutionListener
StepExecutionListener 是用于 Step 执行的最通用的监听器。它允许在 Step 开始之前以及在它结束之后(无论正常结束还是失败)进行通知,如下例所示
public interface StepExecutionListener extends StepListener {
    void beforeStep(StepExecution stepExecution);
    ExitStatus afterStep(StepExecution stepExecution);
}afterStep 的返回类型是 ExitStatus,这使监听器有机会修改 Step 完成时返回的退出码。
与此接口对应的注解是
- 
@BeforeStep
- 
@AfterStep
ChunkListener
“chunk” 定义为在事务范围内处理的项。在每个提交间隔提交事务即是提交一个 chunk。您可以使用 ChunkListener 在 chunk 开始处理之前或 chunk 成功完成之后执行逻辑,如下面的接口定义所示
public interface ChunkListener extends StepListener {
    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);
}beforeChunk 方法在事务启动后、ItemReader 开始读取之前调用。反之,afterChunk 在 chunk 提交后调用(如果发生回滚则完全不调用)。
与此接口对应的注解是
- 
@BeforeChunk
- 
@AfterChunk
- 
@AfterChunkError
即使没有 chunk 声明,您也可以应用 ChunkListener。TaskletStep 负责调用 ChunkListener,因此它也适用于非面向项的 tasklet(在 tasklet 之前和之后调用)。
ChunkListener 不设计用于抛出 checked exception。错误必须在实现中处理,否则 step 将终止。
ItemReadListener
在之前讨论跳过逻辑时,曾提到记录跳过的记录可能会很有益处,以便以后处理它们。在读取错误的情况下,这可以通过 ItemReaderListener 来完成,如下面的接口定义所示
public interface ItemReadListener<T> extends StepListener {
    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);
}beforeRead 方法在每次调用 ItemReader 的 read 方法之前调用。afterRead 方法在每次成功调用 read 方法之后调用,并传递读取的项。如果在读取时发生错误,则调用 onReadError 方法。提供遇到的异常,以便可以记录下来。
与此接口对应的注解是
- 
@BeforeRead
- 
@AfterRead
- 
@OnReadError
ItemProcessListener
与 ItemReadListener 类似,可以“监听”项的处理过程,如下面的接口定义所示
public interface ItemProcessListener<T, S> extends StepListener {
    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);
}beforeProcess 方法在调用 ItemProcessor 的 process 方法之前调用,并接收待处理的项。afterProcess 方法在项成功处理后调用。如果在处理时发生错误,则调用 onProcessError 方法。提供遇到的异常和尝试处理的项,以便可以记录下来。
与此接口对应的注解是
- 
@BeforeProcess
- 
@AfterProcess
- 
@OnProcessError
ItemWriteListener
您可以使用 ItemWriteListener“监听”项的写入过程,如下面的接口定义所示
public interface ItemWriteListener<S> extends StepListener {
    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);
}beforeWrite 方法在调用 ItemWriter 的 write 方法之前调用,并接收要写入的项列表。afterWrite 方法在项成功写入后调用,但在提交与 chunk 处理关联的事务之前。如果在写入时发生错误,则调用 onWriteError 方法。提供遇到的异常和尝试写入的项,以便可以记录下来。
与此接口对应的注解是
- 
@BeforeWrite
- 
@AfterWrite
- 
@OnWriteError
SkipListener
ItemReadListener、ItemProcessListener 和 ItemWriteListener 都提供了错误通知机制,但它们都不会告知您某条记录是否已被实际跳过。例如,即使某个项重试成功,onWriteError 也会被调用。因此,有一个单独的接口用于跟踪跳过的项,如下面的接口定义所示
public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}当读取时跳过某个项时,会调用 onSkipInRead。值得注意的是,回滚可能会导致同一个项被多次注册为跳过。当写入时跳过某个项时,会调用 onSkipInWrite。因为该项已被成功读取(未跳过),所以也会将该项本身作为参数提供。
与此接口对应的注解是
- 
@OnSkipInRead
- 
@OnSkipInWrite
- 
@OnSkipInProcess