重复
RepeatTemplate
批处理是关于重复操作的,无论是作为简单的优化还是作为作业的一部分。为了对重复进行策略化和泛化,并提供一个类似迭代器的框架,Spring Batch 提供了 RepeatOperations
接口。RepeatOperations
接口的定义如下:
public interface RepeatOperations {
RepeatStatus iterate(RepeatCallback callback) throws RepeatException;
}
回调是一个接口,其定义如下,它允许您插入一些要重复的业务逻辑:
public interface RepeatCallback {
RepeatStatus doInIteration(RepeatContext context) throws Exception;
}
回调会重复执行,直到实现确定迭代应该结束。这些接口中的返回值是一个枚举值,可以是 RepeatStatus.CONTINUABLE
或 RepeatStatus.FINISHED
。RepeatStatus
枚举向重复操作的调用者传达有关是否还有工作剩余的信息。一般来说,RepeatOperations
的实现应该检查 RepeatStatus
并将其用作结束迭代的决策的一部分。任何希望向调用者发出没有剩余工作的信号的回调都可以返回 RepeatStatus.FINISHED
。
RepeatOperations
最简单的通用实现是 RepeatTemplate
。
RepeatTemplate template = new RepeatTemplate();
template.setCompletionPolicy(new SimpleCompletionPolicy(2));
template.iterate(new RepeatCallback() {
public RepeatStatus doInIteration(RepeatContext context) {
// Do stuff in batch...
return RepeatStatus.CONTINUABLE;
}
});
在前面的示例中,我们返回 RepeatStatus.CONTINUABLE
,以表明还有更多工作要做。回调也可以返回 RepeatStatus.FINISHED
,以向调用者发出信号,表明没有剩余工作。某些迭代可能会因回调中正在执行的工作的内在考虑因素而终止。其他迭代实际上是无限循环(就回调而言),完成决策委托给外部策略,如前面的示例所示。
完成策略
在 RepeatTemplate
内部,iterate
方法中循环的终止由 CompletionPolicy
决定,它也是 RepeatContext
的工厂。RepeatTemplate
负责使用当前策略创建 RepeatContext
,并在迭代的每个阶段将其传递给 RepeatCallback
。在回调完成其 doInIteration
后,RepeatTemplate
必须调用 CompletionPolicy
以要求其更新其状态(该状态将存储在 RepeatContext
中)。然后,它询问策略迭代是否已完成。
Spring Batch 提供了一些简单的通用 CompletionPolicy
实现。SimpleCompletionPolicy
允许执行最多固定次数(RepeatStatus.FINISHED
随时强制提前完成)。
对于更复杂的决策,用户可能需要实现自己的完成策略。例如,一个批处理窗口,它阻止批处理作业在在线系统使用时执行,将需要一个自定义策略。
异常处理
如果在 RepeatCallback
内部抛出异常,RepeatTemplate
会咨询 ExceptionHandler
,它可以决定是否重新抛出异常。
以下清单显示了 ExceptionHandler
接口定义
public interface ExceptionHandler {
void handleException(RepeatContext context, Throwable throwable)
throws Throwable;
}
一个常见的用例是统计给定类型异常的数量,并在达到限制时失败。为此,Spring Batch 提供了 SimpleLimitExceptionHandler
和一个稍微灵活的 RethrowOnThresholdExceptionHandler
。SimpleLimitExceptionHandler
具有一个限制属性和一个异常类型,应该与当前异常进行比较。提供的类型的全部子类也会被统计。给定类型的异常会被忽略,直到达到限制,然后它们会被重新抛出。其他类型的异常总是会被重新抛出。
SimpleLimitExceptionHandler
的一个重要的可选属性是名为 useParent
的布尔标志。默认情况下它为 false
,因此限制只在当前 RepeatContext
中被计算。当设置为 true
时,限制在嵌套迭代(例如步骤中的一个块集)中的兄弟上下文之间保持一致。
监听器
通常,能够为跨多个不同迭代的跨领域问题接收额外的回调非常有用。为此,Spring Batch 提供了 RepeatListener
接口。RepeatTemplate
允许用户注册 RepeatListener
实现,并在迭代期间提供带有 RepeatContext
和 RepeatStatus
的回调(如果可用)。
RepeatListener
接口具有以下定义
public interface RepeatListener {
void before(RepeatContext context);
void after(RepeatContext context, RepeatStatus result);
void open(RepeatContext context);
void onError(RepeatContext context, Throwable e);
void close(RepeatContext context);
}
open
和 close
回调在整个迭代之前和之后出现。before
、after
和 onError
应用于单个 RepeatCallback
调用。
请注意,当有多个监听器时,它们位于一个列表中,因此存在一个顺序。在这种情况下,open
和 before
按相同顺序调用,而 after
、onError
和 close
按相反顺序调用。
并行处理
RepeatOperations
的实现并不局限于顺序执行回调。一些实现能够并行执行回调非常重要。为此,Spring Batch 提供了 TaskExecutorRepeatTemplate
,它使用 Spring 的 TaskExecutor
策略来运行 RepeatCallback
。默认情况下,它使用 SynchronousTaskExecutor
,其效果是在同一个线程中执行整个迭代(与普通的 RepeatTemplate
相同)。
声明式迭代
有时,您知道您希望每次发生时都重复某些业务处理。这方面的经典示例是消息管道的优化。如果一批消息频繁到达,处理它们比为每条消息承担单独事务的成本更有效。Spring Batch 提供了一个 AOP 拦截器,它将方法调用包装在 RepeatOperations
对象中,以实现此目的。RepeatOperationsInterceptor
执行被拦截的方法,并根据提供的 RepeatTemplate
中的 CompletionPolicy
重复执行。
-
Java
-
XML
以下示例使用 Java 配置来重复对名为 processMessage
的方法的服务调用(有关如何配置 AOP 拦截器的更多详细信息,请参阅 <<docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>)。
@Bean
public MyService myService() {
ProxyFactory factory = new ProxyFactory(RepeatOperations.class.getClassLoader());
factory.setInterfaces(MyService.class);
factory.setTarget(new MyService());
MyService service = (MyService) factory.getProxy();
JdkRegexpMethodPointcut pointcut = new JdkRegexpMethodPointcut();
pointcut.setPatterns(".*processMessage.*");
RepeatOperationsInterceptor interceptor = new RepeatOperationsInterceptor();
((Advised) service).addAdvisor(new DefaultPointcutAdvisor(pointcut, interceptor));
return service;
}
以下示例展示了使用 Spring AOP 命名空间来重复对名为 processMessage
的方法的服务调用的声明式迭代(有关如何配置 AOP 拦截器的更多详细信息,请参阅 <<docs.spring.io/spring-framework/docs/current/reference/html/core.html#aop,Spring 用户指南>>)。
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* com..*Service.processMessage(..))" />
<aop:advisor pointcut-ref="transactional"
advice-ref="retryAdvice" order="-1"/>
</aop:config>
<bean id="retryAdvice" class="org.spr...RepeatOperationsInterceptor"/>
前面的示例在拦截器中使用默认的 RepeatTemplate
。要更改策略、监听器和其他详细信息,您可以将 RepeatTemplate
的实例注入拦截器。
如果被拦截的方法返回 void
,拦截器始终返回 RepeatStatus.CONTINUABLE
(因此,如果 CompletionPolicy
没有有限的终点,则存在无限循环的风险)。否则,它将返回 RepeatStatus.CONTINUABLE
,直到被拦截方法的返回值为 null
。此时,它将返回 RepeatStatus.FINISHED
。因此,目标方法中的业务逻辑可以通过返回 null
或抛出由提供的 RepeatTemplate
中的 ExceptionHandler
重新抛出的异常来表明没有更多工作要做。