批处理

本节将更详细地介绍 Spring Cloud Task 与 Spring Batch 的集成。本节涵盖了跟踪作业执行与其执行所在任务之间的关联以及通过 Spring Cloud Deployer 进行远程分区。

将作业执行与执行它的任务关联

Spring Boot 提供了在 Spring Boot Uber-jar 中执行批处理作业的功能。Spring Boot 对此功能的支持允许开发人员在该执行中执行多个批处理作业。Spring Cloud Task 提供了将作业的执行(作业执行)与任务的执行相关联的功能,以便可以将一个追溯到另一个。

Spring Cloud Task 通过使用 TaskBatchExecutionListener 实现此功能。默认情况下,此监听器会在任何同时配置了 Spring Batch 作业(通过在上下文中定义类型为 Job 的 bean)并且类路径上有 spring-cloud-task-batch jar 的上下文中自动配置。该监听器会注入到满足这些条件的所有作业中。

覆盖 TaskBatchExecutionListener

要阻止监听器注入到当前上下文中的任何批处理作业中,可以使用标准的 Spring Boot 机制禁用自动配置。

要仅将监听器注入到上下文中的特定作业中,请覆盖 batchTaskExecutionListenerBeanPostProcessor 并提供一个作业 bean ID 列表,如下例所示

public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
	TaskBatchExecutionListenerBeanPostProcessor postProcessor =
		new TaskBatchExecutionListenerBeanPostProcessor();

	postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));

	return postProcessor;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例批处理应用程序,此处

远程分区

Spring Cloud Deployer 提供了在大多数云基础设施上启动基于 Spring Boot 的应用程序的功能。DeployerPartitionHandlerDeployerStepExecutionHandler 将工作步骤执行的启动委托给 Spring Cloud Deployer。

要配置 DeployerStepExecutionHandler,必须提供一个表示要执行的 Spring Boot Uber-jar 的 Resource、一个 TaskLauncherHandler 和一个 JobExplorer。您还可以配置任何环境属性以及一次执行的最大工作程序数量、轮询结果的时间间隔(默认为 10 秒)和超时时间(默认为 -1 或无超时)。以下示例显示了配置此 PartitionHandler 的方式

@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
		JobExplorer jobExplorer) throws Exception {

	MavenProperties mavenProperties = new MavenProperties();
	mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
		new MavenProperties.RemoteRepository(repository))));

 	Resource resource =
		MavenResource.parse(String.format("%s:%s:%s",
				"io.spring.cloud",
				"partitioned-batch-job",
				"1.1.0.RELEASE"), mavenProperties);

	DeployerPartitionHandler partitionHandler =
		new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");

	List<String> commandLineArgs = new ArrayList<>(3);
	commandLineArgs.add("--spring.profiles.active=worker");
	commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
	commandLineArgs.add("--spring.batch.initializer.enabled=false");

	partitionHandler.setCommandLineArgsProvider(
		new PassThroughCommandLineArgsProvider(commandLineArgs));
	partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
	partitionHandler.setMaxWorkers(2);
	partitionHandler.setApplicationName("PartitionedBatchJobTask");

	return partitionHandler;
}
将环境变量传递到分区时,每个分区可能位于具有不同环境设置的不同机器上。因此,您应该只传递必需的环境变量。

请注意,在上面的示例中,我们将最大工作程序数量设置为 2。设置最大工作程序数量会确定应同时运行的最大分区数量。

要执行的 Resource 预计是一个 Spring Boot Uber-jar,其中 DeployerStepExecutionHandler 在当前上下文中配置为 CommandLineRunner。前面示例中枚举的存储库应该是 Spring Boot Uber-jar 所在的远程存储库。管理器和工作程序都预计可以访问用作作业存储库和任务存储库的相同数据存储。一旦底层基础设施启动了 Spring Boot jar 并且 Spring Boot 启动了 DeployerStepExecutionHandler,步骤处理程序就会执行请求的 Step。以下示例显示了如何配置 DeployerStepExecutionHandler

@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
	DeployerStepExecutionHandler handler =
		new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);

	return handler;
}
您可以在 Spring Cloud Task 项目的 samples 模块中找到一个示例远程分区应用程序,此处

异步启动远程批处理分区

默认情况下,批处理分区是顺序启动的。但是,在某些情况下,这可能会影响性能,因为每次启动都会阻塞,直到资源(例如:在 Kubernetes 中预配 Pod)被预配。在这些情况下,您可以向 DeployerPartitionHandler 提供一个 ThreadPoolTaskExecutor。这将根据 ThreadPoolTaskExecutor 的配置启动远程批处理分区。例如

	@Bean
	public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(4);
		executor.setThreadNamePrefix("default_task_executor_thread");
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.initialize();
		return executor;
	}

	@Bean
	public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
		TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
		Resource resource = this.resourceLoader
			.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");

		DeployerPartitionHandler partitionHandler =
			new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
				"workerStep", taskRepository, executor);
	...
	}
我们需要关闭上下文,因为使用 ThreadPoolTaskExecutor 会使线程保持活动状态,从而导致应用程序无法终止。为了正确关闭应用程序,我们需要将 spring.cloud.task.closecontextEnabled 属性设置为 true

关于为 Kubernetes 平台开发批处理分区应用程序的说明

  • 在 Kubernetes 平台上部署分区应用程序时,必须使用以下依赖项才能使用 Spring Cloud Kubernetes Deployer

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId>
    </dependency>
  • 任务应用程序及其分区的应用程序名称需要遵循以下正则表达式模式:[a-z0-9]([-a-z0-9]*[a-z0-9])。否则,将抛出异常。

批处理信息消息

Spring Cloud Task 使批处理作业能够发出信息消息。“Spring Batch 事件”部分详细介绍了此功能。

批处理作业退出代码

前所述,Spring Cloud Task 应用程序支持记录任务执行的退出代码的功能。但是,在任务中运行 Spring Batch 作业的情况下,无论 Batch 作业执行如何完成,在使用默认的 Batch/Boot 行为时,任务的结果始终为零。请记住,任务是一个引导应用程序,并且从任务返回的退出代码与引导应用程序相同。要覆盖此行为并允许任务在批处理作业返回 FAILEDBatchStatus 时返回非零退出代码,请将 spring.cloud.task.batch.fail-on-job-failure 设置为 true。然后,退出代码可以为 1(默认值)或基于指定的 ExitCodeGenerator

此功能使用一个新的 ApplicationRunner 来替换 Spring Boot 提供的那个。默认情况下,它以相同的顺序配置。但是,如果要自定义 ApplicationRunner 运行的顺序,可以通过设置 spring.cloud.task.batch.applicationRunnerOrder 属性来设置其顺序。要使您的任务根据批处理作业执行的结果返回退出代码,您需要编写自己的 CommandLineRunner