Spring Integration 框架概述

Spring Integration 提供了 Spring 编程模型的扩展,以支持众所周知的 企业集成模式。它支持基于 Spring 的应用程序内的轻量级消息传递,并通过声明式适配器支持与外部系统的集成。这些适配器在 Spring 对远程调用、消息传递和调度的支持之上提供了更高层次的抽象。

Spring Integration 的主要目标是在构建企业集成解决方案时提供一个简单的模型,同时保持关注点分离,这对生成可维护、可测试的代码至关重要。

Spring Integration 概述

本章提供 Spring Integration 核心概念和组件的高级介绍。它包含一些编程技巧,以帮助您充分利用 Spring Integration。

背景

Spring 框架的一个关键主题是控制反转 (IoC)。从广义上讲,这意味着框架代表在其上下文中管理的组件处理责任。组件本身得到了简化,因为它们免除了这些责任。例如,依赖注入使组件免于查找或创建其依赖项的责任。同样,面向方面的编程通过将通用横切关注点模块化为可重用的方面,从而使业务组件免于这些关注点。在每种情况下,最终结果都是一个更容易测试、理解、维护和扩展的系统。

此外,Spring 框架和产品组合为构建企业应用程序提供了全面的编程模型。开发人员受益于此模型的一致性,尤其受益于它基于成熟的最佳实践,例如面向接口编程和优先选择组合而不是继承。Spring 简化的抽象和强大的支持库提高了开发人员的生产力,同时提高了可测试性和可移植性。

Spring Integration 受这些相同目标和原则的驱动。它将 Spring 编程模型扩展到消息传递领域,并建立在 Spring 现有的企业集成支持的基础上,以提供更高层次的抽象。它支持消息驱动的架构,其中控制反转应用于运行时问题,例如某些业务逻辑何时运行以及应将响应发送到哪里。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成问题由框架处理。业务组件进一步与基础设施隔离,开发人员免除了复杂的集成责任。

作为 Spring 编程模型的扩展,Spring Integration 提供了各种配置选项,包括注释、带命名空间支持的 XML、带通用“bean”元素的 XML 以及对底层 API 的直接使用。该 API 基于定义良好的策略接口和非侵入式、委托适配器。Spring Integration 的设计灵感来自对 Spring 中常见模式与 Gregor Hohpe 和 Bobby Woolf 在《企业集成模式》(Addison Wesley,2004 年)中描述的众所周知的模式之间存在强关联性的认识。阅读过该书的开发人员应该立即熟悉 Spring Integration 的概念和术语。

目标和原则

Spring Integration 受以下目标驱动

  • 为实现复杂的企业集成解决方案提供一个简单的模型。

  • 促进基于 Spring 的应用程序内异步、消息驱动的行为。

  • 促进现有 Spring 用户直观、增量地采用。

Spring Integration 受以下原则指导

  • 组件应松散耦合以实现模块化和可测试性。

  • 框架应强制执行业务逻辑和集成逻辑之间的关注点分离。

  • 扩展点本质上应该是抽象的(但在定义明确的边界内),以促进重用和可移植性。

主要组件

从垂直角度来看,分层架构有助于关注点分离,层之间的基于接口的契约促进了松散耦合。基于 Spring 的应用程序通常以这种方式设计,Spring 框架和产品组合为遵循此最佳实践提供了坚实的基础,涵盖企业应用程序的完整堆栈。消息驱动的架构添加了水平视角,但这些相同目标仍然相关。正如“分层架构”是一个极其通用和抽象的范例一样,消息传递系统通常遵循类似的抽象“管道和过滤器”模型。“过滤器”表示任何能够生成或使用消息的组件,“管道”在过滤器之间传输消息,以便组件本身保持松散耦合。需要注意的是,这两个高级范例并不相互排斥。支持“管道”的底层消息传递基础设施仍然应该封装在一个层中,该层的契约定义为接口。同样,“过滤器”本身也应该在一个逻辑上位于应用程序服务层之上的层中进行管理,通过接口与这些服务交互,这与 Web 层的方式非常相似。

