拦截 Step 执行
与 Job 一样,在 Step 执行期间,用户可能需要在许多事件中执行某些功能。例如,为了写入需要页脚的平面文件,当 Step 完成时,需要通知 ItemWriter,以便可以写入页脚。这可以通过众多 Step 范围监听器之一实现。
你可以通过 listeners 元素将实现 StepListener 扩展之一(但不是该接口本身,因为它为空)的任何类应用于步骤。listeners 元素在步骤、任务或块声明中有效。我们建议你将监听器声明在其功能适用的级别,或者,如果它是多功能的(例如 StepExecutionListener 和 ItemReadListener),则将其声明在其适用的最细粒度级别。
-
Java
-
XML
以下示例显示了在 Java 中应用于块级别的监听器
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
return new StepBuilder("step1", jobRepository)
.<String, String>chunk(10).transactionManager(transactionManager)
.reader(reader())
.writer(writer())
.listener(chunkListener())
.build();
}
以下示例显示了在 XML 中应用于块级别的监听器
<step id="step1">
<tasklet>
<chunk reader="reader" writer="writer" commit-interval="10"/>
<listeners>
<listener ref="chunkListener"/>
</listeners>
</tasklet>
</step>
如果使用命名空间 <step> 元素或其中一个 *StepFactoryBean 工厂,则本身实现 StepListener 接口之一的 ItemReader、ItemWriter 或 ItemProcessor 会自动注册到 Step。这仅适用于直接注入 Step 的组件。如果监听器嵌套在另一个组件中,则需要明确注册它(如前所述,请参阅 注册 ItemStream 到 Step)。
除了 StepListener 接口之外,还提供了注解来解决相同的问题。普通 Java 对象的方法可以使用这些注解,然后将其转换为相应的 StepListener 类型。通常还会注解块组件的自定义实现,例如 ItemReader、ItemWriter 或 Tasklet。XML 解析器会分析 <listener/> 元素的注解,并在构建器中使用 listener 方法进行注册,因此你只需使用 XML 命名空间或构建器将监听器注册到步骤即可。
StepExecutionListener
StepExecutionListener 表示 Step 执行的最通用监听器。它允许在 Step 开始之前和结束之后进行通知,无论它是正常结束还是失败,如下例所示
public interface StepExecutionListener extends StepListener {
void beforeStep(StepExecution stepExecution);
ExitStatus afterStep(StepExecution stepExecution);
}
afterStep 的返回类型是 ExitStatus,以便让监听器有机会修改 Step 完成时返回的退出代码。
与此接口对应的注解是
-
@BeforeStep -
@AfterStep
ChunkListener
“块”定义为在事务范围内处理的项。在每个提交间隔提交事务会提交一个块。你可以使用 ChunkListener 在块开始处理之前或块成功完成之后执行逻辑,如下面的接口定义所示
public interface ChunkListener extends StepListener {
void beforeChunk(ChunkContext context);
void afterChunk(ChunkContext context);
void afterChunkError(ChunkContext context);
}
beforeChunk 方法在事务启动后但在 ItemReader 开始读取之前调用。相反,afterChunk 在块提交后调用(如果回滚则根本不调用)。
与此接口对应的注解是
-
@BeforeChunk -
@AfterChunk -
@AfterChunkError
当没有块声明时,你可以应用 ChunkListener。TaskletStep 负责调用 ChunkListener,因此它也适用于非项导向的任务(它在任务之前和之后被调用)。
ChunkListener 不设计为抛出受检异常。错误必须在实现中处理,否则步骤将终止。
ItemReadListener
在前面讨论跳过逻辑时,提到记录跳过的记录可能很有益,以便以后处理。在读取错误的情况下,这可以通过 ItemReaderListener 完成,如下面的接口定义所示
public interface ItemReadListener<T> extends StepListener {
void beforeRead();
void afterRead(T item);
void onReadError(Exception ex);
}
beforeRead 方法在每次调用 ItemReader 上的 read 之前调用。afterRead 方法在每次成功调用 read 之后调用,并传递已读取的项。如果在读取时发生错误,则调用 onReadError 方法。提供了遇到的异常,以便可以记录它。
与此接口对应的注解是
-
@BeforeRead -
@AfterRead -
@OnReadError
ItemProcessListener
与 ItemReadListener 一样,可以“监听”项的处理,如下面的接口定义所示
public interface ItemProcessListener<T, S> extends StepListener {
void beforeProcess(T item);
void afterProcess(T item, S result);
void onProcessError(T item, Exception e);
}
beforeProcess 方法在 ItemProcessor 上的 process 之前调用,并传递要处理的项。afterProcess 方法在项成功处理后调用。如果在处理时发生错误,则调用 onProcessError 方法。提供了遇到的异常和尝试处理的项,以便可以记录它们。
与此接口对应的注解是
-
@BeforeProcess -
@AfterProcess -
@OnProcessError
ItemWriteListener
你可以使用 ItemWriteListener “监听”项的写入,如下面的接口定义所示
public interface ItemWriteListener<S> extends StepListener {
void beforeWrite(List<? extends S> items);
void afterWrite(List<? extends S> items);
void onWriteError(Exception exception, List<? extends S> items);
}
beforeWrite 方法在 ItemWriter 上的 write 之前调用,并传递要写入的项列表。afterWrite 方法在项成功写入后调用,但在提交与块处理相关的事务之前。如果在写入时发生错误,则调用 onWriteError 方法。提供了遇到的异常和尝试写入的项,以便可以记录它们。
与此接口对应的注解是
-
@BeforeWrite -
@AfterWrite -
@OnWriteError
SkipListener
ItemReadListener、ItemProcessListener 和 ItemWriteListener 都提供了错误通知机制,但没有一个会告知你记录实际上已被跳过。例如,即使项被重试并成功,也会调用 onWriteError。因此,有一个单独的接口用于跟踪跳过的项,如下面的接口定义所示
public interface SkipListener<T,S> extends StepListener {
void onSkipInRead(Throwable t);
void onSkipInProcess(T item, Throwable t);
void onSkipInWrite(S item, Throwable t);
}
每当在读取时跳过项时,都会调用 onSkipInRead。应该注意的是,回滚可能会导致同一个项被多次注册为跳过。在写入时跳过项时,会调用 onSkipInWrite。由于该项已成功读取(未跳过),因此也将其自身作为参数提供。
与此接口对应的注解是
-
@OnSkipInRead -
@OnSkipInWrite -
@OnSkipInProcess