拦截 Step 执行

就像Job一样,在Step执行期间,用户可能需要执行一些功能,例如,要写入需要页脚的平面文件,ItemWriter需要在Step完成时收到通知,以便写入页脚。这可以通过许多Step范围的监听器之一来实现。

您可以通过listeners元素将任何实现StepListener扩展之一的类(但不能是该接口本身,因为它为空)应用于步骤。listeners元素在步骤、tasklet或chunk声明中有效。我们建议您在功能适用的级别声明监听器,或者如果它是多功能的(例如StepExecutionListenerItemReadListener),则在适用性最细粒度的级别声明它。

  • Java

  • XML

以下示例展示了在 Java 中应用于块级别的监听器

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

以下示例展示了在 XML 中应用于块级别的监听器

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果使用命名空间 <step> 元素或其中一个 *StepFactoryBean 工厂,则自动将自身实现 StepListener 接口之一的 ItemReaderItemWriterItemProcessor 注册到 Step 中。这仅适用于直接注入到 Step 中的组件。如果监听器嵌套在另一个组件中,则需要显式注册它(如前面在 ItemStream 注册到 Step 中所述)。

除了 StepListener 接口之外,还提供了注解来解决相同的问题。普通 Java 对象可以具有带有这些注解的方法,这些方法随后将转换为相应的 StepListener 类型。为块组件(如 ItemReaderItemWriterTasklet)的自定义实现添加注解也很常见。XML 解析器会分析这些注解以查找 <listener/> 元素,并将其注册到构建器中的 listener 方法,因此您只需使用 XML 命名空间或构建器将监听器注册到步骤即可。

StepExecutionListener

StepExecutionListener 代表 Step 执行的最通用监听器。它允许在 Step 开始之前和结束之后(无论正常结束还是失败)进行通知,如下面的示例所示

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus 的返回类型为 afterStep,以便监听器有机会修改 Step 完成后返回的退出代码。

与该接口相对应的注解为

  • @BeforeStep

  • @AfterStep

ChunkListener

“块”定义为在事务范围内处理的项目。在每个提交间隔提交事务都会提交一个块。您可以使用 ChunkListener 在块开始处理之前或块成功完成之后执行逻辑,如下面的接口定义所示

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法在事务启动后但在 ItemReader 开始读取之前调用。相反,afterChunk 方法在块提交后调用(如果回滚,则根本不调用)。

与该接口相对应的注解为

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

当没有块声明时,您可以应用一个 ChunkListenerTaskletStep 负责调用 ChunkListener,因此它也适用于非面向项目的 tasklet(它在 tasklet 之前和之后被调用)。

ItemReadListener

在之前讨论跳过逻辑时,我们提到记录跳过的记录可能是有益的,以便以后处理它们。在读取错误的情况下,可以使用 ItemReaderListener 来完成此操作,如下面的接口定义所示

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次调用 ItemReader 上的读取之前被调用。afterRead 方法在每次成功调用读取之后被调用,并传递读取的项目。如果读取时出现错误,则会调用 onReadError 方法。提供的异常可以记录下来。

与该接口相对应的注解为

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,可以“监听”项目的处理,如下面的接口定义所示

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在 ItemProcessor 上的 process 之前被调用,并传递要处理的项目。afterProcess 方法在项目成功处理后被调用。如果处理时出现错误,则会调用 onProcessError 方法。提供的异常和尝试处理的项目可以记录下来。

与该接口相对应的注解为

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

您可以使用 ItemWriteListener “监听”项目的写入,如下面的接口定义所示

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriter 上的 write 之前被调用,并传递写入的项目列表。afterWrite 方法在项目成功写入后被调用,但在提交与块处理相关的交易之前。如果写入时出现错误,则会调用 onWriteError 方法。提供的异常和尝试写入的项目可以记录下来。

与该接口相对应的注解为

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了用于通知错误的机制,但没有一个通知您记录实际上已被跳过。例如,即使项目被重试并成功,也会调用 onWriteError。出于这个原因,有一个单独的接口用于跟踪跳过的项目,如下面的接口定义所示

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead 在读取时跳过项目时被调用。需要注意的是,回滚可能会导致同一个项目被注册为跳过多次。onSkipInWrite 在写入时跳过项目时被调用。因为项目已成功读取(未跳过),所以它还将项目本身作为参数提供。

与该接口相对应的注解为

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

跳过监听器和事务

SkipListener 最常见的用例之一是记录跳过的项目,以便其他批处理过程甚至人工过程可以用来评估和解决导致跳过的问题。由于在许多情况下,原始事务可能会回滚,因此 Spring Batch 提供了以下两项保证:

  • 每个项目只调用一次相应的跳过方法(取决于错误发生的时间)。

  • SkipListener 始终在事务提交之前调用。这样做是为了确保监听器调用的任何事务性资源不会因 ItemWriter 中的故障而回滚。