消息

在 Spring Integration 中,消息是任何 Java 对象的通用包装器,以及框架在处理该对象时使用的元数据。它由有效负载和报头组成。有效负载可以是任何类型,报头包含常用信息,例如 ID、时间戳、关联 ID 和返回地址。报头还用于在连接的传输之间传递值。例如,当从接收的文件创建消息时,文件名可能会存储在报头中,以便后续组件访问。同样,如果消息的内容最终将由出站邮件适配器发送,则上游组件可能会将各种属性(收件人、发件人、抄送、主题等)配置为消息报头值。开发人员还可以将任何任意键值对存储在报头中。

Message
图 1. 消息

消息通道

消息通道表示管道和过滤器架构的“管道”。生产者将消息发送到通道,消费者从通道接收消息。因此,消息通道解耦了消息传递组件,并提供了一个方便的拦截和监视消息的点。

Message Channel
图 2. 消息通道

消息通道可以遵循点对点或发布订阅语义。使用点对点通道,每个发送到通道的消息最多只能有一个消费者接收。另一方面,发布订阅通道尝试将每条消息广播到通道上的所有订阅者。Spring Integration 支持这两种模型。

虽然“点对点”和“发布订阅”定义了最终接收每条消息的消费者的数量的两个选项,但还有另一个重要的考虑因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够在队列中缓冲消息。缓冲的优点是它允许限制入站消息,从而防止消费者过载。但是,顾名思义,这也增加了一些复杂性,因为只有配置了轮询器,消费者才能从此类通道接收消息。另一方面,连接到可订阅通道的消费者只是消息驱动的。消息通道实现详细讨论了 Spring Integration 中提供的各种通道实现。

消息端点

Spring Integration 的主要目标之一是通过控制反转简化企业集成解决方案的开发。这意味着您不必直接实现消费者和生产者,甚至不必构建消息并调用通道上的发送或接收操作。相反,您应该能够专注于您的特定域模型,并基于普通对象实现。然后,通过提供声明式配置,您可以将您的特定于域的代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着您必须将现有的应用程序代码直接连接起来。任何现实世界的企业集成解决方案都需要一定数量的代码专注于集成问题,例如路由和转换。重要的是在集成逻辑和业务逻辑之间实现关注点分离。换句话说,与 Web 应用程序的模型-视图-控制器 (MVC) 范例一样,目标应该是提供一个精简但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述处理这些职责的消息端点类型,在接下来的章节中,您可以了解 Spring Integration 的声明式配置选项如何提供一种非侵入式的方式来使用其中的每一个。

消息端点

消息端点表示管道过滤器架构的“过滤器”。如前所述,端点的主要作用是将应用程序代码连接到消息框架,并以非侵入式的方式执行此操作。换句话说,应用程序代码理想情况下不应该了解消息对象或消息通道。这类似于MVC范式中控制器的作用。就像控制器处理HTTP请求一样,消息端点处理消息。就像控制器映射到URL模式一样,消息端点映射到消息通道。这两种情况的目标都是相同的:将应用程序代码与基础架构隔离。这些概念以及所有后续模式在企业集成模式一书中进行了详细讨论。在这里,我们仅提供Spring Integration支持的主要端点类型及其关联角色的高级描述。后续章节将详细阐述并提供示例代码以及配置示例。

消息转换器

消息转换器负责转换消息的内容或结构并返回修改后的消息。可能最常见的转换器类型是将消息的有效负载从一种格式转换为另一种格式(例如,从XML转换为java.lang.String)。类似地,转换器可以添加、删除或修改消息的标头值。

消息过滤器

消息过滤器确定是否应将消息传递到输出通道。这只需要一个布尔测试方法,该方法可以检查特定的有效负载内容类型、属性值、标头的存在或其他条件。如果接受消息,则将其发送到输出通道。否则,它将被丢弃(或者,对于更严格的实现,可以抛出Exception)。消息过滤器通常与发布-订阅通道结合使用,其中多个消费者可以接收相同的消息并使用过滤器的条件来缩小要处理的消息集。

