单步批处理作业启动器

本节介绍如何使用 Spring Cloud Task 中包含的启动器来开发具有单个 `Step` 的 Spring Batch `Job`。此启动器允许您使用配置来定义 `ItemReader`、`ItemWriter` 或完整的单步 Spring Batch `Job`。有关 Spring Batch 及其功能的更多信息,请参阅 Spring Batch 文档

要获取 Maven 的启动器,请将以下内容添加到您的构建中

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-single-step-batch-job</artifactId>
    <version>2.3.0</version>
</dependency>

要获取 Gradle 的启动器,请将以下内容添加到您的构建中

compile "org.springframework.cloud:spring-cloud-starter-single-step-batch-job:2.3.0"

定义作业

您可以使用启动器定义尽可能少的 `ItemReader` 或 `ItemWriter`,或定义尽可能多的完整 `Job`。在本节中,我们将定义配置 `Job` 需要定义的属性。

属性

首先,启动器提供了一组属性,允许您配置具有一个步骤的作业的基本信息

表 1. 作业属性
属性 类型 默认值 描述

spring.batch.job.jobName

字符串

null

作业的名称。

spring.batch.job.stepName

字符串

null

步骤的名称。

spring.batch.job.chunkSize

整数

null

每个事务要处理的项目数。

通过配置上述属性,您将拥有一个具有单个基于块的步骤的作业。此基于块的步骤读取、处理和写入 `Map<String, Object>` 实例作为项目。但是,该步骤还没有执行任何操作。您需要配置一个 `ItemReader`、一个可选的 `ItemProcessor` 和一个 `ItemWriter` 来赋予其功能。要配置这些组件之一,您可以使用属性并配置已提供自动配置的选项之一,或者您可以使用标准 Spring 配置机制配置您自己的组件。

如果您配置了自己的组件,则输入和输出类型必须与步骤中的其他类型匹配。此启动器中的 `ItemReader` 实现和 `ItemWriter` 实现都使用 `Map<String, Object>` 作为输入和输出项目。

ItemReader 实现的自动配置

此启动器为四种不同的 `ItemReader` 实现提供了自动配置:`AmqpItemReader`、`FlatFileItemReader`、`JdbcCursorItemReader` 和 `KafkaItemReader`。在本节中,我们将概述如何使用提供的自动配置来配置每个组件。

AmqpItemReader

您可以使用 `AmqpItemReader` 从 AMQP 的队列或主题中读取数据。此 `ItemReader` 实现的自动配置依赖于两组配置。第一组是 `AmqpTemplate` 的配置。您可以自己配置此模板,也可以使用 Spring Boot 提供的自动配置。请参阅 Spring Boot AMQP 文档。配置完 `AmqpTemplate` 后,您可以通过设置以下属性来启用支持它的批处理功能

表 2. `AmqpItemReader` 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemreader.enabled

布尔值

false

如果为 `true`,则自动配置将执行。

spring.batch.job.amqpitemreader.jsonConverterEnabled

布尔值

true

指示是否应注册 `Jackson2JsonMessageConverter` 来解析消息。

有关更多信息,请参阅 `AmqpItemReader` 文档

FlatFileItemReader

`FlatFileItemReader` 允许您从平面文件(如 CSV 和其他文件格式)中读取数据。要从文件读取数据,您可以通过正常的 Spring 配置提供一些组件(`LineTokenizer`、`RecordSeparatorPolicy`、`FieldSetMapper`、`LineMapper` 或 `SkippedLinesCallback`)。您还可以使用以下属性来配置读取器

表 3. `FlatFileItemReader` 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemreader.saveState

布尔值

true

确定是否应为重启保存状态。

spring.batch.job.flatfileitemreader.name

字符串

null

用于在 `ExecutionContext` 中提供唯一键的名称。

spring.batch.job.flatfileitemreader.maxItemcount

int

Integer.MAX_VALUE

要从文件中读取的最大项目数。

spring.batch.job.flatfileitemreader.currentItemCount

int

0

已经读取的项目数。在重启时使用。

spring.batch.job.flatfileitemreader.comments

List<String>

空列表

指示文件中注释行(要忽略的行)的字符串列表。

spring.batch.job.flatfileitemreader.resource

资源

null

要读取的资源。

spring.batch.job.flatfileitemreader.strict

布尔值

true

如果设置为 `true`,则如果找不到资源,读取器将抛出异常。

