数据库
与大多数企业应用程序风格一样,数据库是批处理的核心存储机制。但是,由于系统必须处理的数据集规模巨大,批处理与其他应用程序风格有所不同。如果一个 SQL 语句返回 100 万行,则结果集可能会将所有返回的结果保存在内存中,直到所有行都被读取完毕。Spring Batch 为此问题提供了两种类型的解决方案
基于游标的 `ItemReader` 实现
使用数据库游标通常是大多数批处理开发人员的默认方法,因为它是数据库对“流式”处理关系数据的解决方案。Java `ResultSet` 类本质上是一种面向对象的机制,用于操作游标。`ResultSet` 维持指向当前数据行的游标。在 `ResultSet` 上调用 `next` 会将此游标移动到下一行。Spring Batch 基于游标的 `ItemReader` 实现会在初始化时打开游标,并为每次调用 `read` 将游标向前移动一行,返回可用于处理的映射对象。然后调用 `close` 方法以确保释放所有资源。Spring 核心 `JdbcTemplate` 通过使用回调模式来完全映射 `ResultSet` 中的所有行并在返回控制给方法调用者之前关闭来解决此问题。但是,在批处理中,这必须等到步骤完成。下图显示了基于游标的 `ItemReader` 如何工作的通用图表。请注意,虽然示例使用 SQL(因为 SQL 非常流行),但任何技术都可以实现基本方法。
此示例说明了基本模式。给定一个名为“FOO”的表,该表具有三列:`ID`、`NAME` 和 `BAR`,选择 ID 大于 1 但小于 7 的所有行。这将游标的开头(第 1 行)放在 ID 2 上。该行的结果应为完全映射的 `Foo` 对象。再次调用 `read()` 会将游标移动到下一行,即 ID 为 3 的 `Foo`。这些读取的结果在每次 `read` 后写入,允许垃圾回收对象(假设没有实例变量维护对它们的引用)。
`JdbcCursorItemReader`
`JdbcCursorItemReader` 是基于游标技术的 JDBC 实现。它直接使用 `ResultSet`,并需要针对从 `DataSource` 获取的连接运行 SQL 语句。以下数据库模式用作示例
CREATE TABLE CUSTOMER (
ID BIGINT IDENTITY PRIMARY KEY,
NAME VARCHAR(45),
CREDIT FLOAT
);
许多人更喜欢为每一行使用一个域对象,因此以下示例使用 `RowMapper` 接口的实现来映射 `CustomerCredit` 对象
public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {
public static final String ID_COLUMN = "id";
public static final String NAME_COLUMN = "name";
public static final String CREDIT_COLUMN = "credit";
public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
CustomerCredit customerCredit = new CustomerCredit();
customerCredit.setId(rs.getInt(ID_COLUMN));
customerCredit.setName(rs.getString(NAME_COLUMN));
customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));
return customerCredit;
}
}
因为 `JdbcCursorItemReader` 与 `JdbcTemplate` 共享关键接口,所以查看如何使用 `JdbcTemplate` 读取此数据的示例很有用,以便将其与 `ItemReader` 进行对比。出于本示例的目的,假设 `CUSTOMER` 数据库中有 1,000 行。第一个示例使用 `JdbcTemplate`
//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
new CustomerCreditRowMapper());
运行上述代码片段后,`customerCredits` 列表包含 1,000 个 `CustomerCredit` 对象。在 query 方法中,从 `DataSource` 获取连接,使用提供的 SQL 对其运行,并为 `ResultSet` 中的每一行调用 `mapRow` 方法。将其与 `JdbcCursorItemReader` 的方法进行对比,如下例所示
JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
运行上述代码片段后,计数器等于 1,000。如果上面的代码将返回的 `customerCredit` 放入列表中,则结果将与 `JdbcTemplate` 示例完全相同。但是,`ItemReader` 的最大优势在于它允许“流式”处理项目。可以调用一次 `read` 方法,`ItemWriter` 可以写入项目,然后可以使用 `read` 获取下一个项目。这允许以“块”的形式进行项目读取和写入,并定期提交,这是高性能批处理的本质。此外,它可以轻松配置为注入到 Spring Batch `Step` 中。
-
Java
-
XML
以下示例显示如何在 Java 中将 `ItemReader` 注入到 `Step` 中
@Bean
public JdbcCursorItemReader<CustomerCredit> itemReader() {
return new JdbcCursorItemReaderBuilder<CustomerCredit>()
.dataSource(this.dataSource)
.name("creditReader")
.sql("select ID, NAME, CREDIT from CUSTOMER")
.rowMapper(new CustomerCreditRowMapper())
.build();
}
以下示例显示如何在 XML 中将 `ItemReader` 注入到 `Step` 中
<bean id="itemReader" class="org.spr...JdbcCursorItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="sql" value="select ID, NAME, CREDIT from CUSTOMER"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
其他属性
由于在 Java 中打开游标有很多不同的选项,因此 `JdbcCursorItemReader` 上有很多可以设置的属性,如下表所述
ignoreWarnings |
确定是否记录 SQLWarnings 或导致异常。默认为 `true`(表示记录警告)。 |
fetchSize |
向 JDBC 驱动程序提示应从数据库中提取的行数,当 `ItemReader` 使用的 `ResultSet` 对象需要更多行时。默认情况下,不提供提示。 |
maxRows |
设置底层 `ResultSet` 可以在任何时候保存的最大行数的限制。 |
queryTimeout |
设置驱动程序等待 |
verifyCursorPosition |
因为 |
saveState |
指示是否应在 |
driverSupportsAbsolute |
指示JDBC驱动程序是否支持在 |
setUseSharedExtendedConnection |
指示用于游标的连接是否应被所有其他处理使用,从而共享相同的交易。如果将其设置为 |
HibernateCursorItemReader
就像普通的Spring用户需要做出关于是否使用ORM解决方案的重要决定一样,这会影响他们是否使用JdbcTemplate
或HibernateTemplate
,Spring Batch用户也有同样的选择。HibernateCursorItemReader
是游标技术的Hibernate实现。Hibernate在批处理中的用法一直颇具争议。这主要是因为Hibernate最初是为支持在线应用程序风格而开发的。但是,这并不意味着它不能用于批处理。解决此问题的最简单方法是使用StatelessSession
而不是标准会话。这将删除Hibernate使用的所有缓存和脏检查,这些检查可能会在批处理场景中造成问题。有关无状态会话和普通Hibernate会话之间差异的更多信息,请参阅您特定Hibernate版本的文档。HibernateCursorItemReader
允许您声明一个HQL语句并传入一个SessionFactory
,它将每次调用读取时返回一个项目,其基本方式与JdbcCursorItemReader
相同。以下示例配置使用与JDBC读取器相同的“客户信用”示例
HibernateCursorItemReader itemReader = new HibernateCursorItemReader();
itemReader.setQueryString("from CustomerCredit");
//For simplicity sake, assume sessionFactory already obtained.
itemReader.setSessionFactory(sessionFactory);
itemReader.setUseStatelessSession(true);
int counter = 0;
ExecutionContext executionContext = new ExecutionContext();
itemReader.open(executionContext);
Object customerCredit = new Object();
while(customerCredit != null){
customerCredit = itemReader.read();
counter++;
}
itemReader.close();
此配置的ItemReader
返回CustomerCredit
对象的方式与JdbcCursorItemReader
中描述的完全相同,假设已为Customer
表正确创建Hibernate映射文件。“useStatelessSession”属性默认为true,但已在此处添加以引起注意开关功能。还值得注意的是,可以使用setFetchSize
属性设置底层游标的获取大小。与JdbcCursorItemReader
一样,配置非常简单。
-
Java
-
XML
以下示例显示如何在Java中注入Hibernate ItemReader
@Bean
public HibernateCursorItemReader itemReader(SessionFactory sessionFactory) {
return new HibernateCursorItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.sessionFactory(sessionFactory)
.queryString("from CustomerCredit")
.build();
}
以下示例显示如何在XML中注入Hibernate ItemReader
<bean id="itemReader"
class="org.springframework.batch.item.database.HibernateCursorItemReader">
<property name="sessionFactory" ref="sessionFactory" />
<property name="queryString" value="from CustomerCredit" />
</bean>
StoredProcedureItemReader
有时需要使用存储过程获取游标数据。StoredProcedureItemReader
的工作方式类似于JdbcCursorItemReader
,不同之处在于它不是运行查询来获取游标,而是运行返回游标的存储过程。存储过程可以通过三种不同的方式返回游标
-
作为返回的
ResultSet
(SQL Server、Sybase、DB2、Derby和MySQL使用)。 -
作为作为输出参数返回的ref游标(Oracle和PostgreSQL使用)。
-
作为存储函数调用的返回值。
-
Java
-
XML
以下Java示例配置使用与前面示例相同的“客户信用”示例
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
return reader;
}
以下XML示例配置使用与前面示例相同的“客户信用”示例
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
前面的示例依赖于存储过程提供ResultSet
作为返回结果(前面选项1)。
如果存储过程返回一个ref-cursor
(选项2),那么我们需要提供作为返回ref-cursor
的输出参数的位置。
-
Java
-
XML
以下示例显示如何在Java中使用第一个参数作为ref游标
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setRefCursorPosition(1);
return reader;
}
以下示例显示如何在XML中使用第一个参数作为ref游标
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
如果游标是从存储函数返回的(选项3),我们需要将属性“function”设置为true
。它默认为false
。
-
Java
-
XML
以下示例显示如何在Java中将属性设置为true
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("sp_customer_credit");
reader.setRowMapper(new CustomerCreditRowMapper());
reader.setFunction(true);
return reader;
}
以下示例显示如何在XML中将属性设置为true
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="sp_customer_credit"/>
<property name="function" value="true"/>
<property name="rowMapper">
<bean class="org.springframework.batch.samples.domain.CustomerCreditRowMapper"/>
</property>
</bean>
在所有这些情况下,我们都需要定义一个RowMapper
以及一个DataSource
和实际的过程名称。
如果存储过程或函数接受参数,则必须使用parameters
属性声明和设置它们。以下Oracle示例声明了三个参数。第一个是返回ref游标的out
参数,第二个和第三个是采用INTEGER
类型值的输入参数。
-
Java
-
XML
以下示例显示如何在Java中使用参数
@Bean
public StoredProcedureItemReader reader(DataSource dataSource) {
List<SqlParameter> parameters = new ArrayList<>();
parameters.add(new SqlOutParameter("newId", OracleTypes.CURSOR));
parameters.add(new SqlParameter("amount", Types.INTEGER);
parameters.add(new SqlParameter("custId", Types.INTEGER);
StoredProcedureItemReader reader = new StoredProcedureItemReader();
reader.setDataSource(dataSource);
reader.setProcedureName("spring.cursor_func");
reader.setParameters(parameters);
reader.setRefCursorPosition(1);
reader.setRowMapper(rowMapper());
reader.setPreparedStatementSetter(parameterSetter());
return reader;
}
以下示例显示如何在XML中使用参数
<bean id="reader" class="o.s.batch.item.database.StoredProcedureItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="procedureName" value="spring.cursor_func"/>
<property name="parameters">
<list>
<bean class="org.springframework.jdbc.core.SqlOutParameter">
<constructor-arg index="0" value="newid"/>
<constructor-arg index="1">
<util:constant static-field="oracle.jdbc.OracleTypes.CURSOR"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="amount"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
<bean class="org.springframework.jdbc.core.SqlParameter">
<constructor-arg index="0" value="custid"/>
<constructor-arg index="1">
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
</bean>
</list>
</property>
<property name="refCursorPosition" value="1"/>
<property name="rowMapper" ref="rowMapper"/>
<property name="preparedStatementSetter" ref="parameterSetter"/>
</bean>
除了参数声明之外,我们还需要指定一个PreparedStatementSetter
实现来设置调用的参数值。这与上面的JdbcCursorItemReader
的工作方式相同。附加属性中列出的所有附加属性也适用于StoredProcedureItemReader
。
分页ItemReader
实现
使用数据库游标的替代方法是运行多个查询,每个查询获取结果的一部分。我们将这部分称为页面。每个查询都必须指定起始行号和我们想要在页面中返回的行数。
JdbcPagingItemReader
分页ItemReader
的一种实现是JdbcPagingItemReader
。JdbcPagingItemReader
需要一个PagingQueryProvider
,负责提供用于检索构成页面的行的SQL查询。由于每个数据库都有其自身的分页支持策略,因此我们需要为每个受支持的数据库类型使用不同的PagingQueryProvider
。还有SqlPagingQueryProviderFactoryBean
可以自动检测正在使用的数据库并确定合适的PagingQueryProvider
实现。这简化了配置,并且是推荐的最佳实践。
SqlPagingQueryProviderFactoryBean
要求您指定一个select
子句和一个from
子句。您还可以提供一个可选的where
子句。这些子句和必需的sortKey
用于构建SQL语句。
对sortKey 具有唯一键约束非常重要,以确保在执行之间不会丢失任何数据。 |
读取器打开后,它每次调用read
时都会返回一个项目,其基本方式与任何其他ItemReader
相同。当需要其他行时,分页会在后台发生。
-
Java
-
XML
以下Java示例配置使用与前面显示的基于游标的ItemReaders
类似的“客户信用”示例
@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
Map<String, Object> parameterValues = new HashMap<>();
parameterValues.put("status", "NEW");
return new JdbcPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.dataSource(dataSource)
.queryProvider(queryProvider)
.parameterValues(parameterValues)
.rowMapper(customerCreditMapper())
.pageSize(1000)
.build();
}
@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();
provider.setSelectClause("select id, name, credit");
provider.setFromClause("from customer");
provider.setWhereClause("where status=:status");
provider.setSortKey("id");
return provider;
}
以下XML示例配置使用与前面显示的基于游标的ItemReaders
类似的“客户信用”示例
<bean id="itemReader" class="org.spr...JdbcPagingItemReader">
<property name="dataSource" ref="dataSource"/>
<property name="queryProvider">
<bean class="org.spr...SqlPagingQueryProviderFactoryBean">
<property name="selectClause" value="select id, name, credit"/>
<property name="fromClause" value="from customer"/>
<property name="whereClause" value="where status=:status"/>
<property name="sortKey" value="id"/>
</bean>
</property>
<property name="parameterValues">
<map>
<entry key="status" value="NEW"/>
</map>
</property>
<property name="pageSize" value="1000"/>
<property name="rowMapper" ref="customerMapper"/>
</bean>
此配置的ItemReader
使用必须指定的RowMapper
返回CustomerCredit
对象。“pageSize”属性确定每次运行查询时从数据库读取的实体数量。
“parameterValues”属性可用于指定查询的参数值Map
。如果您在where
子句中使用命名参数,则每个条目的键应与命名参数的名称匹配。如果您使用传统的“?”占位符,则每个条目的键应该是占位符的编号,从1开始。
JpaPagingItemReader
分页ItemReader
的另一种实现是JpaPagingItemReader
。JPA没有类似于Hibernate StatelessSession
的概念,因此我们必须使用JPA规范提供的其他功能。由于JPA支持分页,因此在使用JPA进行批处理时,这是一个自然的选择。读取每个页面后,实体将分离,并且持久性上下文将被清除,以允许在处理页面后垃圾回收实体。
JpaPagingItemReader
允许您声明一个JPQL语句并传入一个EntityManagerFactory
。然后,它每次调用读取时都会返回一个项目,其基本方式与任何其他ItemReader
相同。当需要其他实体时,分页会在后台发生。
-
Java
-
XML
以下Java示例配置使用与前面显示的JDBC读取器相同的“客户信用”示例
@Bean
public JpaPagingItemReader itemReader() {
return new JpaPagingItemReaderBuilder<CustomerCredit>()
.name("creditReader")
.entityManagerFactory(entityManagerFactory())
.queryString("select c from CustomerCredit c")
.pageSize(1000)
.build();
}
以下XML示例配置使用与前面显示的JDBC读取器相同的“客户信用”示例
<bean id="itemReader" class="org.spr...JpaPagingItemReader">
<property name="entityManagerFactory" ref="entityManagerFactory"/>
<property name="queryString" value="select c from CustomerCredit c"/>
<property name="pageSize" value="1000"/>
</bean>
此配置的ItemReader
返回CustomerCredit
对象的方式与上面描述的JdbcPagingItemReader
完全相同,假设CustomerCredit
对象具有正确的JPA注释或ORM映射文件。“pageSize”属性确定每次查询执行时从数据库读取的实体数量。
数据库ItemWriters
虽然平面文件和XML文件都有一个特定的ItemWriter
实例,但在数据库世界中没有完全等效的实例。这是因为事务提供了所有必要的功能。ItemWriter
实现对于文件是必要的,因为它们必须表现得像事务一样,跟踪写入的项目并在适当的时候刷新或清除。数据库不需要此功能,因为写入已包含在事务中。用户可以创建实现ItemWriter
接口的自己的DAO,或者使用为通用处理问题编写的自定义ItemWriter
中的一个。无论哪种方式,它们都应该可以正常工作。需要注意的一件事是批量输出提供的性能和错误处理功能。当使用Hibernate作为ItemWriter
时,这最常见,但在使用JDBC批处理模式时也可能出现相同的问题。假设我们小心地刷新并且数据中没有错误,那么批量数据库输出没有任何固有的缺陷。但是,写入过程中的任何错误都可能导致混淆,因为无法知道哪个单独的项目导致了异常,甚至不知道是否有任何单独的项目是原因,如下图所示
如果项目在写入之前被缓冲,任何错误都不会在缓冲区在提交前被刷新之前抛出。例如,假设每块写入 20 个项目,并且第 15 个项目抛出 DataIntegrityViolationException
异常。就 Step
而言,所有 20 个项目都被成功写入,因为在实际写入之前无法知道发生了错误。一旦调用 Session#flush()
,缓冲区就会被清空,并且会遇到异常。此时,Step
无法做任何事情。事务必须回滚。通常,此异常可能会导致跳过该项目(取决于跳过/重试策略),然后不会再次写入它。但是,在批量场景中,无法知道哪个项目导致了问题。整个缓冲区在发生故障时正在写入。解决此问题的唯一方法是在每个项目之后刷新,如下面的图片所示。
这是一个常见的用例,尤其是在使用 Hibernate 时,ItemWriter
实现的简单指导原则是每次调用 write()
时都进行刷新。这样做允许可靠地跳过项目,Spring Batch 在内部处理错误后对 ItemWriter
调用的粒度。