请注意,不要将管道过滤器架构模式中“过滤器”的通用用法与这种选择性地缩小两个通道之间消息流的特定端点类型混淆。“过滤器”的管道过滤器概念更接近于Spring Integration的消息端点:任何可以连接到消息通道以发送或接收消息的组件。

消息路由器

消息路由器负责决定哪个通道(如果有)应该接收下一个消息。通常,决策基于消息的内容或消息标头中可用的元数据。消息路由器通常用作服务激活器或其他能够发送回复消息的端点上的静态配置输出通道的动态替代方案。同样,消息路由器为前面描述的多个订阅者使用的反应式消息过滤器提供了一种主动替代方案。

Router
图3. 消息路由器

拆分器

拆分器是另一种类型的消息端点,其职责是从其输入通道接收消息,将该消息拆分为多个消息,并将每个消息发送到其输出通道。这通常用于将“复合”有效负载对象划分为包含细分有效负载的消息组。

聚合器

基本上是拆分器的镜像,聚合器是一种类型的消息端点,它接收多个消息并将它们组合成单个消息。实际上,聚合器通常是包含拆分器的管道中的下游消费者。从技术上讲,聚合器比拆分器更复杂,因为它需要维护状态(要聚合的消息),以确定何时可用完整的消息组,以及在必要时超时。此外,在超时的情况下,聚合器需要知道是否发送部分结果、丢弃它们或将它们发送到单独的通道。Spring Integration提供了一个CorrelationStrategy、一个ReleaseStrategy以及超时、是否在超时时发送部分结果以及丢弃通道的可配置设置。

服务激活器

服务激活器是将服务实例连接到消息系统的通用端点。必须配置输入消息通道,并且,如果要调用的服务方法能够返回值,则还可以提供输出消息通道。

输出通道是可选的,因为每个消息也可以提供自己的“返回地址”标头。相同的规则适用于所有消费者端点。

服务激活器在某个服务对象上调用操作以处理请求消息,提取请求消息的有效负载并进行转换(如果方法不期望消息类型的参数)。每当服务对象的方法返回值时,如果必要(如果它本身不是消息类型),该返回值也会转换为回复消息。该回复消息将发送到输出通道。如果未配置输出通道,则回复将发送到消息中“返回地址”指定的通道(如果可用)。

请求-回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。

handler endpoint
图4. 服务激活器
如前所述,在消息通道中,通道可以是轮询的或可订阅的。在上图中,这由“时钟”符号以及实线箭头(轮询)和虚线箭头(订阅)表示。

通道适配器

通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站的或出站的。通常,通道适配器会在消息与从其他系统接收或发送到的任何对象或资源之间进行一些映射(文件、HTTP请求、JMS消息等)。根据传输方式,通道适配器还可以填充或提取消息标头值。Spring Integration提供了许多通道适配器,这些适配器将在后续章节中介绍。

source endpoint
图5. 入站通道适配器端点将源系统连接到MessageChannel
消息源可以是轮询的(例如,POP3)或消息驱动的(例如,IMAP Idle)。在上图中,这由“时钟”符号以及实线箭头(轮询)和虚线箭头(消息驱动)表示。
target endpoint
图6. 出站通道适配器端点将MessageChannel连接到目标系统。
如前所述,在消息通道中,通道可以是轮询的或可订阅的。在上图中,这由“时钟”符号以及实线箭头(轮询)和虚线箭头(订阅)表示。

端点Bean名称

使用端点(任何具有inputChannel的端点)由两个Bean组成,即消费者和消息处理器。消费者持有对消息处理器的引用,并在消息到达时调用它。

考虑以下XML示例

<int:service-activator id = "someService" ... />

根据上述示例,Bean名称如下

  • 消费者:someServiceid

  • 处理器:someService.handler

当使用企业集成模式(EIP)注释时,名称取决于几个因素。考虑以下带注释的POJO示例

