DSL 基础

org.springframework.integration.dsl 包包含前面提到的 IntegrationFlowBuilder API 和许多 IntegrationComponentSpec 实现,它们也是构建器,并提供流畅的 API 来配置具体的端点。IntegrationFlowBuilder 基础设施为基于消息的应用程序提供了常见的 企业集成模式 (EIP),例如通道、端点、轮询器和通道拦截器。

重要

IntegrationComponentSpec 是一个 FactoryBean 实现,因此其 getObject() 方法不能从 bean 定义中调用。IntegrationComponentSpec 实现必须保持原样以供 bean 定义使用,框架将管理其生命周期。对于 IntegrationFlow bean 定义,必须使用目标 IntegrationComponentSpec 类型(一个 FactoryBean 值)的 bean 方法参数注入,而不是 bean 方法引用。

端点在 DSL 中以动词的形式表达,以提高可读性。以下列表包括常见的 DSL 方法名称和相关的 EIP 端点

  • transform → Transformer

  • filter → Filter

  • handle → ServiceActivator

  • split → Splitter

  • aggregate → Aggregator

  • route → Router

  • bridge → Bridge

从概念上讲,集成过程是通过将这些端点组合成一个或多个消息流来构建的。请注意,EIP 没有正式定义“消息流”一词,但将其视为使用知名消息模式的工作单元是有用的。DSL 提供了一个 IntegrationFlow 组件来定义通道和它们之间端点的组合,但现在 IntegrationFlow 仅扮演配置角色,以在应用程序上下文中填充真实 bean,并且在运行时不使用。但是,IntegrationFlow 的 bean 可以作为 Lifecycle 自动装配,以控制整个流的 start()stop(),这将委托给与该 IntegrationFlow 关联的所有 Spring 集成组件。以下示例使用 IntegrationFlow 流畅 API 通过使用 IntegrationFlowBuilder 中的 EIP 方法来定义 IntegrationFlow bean

@Bean
public IntegrationFlow integerFlow() {
    return IntegrationFlow.from("input")
            .<String, Integer>transform(Integer::parseInt)
            .get();
}

transform 方法接受一个 lambda 作为端点参数来操作消息有效负载。该方法的实际参数是一个 GenericTransformer<S, T> 实例。因此,这里可以使用任何提供的转换器(ObjectToJsonTransformerFileToStringTransformer 等)。

在幕后,IntegrationFlowBuilder 识别 MessageHandler 及其端点,分别为 MessageTransformingHandlerConsumerEndpointFactoryBean。考虑另一个示例

@Bean
public IntegrationFlow myFlow() {
    return IntegrationFlow.from("input")
                .filter("World"::equals)
                .transform("Hello "::concat)
                .handle(System.out::println)
                .get();
}

前面的示例组合了一个Filter → Transformer → Service Activator序列。该流程是“单向”的。也就是说,它不提供回复消息,而只是将有效负载打印到STDOUT。端点通过使用直接通道自动连接在一起。

Lambda 和Message<?> 参数

在 EIP 方法中使用 lambda 时,“输入”参数通常是消息有效负载。如果您希望访问整个消息,请使用其中一个采用Class<?>作为第一个参数的重载方法。例如,这将不起作用

.<Message<?>, Foo>transform(m -> newFooFromMessage(m))

这将在运行时失败,并出现ClassCastException,因为 lambda 不保留参数类型,框架将尝试将有效负载强制转换为Message<?>

相反,请使用

.(Message.class, m -> newFooFromMessage(m))
Bean 定义覆盖

Java DSL 可以为在流程定义中内联定义的对象注册 bean,也可以重用现有的注入 bean。如果为内联对象和现有 bean 定义定义了相同的 bean 名称,则会抛出BeanDefinitionOverrideException,表明此类配置错误。但是,当您处理prototype bean 时,无法从集成流处理器中检测到现有的 bean 定义,因为每次从BeanFactory调用prototype bean 时,我们都会获得一个新实例。这样,提供的实例就会按原样在IntegrationFlow中使用,而不会进行任何 bean 注册,也不会对现有的prototype bean 定义进行任何可能的检查。但是,如果该对象具有显式id,并且该名称的 bean 定义在prototype范围内,则会为该对象调用BeanFactory.initializeBean()