spring.batch.job.flatfileitemreader.encoding

字符串

FlatFileItemReader.DEFAULT_CHARSET

读取文件时要使用的编码。

spring.batch.job.flatfileitemreader.linesToSkip

int

0

指示在文件开头跳过的行数。

spring.batch.job.flatfileitemreader.delimited

布尔值

false

指示文件是否为分隔文件(CSV 和其他格式)。同时只能有一个 `spring.batch.job.flatfileitemreader.delimited` 或 `spring.batch.job.flatfileitemreader.fixedLength` 属性为 `true`。

spring.batch.job.flatfileitemreader.delimiter

字符串

DelimitedLineTokenizer.DELIMITER_COMMA

如果读取分隔文件,则指示要解析的分隔符。

spring.batch.job.flatfileitemreader.quoteCharacter

字符

DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER

用于确定用于引用值的字符。

spring.batch.job.flatfileitemreader.includedFields

List<Integer>

空列表

指示要将记录中的哪些字段包含在项目中的索引列表。

spring.batch.job.flatfileitemreader.fixedLength

布尔值

false

指示文件的记录是否按列号解析。同时只能有一个 `spring.batch.job.flatfileitemreader.delimited` 或 `spring.batch.job.flatfileitemreader.fixedLength` 属性为 `true`。

spring.batch.job.flatfileitemreader.ranges

List<Range>

空列表

用于解析定长记录的列范围列表。请参阅 Range 文档

spring.batch.job.flatfileitemreader.names

String []

null

从记录中解析的每个字段的名称列表。这些名称是此ItemReader返回的项目中Map<String, Object>中的键。

spring.batch.job.flatfileitemreader.parsingStrict

布尔值

true

如果设置为true,则如果字段无法映射,则映射失败。

JdbcCursorItemReader

JdbcCursorItemReader 对关系数据库运行查询,并迭代结果游标 (ResultSet) 以提供结果项目。此自动配置允许您提供PreparedStatementSetterRowMapper或两者。您还可以使用以下属性来配置JdbcCursorItemReader

表 4. JdbcCursorItemReader 属性
属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.saveState

布尔值

true

确定是否应为重启保存状态。

spring.batch.job.jdbccursoritemreader.name

字符串

null

用于在 `ExecutionContext` 中提供唯一键的名称。

spring.batch.job.jdbccursoritemreader.maxItemcount

int

Integer.MAX_VALUE

要从文件中读取的最大项目数。

spring.batch.job.jdbccursoritemreader.currentItemCount

int

0

已经读取的项目数。在重启时使用。

spring.batch.job.jdbccursoritemreader.fetchSize

int

提示驱动程序指示每次调用数据库系统时要检索多少条记录。为了获得最佳性能,您通常希望将其设置为与块大小匹配。

spring.batch.job.jdbccursoritemreader.maxRows

int

要从数据库读取的最大项目数。

spring.batch.job.jdbccursoritemreader.queryTimeout

int

查询超时的时间(毫秒)。

spring.batch.job.jdbccursoritemreader.ignoreWarnings

布尔值

true

确定读取器在处理时是否应忽略 SQL 警告。

spring.batch.job.jdbccursoritemreader.verifyCursorPosition

布尔值

true

指示是否应在每次读取后验证游标的位置,以验证RowMapper是否未提前游标。

spring.batch.job.jdbccursoritemreader.driverSupportsAbsolute

布尔值

false

指示驱动程序是否支持游标的绝对定位。

spring.batch.job.jdbccursoritemreader.useSharedExtendedConnection

布尔值

false

指示连接是否与其他处理共享(因此是事务的一部分)。

spring.batch.job.jdbccursoritemreader.sql

字符串

null

要从中读取的 SQL 查询。

您还可以使用以下属性专门为读取器指定 JDBC 数据源:.JdbcCursorItemReader 属性

属性 类型 默认值 描述

spring.batch.job.jdbccursoritemreader.datasource.enable

布尔值

false

确定是否应启用JdbcCursorItemReader DataSource

jdbccursoritemreader.datasource.url

字符串

null

数据库的 JDBC URL。

jdbccursoritemreader.datasource.username

字符串

null

数据库的登录用户名。

jdbccursoritemreader.datasource.password

字符串

null

数据库的登录密码。

jdbccursoritemreader.datasource.driver-class-name

字符串

null

JDBC 驱动的完全限定名称。

如果未指定jdbccursoritemreader_datasource,则JDBCCursorItemReader将使用默认的DataSource

