Spring Cloud Stream 集成

任务本身可能很有用,但将任务集成到更大的生态系统中,使其能够用于更复杂的处理和编排。本节介绍 Spring Cloud Task 与 Spring Cloud Stream 集成的选项。

从 Spring Cloud Stream 启动任务

您可以从流中启动任务。为此,请创建一个接收器,它侦听包含 TaskLaunchRequest 作为其有效负载的消息。TaskLaunchRequest 包含

  • uri:指向要执行的任务工件。

  • applicationName:与任务关联的名称。如果未设置 applicationName,则 TaskLaunchRequest 会生成一个由以下内容组成的任务名称:Task-<UUID>

  • commandLineArguments:包含任务命令行参数的列表。

  • environmentProperties:包含任务要使用的环境变量的映射。

  • deploymentProperties:包含部署程序用于部署任务的属性的映射。

如果有效负载属于其他类型,则接收器会抛出异常。

例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源获取数据并创建一个包含 TaskLaunchRequestGenericMessage,并将消息发送到其输出通道。然后,任务接收器将从其输入通道接收消息,然后启动任务。

要创建 taskSink,您只需要创建一个包含 EnableTaskLauncher 注解的 Spring Boot 应用程序,如下例所示

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task 项目的 samples 模块 包含一个示例接收器和处理器。要将这些示例安装到本地 Maven 存储库中,请从 spring-cloud-task-samples 目录运行 Maven 构建,并将 skipInstall 属性设置为 false,如下例所示

mvn clean install

maven.remoteRepositories.springRepo.url 属性必须设置为 Spring Boot Uber-jar 所在的远程存储库的位置。如果未设置,则没有远程存储库,因此它仅依赖于本地存储库。

Spring Cloud Data Flow

要在 Spring Cloud Data Flow 中创建流,您必须首先注册我们创建的任务接收器应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册处理器和接收器示例应用程序

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

以下示例显示了如何从 Spring Cloud Data Flow shell 创建流

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task 事件

Spring Cloud Task 提供了在任务通过 Spring Cloud Stream 通道运行时,通过 Spring Cloud Stream 通道发出事件的功能。任务侦听器用于在名为 task-events 的消息通道上发布 TaskExecution。此功能会自动连接到任何具有 spring-cloud-streamspring-cloud-stream-<binder> 以及在其类路径上定义的任务的任务中。

要禁用事件发射侦听器,请将 spring.cloud.task.events.enabled 属性设置为 false

在定义了相应的类路径后,以下任务会在 task-events 通道上(在任务开始和结束时)将 TaskExecution 作为事件发出

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}
还需要在类路径上使用绑定器实现。
在 Spring Cloud Task 项目的 samples 模块中可以找到示例任务事件应用程序,此处

禁用特定任务事件

要禁用任务事件,可以将 spring.cloud.task.events.enabled 属性设置为 false

Spring Batch 事件

在通过任务执行 Spring Batch 作业时,可以将 Spring Cloud Task 配置为根据 Spring Batch 中可用的 Spring Batch 侦听器发出信息消息。具体来说,以下 Spring Batch 侦听器会自动配置到每个批处理作业中,并在通过 Spring Cloud Task 运行时,在关联的 Spring Cloud Stream 通道上发出消息

  • JobExecutionListener 侦听 job-execution-events

  • StepExecutionListener 侦听 step-execution-events

  • ChunkListener 侦听 chunk-events

  • ItemReadListener 侦听 item-read-events

  • ItemProcessListener 侦听 item-process-events

  • ItemWriteListener 侦听 item-write-events

  • SkipListener 侦听 skip-events

当上下文中存在相应的 bean(一个 Job 和一个 TaskLifecycleListener)时,这些侦听器会自动配置到任何 AbstractJob 中。侦听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source,侦听应用程序充当 ProcessorSink

一个示例可能是让一个应用程序侦听 job-execution-events 通道以获取作业的启动和停止。要配置侦听应用程序,您可以将输入配置为 job-execution-events,如下所示

spring.cloud.stream.bindings.input.destination=job-execution-events

还需要在类路径上使用绑定器实现。
在 Spring Cloud Task 项目的 samples 模块中可以找到示例批处理事件应用程序,此处

将批处理事件发送到不同的通道

Spring Cloud Task 为批处理事件提供的选项之一是能够更改特定侦听器可以发出其消息的通道。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例如,如果 StepExecutionListener 需要将其消息发送到另一个名为 my-step-execution-events 的通道而不是默认的 step-execution-events,则可以添加以下配置

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

禁用批处理事件

要禁用所有批处理事件的侦听器功能,请使用以下配置

spring.cloud.task.batch.events.enabled=false

要禁用特定批处理事件,请使用以下配置

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

以下列表显示了您可以禁用的各个侦听器

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

批处理事件的发射顺序

默认情况下,批处理事件具有Ordered.LOWEST_PRECEDENCE。要更改此值(例如,更改为 5),请使用以下配置

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5