@Component
public class SomeComponent {

    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

根据上述示例,Bean名称如下

  • 消费者:someComponent.someMethod.serviceActivator

  • 处理器:someComponent.someMethod.serviceActivator.handler

从5.0.4版本开始,您可以使用@EndpointId注释修改这些名称,如下例所示

@Component
public class SomeComponent {

    @EndpointId("someService")
    @ServiceActivator(inputChannel = ...)
    public String someMethod(...) {
        ...
    }

}

根据上述示例,Bean名称如下

  • 消费者:someService

  • 处理器:someService.handler

@EndpointId创建与使用XML配置的id属性创建的名称相同。考虑以下带注释的Bean示例

@Configuration
public class SomeConfiguration {

    @Bean
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}

根据上述示例,Bean名称如下

  • 消费者:someConfiguration.someHandler.serviceActivator

  • 处理器:someHandler@Bean名称)

从5.0.4版本开始,您可以使用@EndpointId注释修改这些名称,如下例所示

@Configuration
public class SomeConfiguration {

    @Bean("someService.handler")             (1)
    @EndpointId("someService")               (2)
    @ServiceActivator(inputChannel = ...)
    public MessageHandler someHandler() {
        ...
    }

}
1 处理器:someService.handler(Bean名称)
2 消费者:someService(端点ID)

@EndpointId注释创建与使用XML配置的id属性创建的名称相同,只要您使用将.handler附加到@Bean名称的约定。

有一种特殊情况会创建第三个Bean:出于架构原因,如果MessageHandler @Bean未定义AbstractReplyProducingMessageHandler,则框架会将提供的Bean包装在ReplyProducingMessageHandlerWrapper中。此包装器支持请求处理器建议处理并发出正常的“未产生回复”调试日志消息。它的Bean名称是处理器Bean名称加上.wrapper(当存在@EndpointId时,否则为正常生成的处理器名称)。

类似地,可轮询消息源创建两个Bean,一个SourcePollingChannelAdapter(SPCA)和一个MessageSource

考虑以下XML配置

<int:inbound-channel-adapter id = "someAdapter" ... />

根据上述XML配置,Bean名称如下

  • SPCA:someAdapterid

  • 处理器:someAdapter.source

考虑以下POJO的Java配置以定义@EndpointId

@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
    ...
}

根据上述Java配置示例,Bean名称如下

  • SPCA:someAdapter

  • 处理器:someAdapter.source

考虑以下Bean的Java配置以定义@EndpointID

@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
    return () -> {
        ...
    };
}

根据上述示例,Bean名称如下

  • SPCA:someAdapter

  • 处理器:someAdapter.source(只要您使用将.source附加到@Bean名称的约定)

配置和@EnableIntegration

在整个文档中,您可以看到有关在Spring Integration流中声明元素的XML命名空间支持的引用。此支持由一系列命名空间解析器提供,这些解析器会生成适当的Bean定义以实现特定组件。例如,许多端点由一个MessageHandler Bean和一个ConsumerEndpointFactoryBean组成,其中注入处理器和输入通道名称。

首次遇到 Spring Integration 命名空间元素时,框架会自动声明许多 Bean(任务调度器、隐式通道创建器等),用于支持运行时环境。

版本 4.0 引入了 @EnableIntegration 注解,以允许注册 Spring Integration 基础设施 Bean(请参阅 Javadoc)。当仅使用 Java 配置时,此注解是必需的,例如使用 Spring Boot 或 Spring Integration 消息注解支持以及 Spring Integration Java DSL 且没有 XML 集成配置。

当您有一个没有 Spring Integration 组件的父上下文以及两个或多个使用 Spring Integration 的子上下文时,@EnableIntegration 注解也很有用。它允许这些公共组件仅在父上下文中声明一次。