KafkaItemReader

从 Kafka 主题提取数据分区非常有用,而这正是KafkaItemReader可以做到的。要配置KafkaItemReader,需要两部分配置。首先,需要使用 Spring Boot 的 Kafka 自动配置来配置 Kafka(请参阅Spring Boot Kafka 文档)。配置完 Spring Boot 的 Kafka 属性后,您可以通过设置以下属性来配置KafkaItemReader本身

表 5. KafkaItemReader 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemreader.name

字符串

null

用于在 `ExecutionContext` 中提供唯一键的名称。

spring.batch.job.kafkaitemreader.topic

字符串

null

要从中读取的主题的名称。

spring.batch.job.kafkaitemreader.partitions

List<Integer>

空列表

要从中读取的分区索引列表。

spring.batch.job.kafkaitemreader.pollTimeOutInSeconds

long

30

poll() 操作的超时时间。

spring.batch.job.kafkaitemreader.saveState

布尔值

true

确定是否应为重启保存状态。

原生编译

单步批处理的优势在于,当您使用 JVM 时,它允许您在运行时动态选择要使用的读取器和写入器 bean。但是,当您使用原生编译时,必须在构建时而不是运行时确定读取器和写入器。以下示例就是这样做的

<plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <executions>
        <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            <configuration>
                <jvmArguments>
                    -Dspring.batch.job.flatfileitemreader.name=fooReader
                    -Dspring.batch.job.flatfileitemwriter.name=fooWriter
                </jvmArguments>
            </configuration>
        </execution>
    </executions>
</plugin>

ItemProcessor 配置

如果ApplicationContext中存在ItemProcessor,则单步批处理作业自动配置会接受它。如果找到正确类型的ItemProcessorItemProcessor<Map<String, Object>, Map<String, Object>>),则将其自动装配到步骤中。

ItemWriter 实现的自动配置

此启动器为与支持的ItemReader实现匹配的ItemWriter实现提供自动配置:AmqpItemWriterFlatFileItemWriterJdbcItemWriterKafkaItemWriter。本节介绍如何使用自动配置来配置支持的ItemWriter

AmqpItemWriter

要写入 RabbitMQ 队列,您需要两组配置。首先,您需要一个AmqpTemplate。最简单的方法是使用 Spring Boot 的 RabbitMQ 自动配置。请参阅Spring Boot AMQP 文档

配置完AmqpTemplate后,您可以通过设置以下属性来配置AmqpItemWriter

表 6. AmqpItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.amqpitemwriter.enabled

布尔值

false

如果为true,则自动配置运行。

spring.batch.job.amqpitemwriter.jsonConverterEnabled

布尔值

true

指示是否应注册Jackson2JsonMessageConverter来转换消息。

FlatFileItemWriter

要将文件作为步骤的输出写入,您可以配置FlatFileItemWriter。自动配置接受已明确配置的组件(例如LineAggregatorFieldExtractorFlatFileHeaderCallbackFlatFileFooterCallback)以及通过设置以下指定属性配置的组件

表 7. FlatFileItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.flatfileitemwriter.resource

资源

null

要读取的资源。

spring.batch.job.flatfileitemwriter.delimited

布尔值

false

指示输出文件是否为分隔文件。如果为true,则spring.batch.job.flatfileitemwriter.formatted必须为false

spring.batch.job.flatfileitemwriter.formatted

布尔值

false

指示输出文件是否为格式化文件。如果为true,则spring.batch.job.flatfileitemwriter.delimited必须为false

spring.batch.job.flatfileitemwriter.format

字符串

null

用于为格式化文件生成输出的格式。格式化是通过使用String.format执行的。

spring.batch.job.flatfileitemwriter.locale

区域设置

Locale.getDefault()

生成文件时要使用的Locale

spring.batch.job.flatfileitemwriter.maximumLength

int

0

记录的最大长度。如果为 0,则大小不受限制。

spring.batch.job.flatfileitemwriter.minimumLength

int

0

最小记录长度。

spring.batch.job.flatfileitemwriter.delimiter

字符串

,

用于分隔分隔文件中字段的String

spring.batch.job.flatfileitemwriter.encoding

字符串

FlatFileItemReader.DEFAULT_CHARSET

写入文件时要使用的编码。

spring.batch.job.flatfileitemwriter.forceSync

布尔值

false

指示是否应在刷新时将文件强制同步到磁盘。

