批处理和事务

无需重试的简单批处理

考虑以下无需重试的嵌套批处理的简单示例。它展示了批处理的常见场景:处理输入源直到耗尽,并在处理“块”结束时定期提交。

1   |  REPEAT(until=exhausted) {
|
2   |    TX {
3   |      REPEAT(size=5) {
3.1 |        input;
3.2 |        output;
|      }
|    }
|
|  }

输入操作 (3.1) 可以是基于消息的接收(例如来自 JMS)或基于文件的读取,但为了恢复并继续处理以有机会完成整个作业,它必须是事务性的。3.2 的操作也同样适用。它必须是事务性的或幂等的。

如果由于 3.2 处的数据库异常导致 `REPEAT` (3) 处的块失败,则 `TX` (2) 必须回滚整个块。

简单的无状态重试

对于非事务性操作(例如对 Web 服务或其他远程资源的调用)使用重试也很有用,如下例所示

0   |  TX {
1   |    input;
1.1 |    output;
2   |    RETRY {
2.1 |      remote access;
|    }
|  }

这实际上是重试最常用的应用之一,因为远程调用比数据库更新更容易失败且更易于重试。只要远程访问 (2.1) 最终成功,事务 `TX` (0) 就会提交。如果远程访问 (2.1) 最终失败,则保证事务 `TX` (0) 回滚。

典型的重复重试模式

最典型的批处理模式是在块的内部块中添加重试,如下例所示

1   |  REPEAT(until=exhausted, exception=not critical) {
|
2   |    TX {
3   |      REPEAT(size=5) {
|
4   |        RETRY(stateful, exception=deadlock loser) {
4.1 |          input;
5   |        } PROCESS {
5.1 |          output;
6   |        } SKIP and RECOVER {
|          notify;
|        }
|
|      }
|    }
|
|  }

内部 `RETRY` (4) 块标记为“有状态”。参见 典型用例 以了解有状态重试的描述。这意味着,如果重试 `PROCESS` (5) 块失败,则 `RETRY` (4) 的行为如下:

  1. 抛出异常,回滚块级别的 `TX` (2) 事务,并允许将项目重新提交到输入队列。

  2. 当项目重新出现时,它可能会根据现有的重试策略进行重试,并再次执行 `PROCESS` (5)。第二次及后续尝试也可能再次失败并重新抛出异常。

  3. 最终,项目最后一次重新出现。重试策略不允许再次尝试,因此永远不会执行 `PROCESS` (5)。在这种情况下,我们将遵循 `RECOVER` (6) 路径,有效地“跳过”正在接收和处理的项目。

请注意,计划中用于 `RETRY` (4) 的符号明确表明输入步骤 (4.1) 是重试的一部分。它还明确指出存在两种处理备选路径:正常情况,如 `PROCESS` (5) 所示;以及恢复路径,如 `RECOVER` (6) 的单独块中所示。这两个备选路径是完全不同的。在正常情况下,只执行其中一个。

在特殊情况下(例如特殊的 `TranscationValidException` 类型),重试策略可能能够确定在 `PROCESS` (5) 刚刚失败后的最后一次尝试中可以采用 `RECOVER` (6) 路径,而无需等待项目重新提交。这不是默认行为,因为它需要详细了解 `PROCESS` (5) 块内部发生的情况,而这通常是不可用的。例如,如果输出在失败前包含写访问,则应重新抛出异常以确保事务完整性。

外部 `REPEAT` (1) 中的完成策略对于计划的成功至关重要。如果输出 (5.1) 失败,它可能会抛出异常(通常会抛出,如所述),在这种情况下,事务 `TX` (2) 失败,并且异常可能会传播到外部批处理 `REPEAT` (1)。我们不希望整个批处理停止,因为如果我们再次尝试,`RETRY` (4) 仍然可能成功,因此我们在外部 `REPEAT` (1) 中添加 `exception=not critical`。

但是,需要注意的是,如果TX (2) 失败并且我们确实尝试再次执行,那么由于外部完成策略,在内部REPEAT (3) 中接下来处理的项并不保证是刚刚失败的那一项。它可能是,但这取决于输入的实现 (4.1)。因此,输出 (5.1) 可能会在一个新的项或旧的项上再次失败。批处理的客户端不应假设每次RETRY (4) 尝试都会处理与上次失败相同的项。例如,如果REPEAT (1) 的终止策略是在10次尝试后失败,则它会在10次连续尝试后失败,但不一定是在同一项上。这与整体重试策略一致。内部RETRY (4) 了解每个项目的历史记录,并可以决定是否再次尝试。

异步块处理

典型示例中,可以通过将外部批处理配置为使用AsyncTaskExecutor来并发执行内部批处理或块。外部批处理等待所有块完成之后再完成。以下示例显示了异步块处理

1   |  REPEAT(until=exhausted, concurrent, exception=not critical) {
|
2   |    TX {
3   |      REPEAT(size=5) {
|
4   |        RETRY(stateful, exception=deadlock loser) {
4.1 |          input;
5   |        } PROCESS {
|          output;
6   |        } RECOVER {
|          recover;
|        }
|
|      }
|    }
|
|  }