@EnableIntegration 注解向应用程序上下文注册许多基础设施组件。特别是,它

  • 注册一些内置 Bean,例如 errorChannel 及其 LoggingHandler、用于轮询器的 taskSchedulerjsonPath SpEL 函数等。

  • 添加多个 BeanFactoryPostProcessor 实例以增强 BeanFactory,用于全局和默认集成环境。

  • 添加多个 BeanPostProcessor 实例以增强或转换和包装特定 Bean 以用于集成目的。

  • 添加注解处理器以解析消息注解,并使用应用程序上下文为其注册组件。

@IntegrationComponentScan 注解也允许类路径扫描。此注解的作用类似于标准 Spring Framework @ComponentScan 注解,但它仅限于 Spring Integration 特定的组件和注解,标准 Spring Framework 组件扫描机制无法访问这些组件和注解。例如,请参阅 @MessagingGateway 注解

@EnablePublisher 注解注册一个 PublisherAnnotationBeanPostProcessor Bean 并为那些没有 channel 属性的 @Publisher 注解配置 default-publisher-channel。如果找到多个 @EnablePublisher 注解,则它们都必须为默认通道使用相同的值。有关详细信息,请参阅 使用 @Publisher 注解的注解驱动的配置

@GlobalChannelInterceptor 注解已引入以标记用于全局通道拦截的 ChannelInterceptor Bean。此注解类似于 <int:channel-interceptor> XML 元素(请参阅 全局通道拦截器配置)。@GlobalChannelInterceptor 注解可以放在类级别(使用 @Component 构造型注解)或 @Configuration 类中的 @Bean 方法上。在这两种情况下,Bean 都必须实现 ChannelInterceptor

从版本 5.1 开始,全局通道拦截器适用于动态注册的通道,例如通过使用 beanFactory.initializeBean() 或在使用 Java DSL 时通过 IntegrationFlowContext 初始化的 Bean。以前,在应用程序上下文刷新后创建 Bean 时,不会应用拦截器。

@IntegrationConverter 注解将 ConverterGenericConverterConverterFactory Bean 标记为 integrationConversionService 的候选转换器。此注解类似于 <int:converter> XML 元素(请参阅 有效负载类型转换)。您可以将 @IntegrationConverter 注解放在类级别(使用 @Component 构造型注解)或 @Configuration 类中的 @Bean 方法上。

有关消息注解的更多信息,请参阅 注解支持

编程注意事项

应尽可能使用普通旧 Java 对象 (POJO),并且仅在绝对必要时才在代码中公开框架。有关更多信息,请参阅 POJO 方法调用

如果确实将框架公开给您的类,则需要考虑一些事项,尤其是在应用程序启动期间

  • 如果您的组件是 ApplicationContextAware,则通常不应在 setApplicationContext() 方法中使用 ApplicationContext。相反,请存储一个引用,并将此类用法推迟到上下文生命周期的后期。

  • 如果您的组件是 InitializingBean 或使用 @PostConstruct 方法,则不要从这些初始化方法发送任何消息。在调用这些方法时,应用程序上下文尚未初始化,并且发送此类消息可能会失败。如果需要在启动期间发送消息,请实现 ApplicationListener 并等待 ContextRefreshedEvent。或者,实现 SmartLifecycle,将您的 Bean 放入后期阶段,并从 start() 方法发送消息。

使用打包(例如,阴影)Jar 时注意事项

Spring Integration 通过使用 Spring Framework 的 SpringFactories 机制加载多个 IntegrationConfigurationInitializer 类来引导某些功能。这包括 -core jar 以及某些其他 jar,包括 -http-jmx。此过程的信息存储在每个 jar 中的 META-INF/spring.factories 文件中。

一些开发人员更喜欢使用众所周知的工具(例如 Apache Maven Shade 插件)将其应用程序和所有依赖项重新打包到一个 jar 中。

默认情况下,阴影插件在生成阴影 jar 时不会合并 spring.factories 文件。

除了 spring.factories 之外,其他 META-INF 文件(spring.handlersspring.schemas)也用于 XML 配置。这些文件也需要合并。

Spring Boot 的可执行 jar 机制 采用了一种不同的方法,因为它嵌套了 jar,从而在类路径上保留每个 spring.factories 文件。因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需执行其他操作。

