Spring Cloud Stream 集成
任务本身可能很有用,但将任务集成到更大的生态系统中,使其能够用于更复杂的处理和编排。本节介绍 Spring Cloud Task 与 Spring Cloud Stream 集成的选项。
从 Spring Cloud Stream 启动任务
您可以从流中启动任务。为此,请创建一个接收器,它侦听包含 TaskLaunchRequest
作为其有效负载的消息。TaskLaunchRequest
包含
-
uri
:指向要执行的任务工件。 -
applicationName
:与任务关联的名称。如果未设置 applicationName,则TaskLaunchRequest
会生成一个由以下内容组成的任务名称:Task-<UUID>
。 -
commandLineArguments
:包含任务命令行参数的列表。 -
environmentProperties
:包含任务要使用的环境变量的映射。 -
deploymentProperties
:包含部署程序用于部署任务的属性的映射。
如果有效负载属于其他类型,则接收器会抛出异常。 |
例如,可以创建一个流,该流具有一个处理器,该处理器从 HTTP 源获取数据并创建一个包含 TaskLaunchRequest
的 GenericMessage
,并将消息发送到其输出通道。然后,任务接收器将从其输入通道接收消息,然后启动任务。
要创建 taskSink,您只需要创建一个包含 EnableTaskLauncher
注解的 Spring Boot 应用程序,如下例所示
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task 项目的 samples 模块 包含一个示例接收器和处理器。要将这些示例安装到本地 Maven 存储库中,请从 spring-cloud-task-samples
目录运行 Maven 构建,并将 skipInstall
属性设置为 false
,如下例所示
mvn clean install
maven.remoteRepositories.springRepo.url 属性必须设置为 Spring Boot Uber-jar 所在的远程存储库的位置。如果未设置,则没有远程存储库,因此它仅依赖于本地存储库。 |
Spring Cloud Data Flow
要在 Spring Cloud Data Flow 中创建流,您必须首先注册我们创建的任务接收器应用程序。在以下示例中,我们使用 Spring Cloud Data Flow shell 注册处理器和接收器示例应用程序
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
以下示例显示了如何从 Spring Cloud Data Flow shell 创建流
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task 事件
Spring Cloud Task 提供了在任务通过 Spring Cloud Stream 通道运行时,通过 Spring Cloud Stream 通道发出事件的功能。任务侦听器用于在名为 task-events
的消息通道上发布 TaskExecution
。此功能会自动连接到任何具有 spring-cloud-stream
、spring-cloud-stream-<binder>
以及在其类路径上定义的任务的任务中。
要禁用事件发射侦听器,请将 spring.cloud.task.events.enabled 属性设置为 false 。 |
在定义了相应的类路径后,以下任务会在 task-events
通道上(在任务开始和结束时)将 TaskExecution
作为事件发出
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
还需要在类路径上使用绑定器实现。 |
在 Spring Cloud Task 项目的 samples 模块中可以找到示例任务事件应用程序,此处。 |
Spring Batch 事件
在通过任务执行 Spring Batch 作业时,可以将 Spring Cloud Task 配置为根据 Spring Batch 中可用的 Spring Batch 侦听器发出信息消息。具体来说,以下 Spring Batch 侦听器会自动配置到每个批处理作业中,并在通过 Spring Cloud Task 运行时,在关联的 Spring Cloud Stream 通道上发出消息
-
JobExecutionListener
侦听job-execution-events
-
StepExecutionListener
侦听step-execution-events
-
ChunkListener
侦听chunk-events
-
ItemReadListener
侦听item-read-events
-
ItemProcessListener
侦听item-process-events
-
ItemWriteListener
侦听item-write-events
-
SkipListener
侦听skip-events
当上下文中存在相应的 bean(一个 Job
和一个 TaskLifecycleListener
)时,这些侦听器会自动配置到任何 AbstractJob
中。侦听这些事件的配置方式与绑定到任何其他 Spring Cloud Stream 通道的方式相同。我们的任务(运行批处理作业的任务)充当 Source
,侦听应用程序充当 Processor
或 Sink
。
一个示例可能是让一个应用程序侦听 job-execution-events
通道以获取作业的启动和停止。要配置侦听应用程序,您可以将输入配置为 job-execution-events
,如下所示
spring.cloud.stream.bindings.input.destination=job-execution-events
还需要在类路径上使用绑定器实现。 |
在 Spring Cloud Task 项目的 samples 模块中可以找到示例批处理事件应用程序,此处。 |
将批处理事件发送到不同的通道
Spring Cloud Task 为批处理事件提供的选项之一是能够更改特定侦听器可以发出其消息的通道。为此,请使用以下配置:spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例如,如果 StepExecutionListener
需要将其消息发送到另一个名为 my-step-execution-events
的通道而不是默认的 step-execution-events
,则可以添加以下配置
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
禁用批处理事件
要禁用所有批处理事件的侦听器功能,请使用以下配置
spring.cloud.task.batch.events.enabled=false
要禁用特定批处理事件,请使用以下配置
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
以下列表显示了您可以禁用的各个侦听器
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
批处理事件的发射顺序
默认情况下,批处理事件具有Ordered.LOWEST_PRECEDENCE
。要更改此值(例如,更改为 5),请使用以下配置
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5