异步项处理

原则上,典型示例中块中的各个项也可以并发处理。在这种情况下,事务边界必须移动到单个项级别,以便每个事务都在单个线程上,如下例所示

1   |  REPEAT(until=exhausted, exception=not critical) {
|
2   |    REPEAT(size=5, concurrent) {
|
3   |      TX {
4   |        RETRY(stateful, exception=deadlock loser) {
4.1 |          input;
5   |        } PROCESS {
|          output;
6   |        } RECOVER {
|          recover;
|        }
|      }
|
|    }
|
|  }

此方案牺牲了简单方案所具有的优化优势,即所有事务资源都被一起分块。只有当处理 (5) 的成本远高于事务管理 (3) 的成本时,它才有用。

批处理和事务传播之间的交互

批处理重试和事务管理之间存在比理想情况更紧密的耦合。特别是,如果事务管理器不支持 NESTED 传播,则无法使用无状态重试来重试数据库操作。

以下示例使用重试而不重复

1   |  TX {
|
1.1 |    input;
2.2 |    database access;
2   |    RETRY {
3   |      TX {
3.1 |        database access;
|      }
|    }
|
|  }

同样,由于同样的原因,即使RETRY (2) 最终成功,内部事务TX (3) 也会导致外部事务TX (1) 失败。

不幸的是,如果存在,同样的效果会从重试块向上渗透到周围的重复批处理,如下例所示

1   |  TX {
|
2   |    REPEAT(size=5) {
2.1 |      input;
2.2 |      database access;
3   |      RETRY {
4   |        TX {
4.1 |          database access;
|        }
|      }
|    }
|
|  }

现在,如果 TX (3) 回滚,它可能会污染 TX (1) 的整个批处理,并强制它在结束时回滚。

非默认传播呢?

  • 在前面的示例中,TX (3) 处的PROPAGATION_REQUIRES_NEW 可以防止如果两个事务最终都成功,则外部TX (1) 被污染。但是,如果TX (3) 提交而TX (1) 回滚,则TX (3) 保持提交状态,因此我们违反了TX (1) 的事务契约。如果TX (3) 回滚,TX (1) 不一定会回滚(但在实践中它可能会回滚,因为重试抛出了回滚异常)。

  • TX (3) 处的PROPAGATION_NESTED 在重试情况下(以及带有跳过的批处理)按我们的要求工作:TX (3) 可以提交,但随后可以被外部事务TX (1) 回滚。如果TX (3) 回滚,则TX (1) 在实践中也会回滚。此选项仅在某些平台上可用,不包括 Hibernate 或 JTA,但它是唯一始终有效的方法。

因此,如果重试块包含任何数据库访问,则NESTED 模式是最佳选择。

特殊情况:具有正交资源的事务

对于没有嵌套数据库事务的简单情况,默认传播始终是可以的。考虑以下示例,其中SESSIONTX 不是全局XA 资源,因此它们的资源是正交的

0   |  SESSION {
1   |    input;
2   |    RETRY {
3   |      TX {
3.1 |        database access;
|      }
|    }
|  }

这里有一个事务性消息SESSION (0),但它不参与使用PlatformTransactionManager 的其他事务,因此当TX (3) 启动时不会传播。RETRY (2) 块之外没有数据库访问。如果TX (3) 失败,然后最终在重试时成功,则SESSION (0) 可以提交(独立于TX 块)。这类似于普通的“尽力而为的一次性提交”方案。最糟糕的情况是在RETRY (2) 成功且SESSION (0) 无法提交(例如,因为消息系统不可用)时出现重复消息。

无状态重试无法恢复

在前面显示的典型示例中,无状态重试和有状态重试之间的区别很重要。实际上,最终是一个事务约束强制了这种区别,而且这个约束也使区别的原因显而易见。

我们首先观察到,除非我们将项目处理包装在事务中,否则无法跳过失败的项目并成功提交块的其余部分。因此,我们将典型的批处理执行计划简化为如下所示

0   |  REPEAT(until=exhausted) {
|
1   |    TX {
2   |      REPEAT(size=5) {
|
3   |        RETRY(stateless) {
4   |          TX {
4.1 |            input;
4.2 |            database access;
|          }
5   |        } RECOVER {
5.1 |          skip;
|        }
|
|      }
|    }
|
|  }

前面的示例显示了一个无状态RETRY (3) 和一个在最终尝试失败后启动的RECOVER (5) 路径。stateless 标签表示该块重复执行,直到达到某个限制而不向上抛出任何异常。这只在事务TX (4) 具有嵌套传播时才有效。

如果内部TX (4) 具有默认传播属性并回滚,它会污染外部TX (1)。事务管理器假定内部事务已损坏事务资源,因此无法再次使用。

嵌套传播的支持非常少见,因此我们选择不在当前版本的 Spring Batch 中支持使用无状态重试进行恢复。通过使用前面显示的典型模式,始终可以实现相同的效果(以重复更多处理为代价)。