即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具来增强阴影插件,方法是为上述文件添加转换器。以下示例显示了如何配置插件

示例 1. pom.xml
...
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <configuration>
                <keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
                <createDependencyReducedPom>true</createDependencyReducedPom>
            </configuration>
            <dependencies>
                <dependency> (1)
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                    <version>${spring.boot.version}</version>
                </dependency>
            </dependencies>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers> (2)
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.handlers</resource>
                            </transformer>
                            <transformer
                                implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
                                <resource>META-INF/spring.factories</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                <resource>META-INF/spring.schemas</resource>
                            </transformer>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
...

具体来说,

1 添加 spring-boot-maven-plugin 作为依赖项。
2 配置转换器。

您可以为 ${spring.boot.version} 添加属性或使用显式版本。

编程技巧

本节记录了一些充分利用 Spring Integration 的方法。

XML 架构

使用 XML 配置时,为避免出现错误的架构验证错误,应使用“Spring 感知”IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径解析正确的 XML 架构(通过使用 jar 中的 META-INF/spring.schemas 文件)。在使用 STS 或带有插件的 Eclipse 时,必须在项目上启用 Spring Project Nature

出于兼容性原因,为某些旧版模块(存在于版本 1.0 中的那些模块)在互联网上托管的架构是 1.0 版本。如果您的 IDE 使用这些架构,则可能会看到错误。

每个在线架构都包含类似于以下内容的警告

此架构适用于 Spring Integration Core 的 1.0 版本。我们无法将其更新到当前架构,因为这会破坏使用 1.0.3 或更低版本的任何应用程序。对于后续版本,将从类路径解析“未版本化”架构,并从 jar 中获取。请参阅 GitHub

受影响的模块是

  • corespring-integration.xsd

  • file

  • http

  • jms

  • mail

  • security

  • stream

  • ws

  • xml

查找 Java 和 DSL 配置的类名

使用 XML 配置和 Spring Integration 命名空间支持时,XML 解析器会隐藏目标 Bean 的声明和连接方式。对于 Java 配置,了解框架 API 对目标最终用户应用程序非常重要。

EIP 实现的一等公民是 MessageChannelEndpoint(请参阅本章前面部分的 主要组件)。它们的实现(契约)是

  • org.springframework.messaging.Message:请参阅 消息

  • org.springframework.messaging.MessageChannel:请参阅 消息通道

  • org.springframework.integration.endpoint.AbstractEndpoint:请参阅 轮询器

前两个很容易理解如何实现、配置和使用。最后一个需要更多关注

AbstractEndpoint 在整个 Spring Framework 中被广泛用于不同的组件实现。其主要实现是

  • EventDrivenConsumer,当我们订阅 SubscribableChannel 以侦听消息时使用。

  • PollingConsumer,当我们从 PollableChannel 轮询消息时使用。

当您使用消息注解或 Java DSL 时,您无需担心这些组件,因为框架会使用适当的注解和 BeanPostProcessor 实现自动生成它们。在手动构建组件时,应使用 ConsumerEndpointFactoryBean 来帮助确定要创建的目标 AbstractEndpoint 消费者实现,这基于提供的 inputChannel 属性。

另一方面,ConsumerEndpointFactoryBean委托给框架中的另一个一等公民——org.springframework.messaging.MessageHandler。实现此接口的目标是处理端点从通道中消费的消息。Spring Integration 中的所有EIP组件都是MessageHandler的实现(例如,AggregatingMessageHandlerMessageTransformingHandlerAbstractMessageSplitter等)。目标协议出站适配器(FileWritingMessageHandlerHttpRequestExecutingMessageHandlerAbstractMqttMessageHandler等)也是MessageHandler的实现。当使用Java配置开发Spring Integration应用程序时,您应该查看Spring Integration模块以查找适合用于@ServiceActivator配置的MessageHandler实现。例如,要发送XMPP消息(请参阅XMPP 支持),您应该配置如下内容

