拦截 Step 执行

Job 一样,在 Step 执行期间,用户可能需要在许多事件中执行某些功能。例如,为了写入需要页脚的平面文件,当 Step 完成时,需要通知 ItemWriter,以便可以写入页脚。这可以通过众多 Step 范围监听器之一实现。

你可以通过 listeners 元素将实现 StepListener 扩展之一(但不是该接口本身,因为它为空)的任何类应用于步骤。listeners 元素在步骤、任务或块声明中有效。我们建议你将监听器声明在其功能适用的级别,或者,如果它是多功能的(例如 StepExecutionListenerItemReadListener),则将其声明在其适用的最细粒度级别。

  • Java

  • XML

以下示例显示了在 Java 中应用于块级别的监听器

Java 配置
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10).transactionManager(transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

以下示例显示了在 XML 中应用于块级别的监听器

XML 配置
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

如果使用命名空间 <step> 元素或其中一个 *StepFactoryBean 工厂,则本身实现 StepListener 接口之一的 ItemReaderItemWriterItemProcessor 会自动注册到 Step。这仅适用于直接注入 Step 的组件。如果监听器嵌套在另一个组件中,则需要明确注册它(如前所述,请参阅 注册 ItemStreamStep)。

除了 StepListener 接口之外,还提供了注解来解决相同的问题。普通 Java 对象的方法可以使用这些注解,然后将其转换为相应的 StepListener 类型。通常还会注解块组件的自定义实现,例如 ItemReaderItemWriterTasklet。XML 解析器会分析 <listener/> 元素的注解,并在构建器中使用 listener 方法进行注册,因此你只需使用 XML 命名空间或构建器将监听器注册到步骤即可。

StepExecutionListener

StepExecutionListener 表示 Step 执行的最通用监听器。它允许在 Step 开始之前和结束之后进行通知,无论它是正常结束还是失败,如下例所示

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

afterStep 的返回类型是 ExitStatus,以便让监听器有机会修改 Step 完成时返回的退出代码。

与此接口对应的注解是

  • @BeforeStep

  • @AfterStep

ChunkListener

“块”定义为在事务范围内处理的项。在每个提交间隔提交事务会提交一个块。你可以使用 ChunkListener 在块开始处理之前或块成功完成之后执行逻辑,如下面的接口定义所示

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk 方法在事务启动后但在 ItemReader 开始读取之前调用。相反,afterChunk 在块提交后调用(如果回滚则根本不调用)。

与此接口对应的注解是

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

当没有块声明时,你可以应用 ChunkListenerTaskletStep 负责调用 ChunkListener,因此它也适用于非项导向的任务(它在任务之前和之后被调用)。

ChunkListener 不设计为抛出受检异常。错误必须在实现中处理,否则步骤将终止。

ItemReadListener

在前面讨论跳过逻辑时,提到记录跳过的记录可能很有益,以便以后处理。在读取错误的情况下,这可以通过 ItemReaderListener 完成,如下面的接口定义所示

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead 方法在每次调用 ItemReader 上的 read 之前调用。afterRead 方法在每次成功调用 read 之后调用,并传递已读取的项。如果在读取时发生错误,则调用 onReadError 方法。提供了遇到的异常,以便可以记录它。

与此接口对应的注解是

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener 一样,可以“监听”项的处理,如下面的接口定义所示

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess 方法在 ItemProcessor 上的 process 之前调用,并传递要处理的项。afterProcess 方法在项成功处理后调用。如果在处理时发生错误,则调用 onProcessError 方法。提供了遇到的异常和尝试处理的项,以便可以记录它们。

与此接口对应的注解是

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

你可以使用 ItemWriteListener “监听”项的写入,如下面的接口定义所示

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite 方法在 ItemWriter 上的 write 之前调用,并传递要写入的项列表。afterWrite 方法在项成功写入后调用,但在提交与块处理相关的事务之前。如果在写入时发生错误,则调用 onWriteError 方法。提供了遇到的异常和尝试写入的项,以便可以记录它们。

与此接口对应的注解是

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener 都提供了错误通知机制,但没有一个会告知你记录实际上已被跳过。例如,即使项被重试并成功,也会调用 onWriteError。因此,有一个单独的接口用于跟踪跳过的项,如下面的接口定义所示

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

每当在读取时跳过项时,都会调用 onSkipInRead。应该注意的是,回滚可能会导致同一个项被多次注册为跳过。在写入时跳过项时,会调用 onSkipInWrite。由于该项已成功读取(未跳过),因此也将其自身作为参数提供。

与此接口对应的注解是

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

跳过监听器和事务

SkipListener 最常见的用例之一是记录跳过的项,以便可以使用另一个批处理过程甚至人工过程来评估和修复导致跳过的问题。由于在许多情况下原始事务可能会回滚,Spring Batch 做出以下两个保证

  • 适当的跳过方法(取决于错误发生的时间)每个项只调用一次。

  • SkipListener 始终在事务提交之前调用。这是为了确保监听器调用的任何事务资源不会因 ItemWriter 中的失败而回滚。

© . This site is unofficial and not affiliated with VMware.