spring.batch.job.flatfileitemwriter.names

String []

null

从记录中解析的每个字段的名称列表。这些名称是此ItemWriter接收的项目的Map<String, Object>中的键。

spring.batch.job.flatfileitemwriter.append

布尔值

false

指示如果找到输出文件,是否应追加到文件。

spring.batch.job.flatfileitemwriter.lineSeparator

字符串

FlatFileItemWriter.DEFAULT_LINE_SEPARATOR

在输出文件中分隔行时要使用什么String

spring.batch.job.flatfileitemwriter.name

字符串

null

用于在 `ExecutionContext` 中提供唯一键的名称。

spring.batch.job.flatfileitemwriter.saveState

布尔值

true

确定是否应为重启保存状态。

spring.batch.job.flatfileitemwriter.shouldDeleteIfEmpty

布尔值

false

如果设置为true,则作业完成后将删除空文件(没有输出)。

spring.batch.job.flatfileitemwriter.shouldDeleteIfExists

布尔值

true

如果设置为true并且在输出文件应在的位置找到文件,则在步骤开始之前将其删除。

spring.batch.job.flatfileitemwriter.transactional

布尔值

FlatFileItemWriter.DEFAULT_TRANSACTIONAL

指示读取器是否是事务队列(表示读取的项目在失败时将返回到队列)。

JdbcBatchItemWriter

要将步骤的输出写入关系数据库,此启动器提供了自动配置JdbcBatchItemWriter的功能。自动配置允许您提供自己的ItemPreparedStatementSetterItemSqlParameterSourceProvider以及通过设置以下属性的配置选项

表 8. JdbcBatchItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.name

字符串

null

用于在 `ExecutionContext` 中提供唯一键的名称。

spring.batch.job.jdbcbatchitemwriter.sql

字符串

null

用于插入每个项目的 SQL。

spring.batch.job.jdbcbatchitemwriter.assertUpdates

布尔值

true

是否验证每次插入都会导致更新至少一条记录。

您还可以使用以下属性专门为写入器指定 JDBC 数据源:.JdbcBatchItemWriter 属性

属性 类型 默认值 描述

spring.batch.job.jdbcbatchitemwriter.datasource.enable

布尔值

false

确定是否应启用JdbcCursorItemReader DataSource

jdbcbatchitemwriter.datasource.url

字符串

null

数据库的 JDBC URL。

jdbcbatchitemwriter.datasource.username

字符串

null

数据库的登录用户名。

jdbcbatchitemwriter.datasource.password

字符串

null

数据库的登录密码。

jdbcbatchitemreader.datasource.driver-class-name

字符串

null

JDBC 驱动的完全限定名称。

如果未指定jdbcbatchitemwriter_datasource,则JdbcBatchItemWriter将使用默认的DataSource

KafkaItemWriter

要将步骤输出写入 Kafka 主题,您需要KafkaItemWriter。此启动器通过使用来自两个位置的功能为KafkaItemWriter提供自动配置。第一个是 Spring Boot 的 Kafka 自动配置。(请参阅Spring Boot Kafka 文档。)其次,此启动器允许您在写入器上配置两个属性。

表 9. KafkaItemWriter 属性
属性 类型 默认值 描述

spring.batch.job.kafkaitemwriter.topic

字符串

null

要写入的 Kafka 主题。

spring.batch.job.kafkaitemwriter.delete

布尔值

false

传递给写入器的项目是否都应作为删除事件发送到主题。

有关KafkaItemWriter的配置选项的更多信息,请参阅KafkaItemWiter 文档

Spring AOT

当使用 Spring AOT 与单步批处理启动器时,您必须在编译时设置读取器和写入器名称属性(除非您为读取器和/或写入器创建 bean)。为此,您必须在启动 Maven 插件或 Gradle 插件中包含您希望使用的读取器和写入器的名称作为参数或环境变量。例如,如果您希望在 Maven 中启用FlatFileItemReaderFlatFileItemWriter,则它看起来像

    <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
        <executions>
            <execution>
            <id>process-aot</id>
            <goals>
                <goal>process-aot</goal>
            </goals>
            </execution>
        </executions>
        <configuration>
            <arguments>
                <argument>--spring.batch.job.flatfileitemreader.name=foobar</argument>
                <argument>--spring.batch.job.flatfileitemwriter.name=fooWriter</argument>
            </arguments>
        </configuration>
    </plugin>