@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
    ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);

    DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
    xmppHeaderMapper.setRequestHeaderNames("*");
    handler.setHeaderMapper(xmppHeaderMapper);

    return handler;
}

MessageHandler实现代表消息流的出站和处理部分。

入站消息流端有其自己的组件,这些组件分为轮询和监听行为。监听(消息驱动)组件很简单,通常只需要一个目标类实现即可准备生成消息。监听组件可以是一路MessageProducerSupport实现(例如AbstractMqttMessageDrivenChannelAdapterImapIdleChannelAdapter)或请求回复MessagingGatewaySupport实现(例如AmqpInboundGatewayAbstractWebServiceInboundGateway)。

轮询入站端点适用于不提供侦听器API或不适合此类行为的协议,包括任何基于文件的协议(例如FTP)、任何数据库(RDBMS或NoSQL)等。

这些入站端点由两个组件组成:轮询配置,用于定期启动轮询任务;以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。轮询配置的第一类是SourcePollingChannelAdapter。它是另一个AbstractEndpoint实现,但特别是用于轮询以启动集成流。通常,使用消息传递注释或Java DSL,您无需担心此类。框架会根据@InboundChannelAdapter配置或Java DSL构建器规范为此生成一个bean。

消息源组件对于目标应用程序开发更为重要,它们都实现了MessageSource接口(例如,MongoDbMessageSourceAbstractTwitterMessageSource)。考虑到这一点,我们用于从RDBMS表中使用JDBC读取数据的配置可能类似于以下内容

@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
    return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}

您可以在特定的Spring Integration模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。例如,spring-integration-websocket适配器为

  • o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:实现MessageProducerSupport以侦听套接字上的帧并将消息发送到通道。

  • o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:将传入消息转换为适当的帧并通过websocket发送的一路AbstractMessageHandler实现。

如果您熟悉Spring Integration XML配置,从4.3版本开始,我们在XSD元素定义中提供了有关使用哪些目标类为适配器或网关声明bean的信息,如下例所示

<xsd:element name="outbound-async-gateway">
    <xsd:annotation>
		<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
       </xsd:documentation>
	</xsd:annotation>

POJO方法调用

编程注意事项中所述,我们建议使用POJO编程风格,如下例所示

@ServiceActivator
public String myService(String payload) { ... }

在这种情况下,框架会提取String有效负载,调用您的方法,并将结果包装在消息中以发送到流中的下一个组件(原始标头被复制到新消息)。实际上,如果您使用XML配置,则甚至不需要@ServiceActivator注释,如下面的配对示例所示

<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }

只要类上的公共方法没有歧义,就可以省略method属性。

您也可以在POJO方法中获取标头信息,如下例所示

@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }

您还可以取消引用消息上的属性,如下例所示

@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }

由于各种POJO方法调用可用,因此5.0之前的版本使用SpEL(Spring表达式语言)来调用POJO方法。与通常在方法中执行的实际工作相比,SpEL(即使是解释的)通常也“足够快”来执行这些操作。但是,从5.0版本开始,org.springframework.messaging.handler.invocation.InvocableHandlerMethod在可能的情况下默认使用。此技术通常比解释的SpEL执行速度更快,并且与其他Spring消息传递项目保持一致。InvocableHandlerMethod类似于在Spring MVC中用于调用控制器方法的技术。在使用SpEL时,仍然始终调用某些方法。例如,使用取消引用的属性注释的参数,如前所述。这是因为SpEL能够遍历属性路径。

我们可能没有考虑过一些其他极端情况,这些情况也不适用于InvocableHandlerMethod实例。出于这个原因,在这些情况下,我们会自动回退到使用SpEL。

如果需要,您还可以设置POJO方法,使其始终使用SpEL,并使用UseSpelInvoker注释,如下例所示

@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }

如果省略了compilerMode属性,则spring.expression.compiler.mode系统属性将确定编译器模式。有关已编译SpEL的更多信息,请参阅SpEL编译