创建自定义 ItemReaders 和 ItemWriters
到目前为止,本章已经讨论了Spring Batch中读写的基本契约以及一些常用的实现方式。然而,这些都是相当通用的,并且存在许多开箱即用的实现可能无法覆盖的潜在场景。本节通过一个简单的例子,展示如何创建自定义的ItemReader和ItemWriter实现并正确实现它们的契约。ItemReader还实现了ItemStream,以说明如何使读取器或写入器可重启。
自定义ItemReader示例
为了本例的目的,我们创建一个简单的ItemReader实现,它从提供的列表中读取。我们首先实现ItemReader最基本的契约,即read方法,如以下代码所示:
public class CustomItemReader<T> implements ItemReader<T> {
List<T> items;
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
NonTransientResourceException, ParseException {
if (!items.isEmpty()) {
return items.remove(0);
}
return null;
}
}
前面的类接收一个项目列表,然后一次返回一个,并从列表中移除。当列表为空时,它返回null,从而满足ItemReader最基本的要求,如以下测试代码所示:
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
ItemReader itemReader = new CustomItemReader<>(items);
assertEquals("1", itemReader.read());
assertEquals("2", itemReader.read());
assertEquals("3", itemReader.read());
assertNull(itemReader.read());
使ItemReader可重启
最后的挑战是使ItemReader可重启。目前,如果处理中断并重新开始,ItemReader必须从头开始。这在许多场景中是有效的,但有时批处理作业最好从上次中断的地方重新开始。关键的区别通常在于读取器是有状态还是无状态的。无状态读取器无需担心可重启性,但有状态读取器必须尝试在重启时恢复其上次已知状态。因此,我们建议您尽可能保持自定义读取器无状态,这样就无需担心可重启性。
如果您确实需要存储状态,则应使用ItemStream接口:
public class CustomItemReader<T> implements ItemReader<T>, ItemStream {
List<T> items;
int currentIndex = 0;
private static final String CURRENT_INDEX = "current.index";
public CustomItemReader(List<T> items) {
this.items = items;
}
public T read() throws Exception, UnexpectedInputException,
ParseException, NonTransientResourceException {
if (currentIndex < items.size()) {
return items.get(currentIndex++);
}
return null;
}
public void open(ExecutionContext executionContext) throws ItemStreamException {
if (executionContext.containsKey(CURRENT_INDEX)) {
currentIndex = new Long(executionContext.getLong(CURRENT_INDEX)).intValue();
}
else {
currentIndex = 0;
}
}
public void update(ExecutionContext executionContext) throws ItemStreamException {
executionContext.putLong(CURRENT_INDEX, new Long(currentIndex).longValue());
}
public void close() throws ItemStreamException {}
}
每次调用ItemStream的update方法时,ItemReader的当前索引都以“current.index”为键存储在提供的ExecutionContext中。当调用ItemStream的open方法时,会检查ExecutionContext以查看是否包含该键的条目。如果找到该键,则当前索引会移动到该位置。这是一个相当简单的示例,但它仍然满足通用契约:
ExecutionContext executionContext = new ExecutionContext();
((ItemStream)itemReader).open(executionContext);
assertEquals("1", itemReader.read());
((ItemStream)itemReader).update(executionContext);
List<String> items = new ArrayList<>();
items.add("1");
items.add("2");
items.add("3");
itemReader = new CustomItemReader<>(items);
((ItemStream)itemReader).open(executionContext);
assertEquals("2", itemReader.read());
大多数ItemReaders都有更复杂的重启逻辑。例如,JdbcCursorItemReader在游标中存储最后处理的行的行ID。
还值得注意的是,ExecutionContext中使用的键不应该过于简单。这是因为同一个ExecutionContext用于Step中的所有ItemStreams。在大多数情况下,只需在键前面加上类名就足以保证唯一性。然而,在极少数情况下,如果同一类型的两个ItemStream在同一个步骤中使用(当输出需要两个文件时可能会发生),则需要一个更唯一的名称。因此,许多Spring Batch的ItemReader和ItemWriter实现都具有setName()属性,允许覆盖此键名。
自定义ItemWriter示例
实现自定义ItemWriter在许多方面与上面的ItemReader示例相似,但在足够多的方面有所不同,因此值得单独作为一个示例。然而,添加可重启性基本相同,因此本示例不涉及。与ItemReader示例一样,使用List以使示例尽可能简单:
public class CustomItemWriter<T> implements ItemWriter<T> {
List<T> output = TransactionAwareProxyFactory.createTransactionalList();
public void write(Chunk<? extends T> items) throws Exception {
output.addAll(items);
}
public List<T> getOutput() {
return output;
}
}
使ItemWriter可重启
要使ItemWriter可重启,我们将遵循与ItemReader相同的过程,添加并实现ItemStream接口以同步执行上下文。在此示例中,我们可能需要计算已处理的项目数量,并将其添加为页脚记录。如果我们需要这样做,我们可以在ItemWriter中实现ItemStream,以便在重新打开流时从执行上下文重新构成计数器。
在许多实际情况下,自定义ItemWriter也会委托给另一个本身可重启的写入器(例如,写入文件时),或者它写入事务性资源,因此无需可重启,因为它无状态。当您拥有有状态写入器时,您应该确保同时实现ItemStream和ItemWriter。请记住,写入器的客户端需要感知ItemStream,因此您可能需要在配置中将其注册为流。