Spring Integration 交互

Spring Integration 框架 将 Spring 编程模型扩展到支持众所周知的企业集成模式。它支持基于 Spring 的应用程序内的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。它还提供高级 DSL,用于将各种操作(端点)组合到逻辑集成流中。通过这种 DSL 配置的 lambda 风格,Spring Integration 已经很好地采用了 `java.util.function` 接口。`@MessagingGateway` 代理接口也可以作为 `Function` 或 `Consumer`,根据 Spring Cloud Function 环境,可以将其注册到函数目录中。有关其对函数的支持的更多信息,请参阅 Spring Integration 参考手册

另一方面,从 4.0.3 版本开始,Spring Cloud Function 引入了一个 `spring-cloud-function-integration` 模块,该模块提供更深入、更特定于云的基于自动配置的 API,用于从 Spring Integration DSL 的角度与 `FunctionCatalog` 交互。`FunctionFlowBuilder` 自动配置并自动装配 `FunctionCatalog`,并表示目标 `IntegrationFlow` 实例的特定于函数的 DSL 的入口点。(为方便起见)除了标准的 `IntegrationFlow.from()` 工厂外,`FunctionFlowBuilder` 还公开了一个 `fromSupplier(String supplierDefinition)` 工厂来在提供的 `FunctionCatalog` 中查找目标 `Supplier`。然后,此 `FunctionFlowBuilder` 将导致 `FunctionFlowDefinition`。此 `FunctionFlowDefinition` 是 `IntegrationFlowExtension` 的实现,并公开 `apply(String functionDefinition)` 和 `accept(String consumerDefinition)` 运算符分别从 `FunctionCatalog` 中查找 `Function` 或 `Consumer`。有关更多信息,请参阅它们的 Javadoc。

以下示例演示了 `FunctionFlowBuilder` 的实际应用以及其余 `IntegrationFlow` API 的强大功能

@Configuration
public class IntegrationConfiguration {

    @Bean
    Supplier<byte[]> simpleByteArraySupplier() {
        return "simple test data"::getBytes;
    }

    @Bean
    Function<String, String> upperCaseFunction() {
        return String::toUpperCase;
    }

    @Bean
    BlockingQueue<String> results() {
        return new LinkedBlockingQueue<>();
    }

    @Bean
    Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
        return results::add;
    }

    @Bean
    QueueChannel wireTapChannel() {
        return new QueueChannel();
    }

    @Bean
    IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
        return functionFlowBuilder
                .fromSupplier("simpleByteArraySupplier")
                .wireTap("wireTapChannel")
                .apply("upperCaseFunction")
                .log(LoggingHandler.Level.WARN)
                .accept("simpleStringConsumer");
    }

}

由于 `FunctionCatalog.lookup()` 功能不仅限于简单的函数名称,因此函数组合功能也可以用于上述 `apply()` 和 `accept()` 运算符

@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .from("functionCompositionInput")
            .accept("upperCaseFunction|simpleStringConsumer");
}

当我们在 Spring Cloud 应用程序中添加预定义函数的自动配置依赖项时,此 API 将变得更加相关。例如,Stream Applications 项目除了应用程序镜像外,还提供具有各种集成用例函数的工件,例如 `debezium-supplier`、`elasticsearch-consumer`、`aggregator-function` 等。

以下配置分别基于 `http-supplier`、`spel-function` 和 `file-consumer`

@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
    return functionFlowBuilder
            .fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
            .<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
            .channel(c -> c.flux())
            .apply("spelFunction")
            .<String, String>transform(String::toUpperCase)
            .accept("fileConsumer");
}

我们只需要将它们的配置添加到 `application.properties` 中(如有必要)

http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt