批处理和事务
没有重试的简单批处理
考虑以下没有重试的嵌套批处理的简单示例。它展示了批处理的常见场景:处理输入源直到耗尽,并在处理“块”结束时定期提交。
1 | REPEAT(until=exhausted) { | 2 | TX { 3 | REPEAT(size=5) { 3.1 | input; 3.2 | output; | } | } | | }
输入操作(3.1)可以是基于消息的接收(例如来自 JMS)或基于文件的读取,但为了恢复并继续处理以完成整个作业,它必须是事务性的。3.2 的操作也是如此。它必须是事务性的或幂等的。
如果 REPEAT
(3)处的块由于 3.2 处的数据库异常而失败,则 TX
(2)必须回滚整个块。
简单无状态重试
对于非事务性操作,例如调用 Web 服务或其他远程资源,使用重试也很有用,如下例所示
0 | TX { 1 | input; 1.1 | output; 2 | RETRY { 2.1 | remote access; | } | }
这实际上是重试最实用的应用之一,因为远程调用比数据库更新更容易失败并可重试。只要远程访问(2.1)最终成功,事务 TX
(0)就会提交。如果远程访问(2.1)最终失败,则保证事务 TX
(0)回滚。
典型的重复重试模式
最典型的批处理模式是在块的内部块中添加重试,如下例所示
1 | REPEAT(until=exhausted, exception=not critical) { | 2 | TX { 3 | REPEAT(size=5) { | 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { 5.1 | output; 6 | } SKIP and RECOVER { | notify; | } | | } | } | | }
内部 RETRY
(4)块标记为“有状态”。有关有状态重试的描述,请参见 典型用例。这意味着,如果重试 PROCESS
(5)块失败,则 RETRY
(4)的行为如下
-
抛出异常,在块级别回滚事务
TX
(2),并允许将该项重新提交到输入队列。 -
当该项再次出现时,它可能会被重试,具体取决于现有的重试策略,并再次执行
PROCESS
(5)。第二次及后续尝试可能会再次失败并重新抛出异常。 -
最终,该项将最后一次出现。重试策略不允许再次尝试,因此
PROCESS
(5)永远不会执行。在这种情况下,我们将遵循RECOVER
(6)路径,有效地“跳过”正在接收和处理的项。
请注意,计划中用于 RETRY
(4)的符号明确显示输入步骤(4.1)是重试的一部分。它还明确表明,存在两种处理备用路径:正常情况,如 PROCESS
(5)所示,以及恢复路径,如 RECOVER
(6)的单独块中所示。这两个备用路径完全不同。在正常情况下,只执行其中一个。
在特殊情况下(例如特殊的 TranscationValidException
类型),重试策略可能能够确定在 PROCESS
(5)刚刚失败后,可以在最后一次尝试中采用 RECOVER
(6)路径,而不是等待该项重新提交。这不是默认行为,因为它需要详细了解 PROCESS
(5)块内部发生的情况,而这些信息通常不可用。例如,如果输出在失败之前包含写入访问,则应重新抛出异常以确保事务完整性。
外部 REPEAT
(1) 中的完成策略对于计划的成功至关重要。如果输出 (5.1) 失败,它可能会抛出异常(通常会,如描述的那样),在这种情况下,事务 TX
(2) 失败,并且异常可能会向上传播到外部批处理 REPEAT
(1)。我们不希望整个批处理停止,因为如果我们再次尝试,RETRY
(4) 仍然可能成功,所以我们在外部 REPEAT
(1) 中添加 exception=not critical
。
但是,请注意,如果 TX
(2) 失败,并且我们确实尝试再次执行,由于外部完成策略,内部 REPEAT
(3) 中下一个处理的项目不能保证是刚刚失败的项目。它可能是,但这取决于输入 (4.1) 的实现。因此,输出 (5.1) 可能会在新的项目或旧项目上再次失败。批处理的客户端不应该假设每次 RETRY
(4) 尝试都将处理与上次失败的尝试相同的项目。例如,如果 REPEAT
(1) 的终止策略是在 10 次尝试后失败,它会在 10 次连续尝试后失败,但不一定是在同一个项目上。这与整体重试策略一致。内部 RETRY
(4) 了解每个项目的历史记录,并且可以决定是否对其进行另一次尝试。
异步块处理
通过将外部批处理配置为使用 AsyncTaskExecutor
,可以并发执行 典型示例 中的内部批次或块。外部批处理在所有块完成之前等待完成。以下示例显示了异步块处理
1 | REPEAT(until=exhausted, concurrent, exception=not critical) { | 2 | TX { 3 | REPEAT(size=5) { | 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { | output; 6 | } RECOVER { | recover; | } | | } | } | | }
异步项目处理
原则上,典型示例 中块中的单个项目也可以并发处理。在这种情况下,事务边界必须移动到单个项目的级别,以便每个事务都在单个线程上,如下面的示例所示
1 | REPEAT(until=exhausted, exception=not critical) { | 2 | REPEAT(size=5, concurrent) { | 3 | TX { 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { | output; 6 | } RECOVER { | recover; | } | } | | } | | }
此方案牺牲了简单方案中将所有事务资源打包在一起的优化优势。它只有在处理成本 (5) 远高于事务管理成本 (3) 时才有用。
批处理和事务传播之间的交互
批处理重试和事务管理之间存在比我们理想情况下更紧密的耦合。特别是,如果事务管理器不支持 NESTED 传播,则无法使用无状态重试来重试数据库操作。
以下示例使用无重复重试
1 | TX { | 1.1 | input; 2.2 | database access; 2 | RETRY { 3 | TX { 3.1 | database access; | } | } | | }
同样,出于相同的原因,即使 RETRY
(2) 最终成功,内部事务 TX
(3) 也会导致外部事务 TX
(1) 失败。
不幸的是,正如以下示例所示,相同的效果会从重试块向上渗透到周围的重复批处理(如果有)。
1 | TX { | 2 | REPEAT(size=5) { 2.1 | input; 2.2 | database access; 3 | RETRY { 4 | TX { 4.1 | database access; | } | } | } | | }
现在,如果 TX (3) 回滚,它可能会污染 TX (1) 的整个批处理,并迫使其在结束时回滚。
非默认传播怎么样?
-
在前面的示例中,
TX
(3) 上的PROPAGATION_REQUIRES_NEW
可以防止外部TX
(1) 被污染,前提是两个事务最终都成功。但如果TX
(3) 提交而TX
(1) 回滚,则TX
(3) 保持提交状态,因此我们违反了TX
(1) 的事务契约。如果TX
(3) 回滚,TX
(1) 不一定会回滚(但在实践中它可能会回滚,因为重试会抛出回滚异常)。 -
TX
(3) 上的PROPAGATION_NESTED
按照我们在重试情况下(以及对于包含跳过的批处理)的要求工作:TX
(3) 可以提交,但随后可以被外部事务TX
(1) 回滚。如果TX
(3) 回滚,TX
(1) 在实践中会回滚。此选项仅在某些平台上可用,不包括 Hibernate 或 JTA,但它是唯一始终有效的选项。
因此,如果重试块包含任何数据库访问,则 NESTED
模式是最佳选择。
特殊情况:具有正交资源的事务
对于没有嵌套数据库事务的简单情况,默认传播始终可以。考虑以下示例,其中 SESSION
和 TX
不是全局 XA
资源,因此它们的资源是正交的
0 | SESSION { 1 | input; 2 | RETRY { 3 | TX { 3.1 | database access; | } | } | }
这里有一个事务性消息,SESSION
(0),但它不参与与PlatformTransactionManager
的其他事务,因此当TX
(3) 开始时不会传播。在RETRY
(2) 块之外没有数据库访问。如果TX
(3) 失败,然后最终在重试时成功,SESSION
(0) 可以提交(独立于TX
块)。这类似于传统的“尽力而为的一阶段提交”场景。最糟糕的情况是在RETRY
(2) 成功并且SESSION
(0) 无法提交(例如,因为消息系统不可用)时出现重复消息。
无状态重试无法恢复
在前面显示的典型示例中,无状态重试和有状态重试之间的区别很重要。实际上,最终是事务约束迫使了这种区别,而这种约束也使区别的原因显而易见。
我们从观察开始,除非将项目处理包装在事务中,否则无法跳过失败的项目并成功提交块的其余部分。因此,我们将典型的批处理执行计划简化为如下所示
0 | REPEAT(until=exhausted) { | 1 | TX { 2 | REPEAT(size=5) { | 3 | RETRY(stateless) { 4 | TX { 4.1 | input; 4.2 | database access; | } 5 | } RECOVER { 5.1 | skip; | } | | } | } | | }
前面的示例显示了一个无状态的RETRY
(3),它有一个RECOVER
(5) 路径,在最终尝试失败后启动。无状态
标签表示该块重复执行,直到达到某个限制,而不会重新抛出任何异常。这只有在事务TX
(4) 具有嵌套传播的情况下才有效。
如果内部TX
(4) 具有默认传播属性并回滚,它会污染外部TX
(1)。事务管理器假定内部事务已损坏事务资源,因此无法再次使用它。
对嵌套传播的支持非常少,因此我们选择在当前版本的 Spring Batch 中不支持使用无状态重试进行恢复。可以通过使用前面显示的典型模式来实现相同的效果(以重复更多处理为代价)。