Spring Integration 框架概述
Spring Integration 提供了 Spring 编程模型的扩展,以支持众所周知的 企业集成模式。它在基于 Spring 的应用程序中实现了轻量级消息传递,并通过声明式适配器支持与外部系统的集成。这些适配器提供了比 Spring 对远程处理、消息传递和调度支持更高层次的抽象。
Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持对于生成可维护、可测试代码至关重要的关注点分离。
Spring Integration 概述
本章对 Spring Integration 的核心概念和组件进行了高级介绍。它包含一些编程技巧,可帮助您充分利用 Spring Integration。
背景
Spring Framework 的关键主题之一是控制反转 (IoC)。从广义上讲,这意味着框架代表在其上下文中管理的组件处理职责。组件本身得到简化,因为它们摆脱了这些职责。例如,依赖注入将组件从定位或创建其依赖项的责任中解放出来。同样,面向切面编程通过将通用横切关注点模块化为可重用切面,将业务组件从这些关注点中解放出来。在每种情况下,最终结果都是一个更容易测试、理解、维护和扩展的系统。
此外,Spring 框架和产品组合为构建企业应用程序提供了全面的编程模型。开发人员受益于该模型的一致性,尤其是它基于公认的最佳实践,例如面向接口编程和优先使用组合而非继承。Spring 简化的抽象和强大的支持库提高了开发人员的工作效率,同时提高了可测试性和可移植性。
Spring Integration 的动机与这些目标和原则相同。它将 Spring 编程模型扩展到消息传递领域,并建立在 Spring 现有企业集成支持的基础上,提供更高层次的抽象。它支持消息驱动架构,其中控制反转适用于运行时关注点,例如何时应运行某些业务逻辑以及应将响应发送到何处。它支持消息的路由和转换,以便可以集成不同的传输和不同的数据格式,而不会影响可测试性。换句话说,消息传递和集成关注点由框架处理。业务组件进一步与基础设施隔离,开发人员摆脱了复杂的集成职责。
作为 Spring 编程模型的扩展,Spring Integration 提供了各种配置选项,包括注解、带有命名空间支持的 XML、带有通用“bean”元素的 XML 以及直接使用底层 API。该 API 基于定义明确的策略接口和非侵入式、委托适配器。Spring Integration 的设计灵感来自于 Spring 中常见模式与 Gregor Hohpe 和 Bobby Woolf (Addison Wesley, 2004) 所著《企业集成模式》中描述的著名模式之间存在密切关联的认识。读过那本书的开发人员应该会立即熟悉 Spring Integration 的概念和术语。
目标和原则
Spring Integration 的目标如下:
-
提供一个简单的模型来实现复杂的企业集成解决方案。
-
促进基于 Spring 的应用程序中的异步、消息驱动行为。
-
促进现有 Spring 用户直观、渐进式的采用。
Spring Integration 遵循以下原则:
-
组件应松散耦合,以实现模块化和可测试性。
-
框架应强制分离业务逻辑和集成逻辑的关注点。
-
扩展点应具有抽象性质(但在明确定义的边界内),以促进重用和可移植性。
主要组件
从垂直角度看,分层架构有助于关注点分离,层间基于接口的契约促进了松散耦合。基于 Spring 的应用程序通常以此方式设计,Spring 框架和产品组合为遵循企业应用程序整个堆栈的这一最佳实践提供了坚实的基础。消息驱动架构增加了水平视角,但这些目标仍然相关。就像“分层架构”是一个极其通用和抽象的范式一样,消息传递系统通常遵循类似的抽象“管道和过滤器”模型。“过滤器”表示能够生产或消费消息的任何组件,“管道”在过滤器之间传输消息,以便组件本身保持松散耦合。重要的是要注意,这两种高级范式并非相互排斥。支持“管道”的底层消息传递基础设施仍应封装在一个层中,其契约定义为接口。同样,“过滤器”本身应在应用程序服务层逻辑上方的层中进行管理,以与 Web 层大致相同的方式通过接口与这些服务进行交互。
消息
在 Spring Integration 中,消息是任何 Java 对象的通用包装器,并结合了框架在处理该对象时使用的元数据。它由有效载荷和标头组成。有效载荷可以是任何类型,标头包含常用信息,例如 ID、时间戳、关联 ID 和返回地址。标头还用于将值传递给连接的传输和从连接的传输传递值。例如,当从收到的文件创建消息时,文件名可以存储在标头中,供下游组件访问。同样,如果消息的内容最终将由出站邮件适配器发送,则各种属性(收件人、发件人、抄送、主题等)可以由上游组件配置为消息标头值。开发人员还可以将任意键值对存储在标头中。
消息通道
消息通道代表管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。因此,消息通道解耦了消息传递组件,并为消息的拦截和监控提供了一个方便点。
消息通道可以遵循点对点或发布-订阅语义。对于点对点通道,发送到通道的每条消息最多只能由一个消费者接收。另一方面,发布-订阅通道尝试将每条消息广播给通道上的所有订阅者。Spring Integration 支持这两种模型。
虽然“点对点”和“发布-订阅”定义了最终有多少消费者接收每条消息的两种选项,但还有一个重要考虑因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够将消息缓冲在队列中。缓冲的优点是它允许限制入站消息,从而防止消费者过载。然而,正如名称所示,这也增加了一些复杂性,因为消费者只有在配置了轮询器的情况下才能从这样的通道接收消息。另一方面,连接到可订阅通道的消费者只是消息驱动的。消息通道实现详细讨论了 Spring Integration 中可用的各种通道实现。
消息端点
Spring Integration 的主要目标之一是通过控制反转简化企业集成解决方案的开发。这意味着您不应该直接实现消费者和生产者,甚至不应该构建消息并调用消息通道上的发送或接收操作。相反,您应该能够专注于自己的特定领域模型,并使用基于普通对象的实现。然后,通过提供声明性配置,您可以将您的领域特定代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着您必须直接连接现有应用程序代码。任何实际的企业集成解决方案都需要一定数量的代码专注于集成关注点,例如路由和转换。重要的是在集成逻辑和业务逻辑之间实现关注点分离。换句话说,就像 Web 应用程序的 Model-View-Controller (MVC) 范式一样,目标应该是提供一个薄但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节概述了处理这些职责的消息端点类型,在后续章节中,您将看到 Spring Integration 的声明性配置选项如何提供一种非侵入式的方式来使用其中每种类型。
消息端点
消息端点代表了管道和过滤器架构中的“过滤器”。如前所述,端点的主要作用是将应用程序代码连接到消息传递框架,并以非侵入式方式实现。换句话说,理想情况下,应用程序代码不应该知道消息对象或消息通道。这类似于 MVC 范式中控制器的作用。正如控制器处理 HTTP 请求一样,消息端点处理消息。正如控制器映射到 URL 模式一样,消息端点映射到消息通道。这两种情况下的目标都是相同的:将应用程序代码与基础设施隔离。这些概念和所有后续模式在《企业集成模式》一书中都有详细讨论。在这里,我们仅对 Spring Integration 支持的主要端点类型及其相关角色进行高级描述。后续章节将进行阐述并提供示例代码和配置示例。
消息转换器
消息转换器负责转换消息的内容或结构并返回修改后的消息。最常见的转换器类型可能是将消息的有效载荷从一种格式转换为另一种格式(例如从 XML 转换为 java.lang.String)。同样,转换器可以添加、删除或修改消息的标头值。
消息过滤器
消息过滤器决定是否应将消息传递到输出通道。这只需一个布尔测试方法,该方法可以检查特定的有效载荷内容类型、属性值、标头的存在或其他条件。如果消息被接受,它将发送到输出通道。如果未接受,它将被丢弃(或者,对于更严格的实现,可以抛出 Exception)。消息过滤器通常与发布-订阅通道结合使用,其中多个消费者可能会接收相同的消息,并使用过滤器的条件来缩小要处理的消息集。
| 请注意不要将管道和过滤器架构模式中“过滤器”的通用用法与这种特定的端点类型混淆,后者选择性地缩小了两个通道之间流动的消息范围。管道和过滤器概念中的“过滤器”与 Spring Integration 的消息端点更匹配:任何可以连接到消息通道以发送或接收消息的组件。 |
消息路由器
消息路由器负责决定接下来哪个或哪些通道(如果有)应该接收消息。通常,该决定基于消息的内容或消息头中可用的元数据。消息路由器通常用作服务激活器或其他能够发送回复消息的端点上静态配置的输出通道的动态替代方案。同样,消息路由器为多个订阅者使用的响应式消息过滤器提供了一种主动替代方案,如前所述。
聚合器
聚合器基本上是拆分器的镜像,它是一种消息端点,接收多条消息并将它们组合成一条消息。事实上,聚合器通常是包含拆分器的管道中的下游消费者。从技术上讲,聚合器比拆分器更复杂,因为它需要维护状态(要聚合的消息),决定何时所有消息都可用,并在必要时超时。此外,如果发生超时,聚合器需要知道是发送部分结果、丢弃它们,还是将它们发送到单独的通道。Spring Integration 提供了 CorrelationStrategy、ReleaseStrategy 以及用于超时、是否在超时时发送部分结果以及丢弃通道的可配置设置。
服务激活器
服务激活器是用于将服务实例连接到消息系统的通用端点。必须配置输入消息通道,并且如果将调用的服务方法能够返回值,也可以提供输出消息通道。
| 输出通道是可选的,因为每条消息也可以提供自己的“返回地址”标头。此规则适用于所有消费者端点。 |
服务激活器调用某个服务对象上的操作来处理请求消息,提取请求消息的有效负载并进行转换(如果该方法不期望消息类型的参数)。只要服务对象的方法返回值,该返回值也会在必要时转换为回复消息(如果它本身不是消息类型)。该回复消息将发送到输出通道。如果未配置输出通道,则回复将发送到消息“返回地址”中指定的通道(如果可用)。
请求-回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。
| 如前所述,在消息通道中,通道可以是可轮询的或可订阅的。在上述图中,这通过“时钟”符号和实心箭头(轮询)以及虚线箭头(订阅)表示。 |
通道适配器
通道适配器是将消息通道连接到其他系统或传输的端点。通道适配器可以是入站或出站的。通常,通道适配器会在消息与从其他系统(文件、HTTP 请求、JMS 消息等)接收或发送到其他系统的任何对象或资源之间进行一些映射。根据传输方式,通道适配器还可以填充或提取消息头值。Spring Integration 提供了许多通道适配器,这些适配器将在后续章节中介绍。
MessageChannel。| 消息源可以是可轮询的(例如 POP3)或消息驱动的(例如 IMAP Idle)。在上述图中,这通过“时钟”符号和实心箭头(轮询)以及虚线箭头(消息驱动)表示。 |
MessageChannel 连接到目标系统。| 如前所述,在消息通道中,通道可以是可轮询的或可订阅的。在上述图中,这通过“时钟”符号和实心箭头(轮询)以及虚线箭头(订阅)表示。 |
端点 Bean 名称
消费端点(任何带有 inputChannel 的端点)由两个 bean 组成:消费者和消息处理器。消费者引用消息处理器,并在消息到达时调用它。
考虑以下 XML 示例:
<int:service-activator id = "someService" ... />
根据上述示例,bean 名称如下:
-
消费者:
someService(id) -
处理器:
someService.handler
使用企业集成模式 (EIP) 注解时,名称取决于几个因素。考虑以下带注解 POJO 的示例:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据上述示例,bean 名称如下:
-
消费者:
someComponent.someMethod.serviceActivator -
处理器:
someComponent.someMethod.serviceActivator.handler
从版本 5.0.4 开始,您可以使用 @EndpointId 注解修改这些名称,如以下示例所示:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据上述示例,bean 名称如下:
-
消费者:
someService -
处理器:
someService.handler
@EndpointId 创建的名称与 XML 配置中的 id 属性创建的名称相同。考虑以下带注解 bean 的示例:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
根据上述示例,bean 名称如下:
-
消费者:
someConfiguration.someHandler.serviceActivator -
处理器:
someHandler(@Bean名称)
从版本 5.0.4 开始,您可以使用 @EndpointId 注解修改这些名称,如以下示例所示:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") (1)
@EndpointId("someService") (2)
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
| 1 | 处理器:someService.handler(bean 名称) |
| 2 | 消费者:someService(端点 ID) |
@EndpointId 注解创建的名称与 XML 配置中的 id 属性创建的名称相同,只要您遵循将 .handler 附加到 @Bean 名称的约定。
有一种特殊情况会创建第三个 bean:出于架构原因,如果 MessageHandler @Bean 未定义 AbstractReplyProducingMessageHandler,框架会将提供的 bean 包装在 ReplyProducingMessageHandlerWrapper 中。此包装器支持请求处理器建议处理并发出正常的“未产生回复”调试日志消息。其 bean 名称是处理器 bean 名称加上 .wrapper(当存在 @EndpointId 时——否则,它是正常生成的处理器名称)。
同样,可轮询消息源会创建两个 bean,一个 SourcePollingChannelAdapter (SPCA) 和一个 MessageSource。
考虑以下 XML 配置:
<int:inbound-channel-adapter id = "someAdapter" ... />
根据上述 XML 配置,bean 名称如下:
-
SPCA:
someAdapter(id) -
处理器:
someAdapter.source
考虑以下 POJO 的 Java 配置,以定义 @EndpointId:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
根据上述 Java 配置示例,bean 名称如下:
-
SPCA:
someAdapter -
处理器:
someAdapter.source
考虑以下 Bean 的 Java 配置,以定义 @EndpointID:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
根据上述示例,bean 名称如下:
-
SPCA:
someAdapter -
处理器:
someAdapter.source(只要您遵循将.source附加到@Bean名称的约定)
配置和 @EnableIntegration
在本文档中,您将看到对 XML 命名空间支持的引用,用于在 Spring Integration 流中声明元素。此支持由一系列命名空间解析器提供,这些解析器生成适当的 bean 定义以实现特定组件。例如,许多端点由一个 MessageHandler bean 和一个 ConsumerEndpointFactoryBean 组成,其中注入了处理器和输入通道名称。
首次遇到 Spring Integration 命名空间元素时,框架会自动声明许多 bean(任务调度器、隐式通道创建器等),这些 bean 用于支持运行时环境。
版本 4.0 引入了 @EnableIntegration 注解,允许注册 Spring Integration 基础设施 bean(请参阅 Javadoc)。当只使用 Java 配置时,此注解是必需的——例如,在 Spring Boot 或 Spring Integration Messaging 注解支持以及 Spring Integration Java DSL 中,没有 XML 集成配置。 |
当您有一个没有 Spring Integration 组件的父上下文和两个或更多使用 Spring Integration 的子上下文时,@EnableIntegration 注解也很有用。它允许这些常见组件只在父上下文中声明一次。
@EnableIntegration 注解向应用程序上下文注册了许多基础设施组件。具体来说,它:
-
注册一些内置 bean,例如
errorChannel及其LoggingHandler、用于轮询器的taskScheduler、jsonPathSpEL 函数等。 -
添加多个
BeanFactoryPostProcessor实例,以增强BeanFactory以支持全局和默认集成环境。 -
添加多个
BeanPostProcessor实例,以增强或转换并包装特定 bean,以用于集成目的。 -
添加注解处理器,以解析消息注解并向应用程序上下文注册其组件。
@IntegrationComponentScan 注解还允许类路径扫描。此注解扮演着与标准 Spring Framework @ComponentScan 注解类似的角色,但它仅限于 Spring Integration 特有的组件和注解,这些是标准 Spring Framework 组件扫描机制无法触及的。有关示例,请参见@MessagingGateway 注解。
@EnablePublisher 注解注册了一个 PublisherAnnotationBeanPostProcessor bean,并为那些没有 channel 属性的 @Publisher 注解配置了 default-publisher-channel。如果找到多个 @EnablePublisher 注解,它们必须都具有相同的默认通道值。有关更多信息,请参阅使用 @Publisher 注解进行注解驱动配置。
@GlobalChannelInterceptor 注解已引入,用于将 ChannelInterceptor bean 标记为全局通道拦截。此注解是 <int:channel-interceptor> XML 元素的模拟(请参阅全局通道拦截器配置)。@GlobalChannelInterceptor 注解可以放置在类级别(带有 @Component 构造型注解)或 @Configuration 类中的 @Bean 方法上。在这两种情况下,bean 都必须实现 ChannelInterceptor。
从版本 5.1 开始,全局通道拦截器适用于动态注册的通道——例如,通过 beanFactory.initializeBean() 或在使用 Java DSL 时通过 IntegrationFlowContext 初始化的 bean。以前,在应用程序上下文刷新后创建 bean 时,拦截器不适用。
@IntegrationConverter 注解将 Converter、GenericConverter 或 ConverterFactory bean 标记为 integrationConversionService 的候选转换器。此注解是 <int:converter> XML 元素的模拟(请参阅有效载荷类型转换)。您可以将 @IntegrationConverter 注解放置在类级别(带有 @Component 构造型注解)或 @Configuration 类中的 @Bean 方法上。
有关消息注解的更多信息,请参阅注解支持。
编程注意事项
Spring Integration 中的大多数类,除非另有说明,都必须在应用程序上下文中声明为 bean,并且为单例。这意味着这些类的实例是线程安全的,它们的生命周期以及与其他组件的连接由 Spring 依赖注入容器管理。实用程序和构建器类(JacksonMessagingUtils、MessageBuilder、ExpressionEvalMap、IntegrationReactiveUtils 等)可以直接在 Java 代码中使用。但是,Java DSL 工厂和 IntegrationComponentSpec 实现结果仍必须注册为 bean 到应用程序上下文中。Session 抽象存在于许多模块中,它不是线程安全的,通常由 Factory 模式实现创建,并从线程安全的 Template 模式中使用。例如,请参见 SftpRemoteFileTemplate 及其与 DefaultSftpSessionFactory 的关系。
您应该尽可能使用普通 Java 对象(POJO)(用于目标逻辑中的消息处理),并且仅在绝对必要时才在代码中公开框架。有关更多信息,请参阅POJO 方法调用。
如果您的组件确实向您的类公开了框架,则需要考虑一些事项,尤其是在应用程序启动期间:
-
如果您的组件是
ApplicationContextAware,您通常不应在setApplicationContext()方法中使用ApplicationContext。相反,请存储一个引用,并将此类使用延迟到上下文生命周期的后期。 -
如果您的组件是
InitializingBean或使用@PostConstruct方法,请勿从这些初始化方法发送任何消息。在调用这些方法时,应用程序上下文尚未初始化,发送此类消息很可能会失败。如果您需要在启动期间发送消息,请实现ApplicationListener并等待ContextRefreshedEvent。或者,实现SmartLifecycle,将您的 bean 放在后期阶段,并从start()方法发送消息。
使用打包(例如,阴影)Jar 时的注意事项
Spring Integration 使用 Spring Framework 的 SpringFactories 机制通过加载几个 IntegrationConfigurationInitializer 类来引导某些功能。这包括 -core jar 以及其他一些 jar,包括 -http 和 -jmx。此过程的信息存储在每个 jar 中的 META-INF/spring.factories 文件中。
一些开发人员喜欢使用众所周知的工具(例如 Apache Maven Shade Plugin)将他们的应用程序和所有依赖项重新打包到单个 jar 中。
默认情况下,在生成阴影 jar 时,shade 插件不会合并 spring.factories 文件。
除了 spring.factories,其他 META-INF 文件(spring.handlers 和 spring.schemas)用于 XML 配置。这些文件也需要合并。
Spring Boot 的可执行 jar 机制采用了一种不同的方法,它嵌套了 jar,从而保留了类路径上的每个 spring.factories 文件。因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需额外操作。 |
即使您不使用 Spring Boot,您仍然可以使用 Boot 提供的工具通过为上述文件添加转换器来增强 shade 插件。以下示例显示了如何配置插件:
...
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<dependencies>
<dependency> (1)
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers> (2)
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
...
具体来说,
| 1 | 添加 spring-boot-maven-plugin 作为依赖项。 |
| 2 | 配置转换器。 |
您可以为 ${spring.boot.version} 添加一个属性或使用显式版本。
编程技巧
本节介绍了一些充分利用 Spring Integration 的方法。
XML 模式
使用 XML 配置时,为避免出现错误的模式验证错误,您应该使用“Spring 感知”IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径解析正确的 XML 模式(通过使用 jar 中的 META-INF/spring.schemas 文件)。在使用 STS 或带有插件的 Eclipse 时,您必须在项目上启用 Spring Project Nature。
由于兼容性原因,某些旧模块(1.0 版中存在的模块)在互联网上托管的模式是 1.0 版。如果您的 IDE 使用这些模式,您可能会看到错误。
这些在线模式中的每一个都包含一个类似于以下内容的警告:
|
此模式适用于 Spring Integration Core 的 1.0 版本。我们无法将其更新到当前模式,因为那会破坏使用 1.0.3 或更低版本的所有应用程序。对于后续版本,“无版本”模式将从类路径解析并从 jar 中获取。请参考 GitHub |
受影响的模块是:
-
核心(spring-integration.xsd) -
file -
http -
jms -
mail -
security -
stream -
ws -
xml
查找 Java 和 DSL 配置的类名
通过 XML 配置和 Spring Integration 命名空间支持,XML 解析器隐藏了目标 bean 如何声明和连接在一起。对于 Java 配置,了解目标最终用户应用程序的框架 API 非常重要。
EIP 实现的首要组件是 Message、Channel 和 Endpoint(请参阅本章前面的主要组件)。它们的实现(契约)是:
前两个足够简单,易于理解如何实现、配置和使用。最后一个值得更多关注:
AbstractEndpoint 在整个 Spring 框架中广泛用于不同的组件实现。其主要实现是:
-
EventDrivenConsumer,当我们订阅SubscribableChannel以监听消息时使用。 -
PollingConsumer,当我们从PollableChannel轮询消息时使用。
当您使用消息注解或 Java DSL 时,您无需担心这些组件,因为框架会自动使用适当的注解和 BeanPostProcessor 实现来生成它们。手动构建组件时,您应该使用 ConsumerEndpointFactoryBean 来帮助根据提供的 inputChannel 属性确定要创建的目标 AbstractEndpoint 消费者实现。
另一方面,ConsumerEndpointFactoryBean 委托给框架中的另一个核心组件 - org.springframework.messaging.MessageHandler。此接口实现的目标是处理由端点从通道消费的消息。Spring Integration 中的所有 EIP 组件都是 MessageHandler 实现(例如,AggregatingMessageHandler、MessageTransformingHandler、AbstractMessageSplitter 等)。目标协议出站适配器(FileWritingMessageHandler、HttpRequestExecutingMessageHandler、AbstractMqttMessageHandler 等)也是 MessageHandler 实现。当您使用 Java 配置开发 Spring Integration 应用程序时,您应该查看 Spring Integration 模块以查找适当的 MessageHandler 实现以用于 @ServiceActivator 配置。例如,要发送 XMPP 消息(请参阅XMPP 支持),您应该配置如下内容:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
MessageHandler 实现表示消息流的出站和处理部分。
入站消息流端有其自己的组件,分为轮询和监听行为。监听(消息驱动)组件很简单,通常只需要一个目标类实现即可准备好生成消息。监听组件可以是一向的 MessageProducerSupport 实现(例如 AbstractMqttMessageDrivenChannelAdapter 和 ImapIdleChannelAdapter),也可以是请求-回复 MessagingGatewaySupport 实现(例如 AmqpInboundGateway 和 AbstractWebServiceInboundGateway)。
轮询入站端点适用于那些不提供监听器 API 或不打算用于此类行为的协议,包括任何基于文件的协议(例如 FTP)、任何数据库(RDBMS 或 NoSQL)等。
这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务,以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。用于轮询器配置的第一类是 SourcePollingChannelAdapter。它是另一个 AbstractEndpoint 实现,但专门用于轮询以启动集成流。通常,使用消息注解或 Java DSL 时,您无需担心此类别。框架会根据 @InboundChannelAdapter 配置或 Java DSL 构建器规范为其生成一个 bean。
消息源组件对目标应用程序开发更为重要,它们都实现了 MessageSource 接口(例如,MongoDbMessageSource 和 AbstractTwitterMessageSource)。考虑到这一点,我们从 JDBC 的 RDBMS 表中读取数据的配置可能如下所示:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
您可以在特定的 Spring Integration 模块中(在大多数情况下,在相应的包中)找到目标协议所需的所有入站和出站类。例如,spring-integration-websocket 适配器是:
-
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter:实现MessageProducerSupport以监听套接字上的帧并向通道生成消息。 -
o.s.i.websocket.outbound.WebSocketOutboundMessageHandler:单向AbstractMessageHandler实现,用于将传入消息转换为适当的帧并通过 WebSocket 发送。
如果您熟悉 Spring Integration XML 配置,从版本 4.3 开始,我们在 XSD 元素定义中提供了有关用于为适配器或网关声明 bean 的目标类的信息,如以下示例所示:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
POJO 方法调用
如编程注意事项中所述,我们建议使用 POJO 编程风格,如以下示例所示:
@ServiceActivator
public String myService(String payload) { ... }
在这种情况下,框架会提取一个 String 有效载荷,调用您的方法,并将结果包装在消息中发送到流中的下一个组件(原始标头会复制到新消息中)。实际上,如果您使用 XML 配置,您甚至不需要 @ServiceActivator 注解,如以下配对示例所示:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
只要类上的公共方法没有歧义,您就可以省略 method 属性。
您也可以在 POJO 方法中获取标头信息,如以下示例所示:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
您还可以取消引用消息上的属性,如以下示例所示:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
由于存在各种 POJO 方法调用,5.0 之前的版本使用 SpEL(Spring 表达式语言)来调用 POJO 方法。与这些方法中通常完成的实际工作相比,SpEL(甚至是解释性的)对于这些操作通常“足够快”。但是,从 5.0 版本开始,默认情况下尽可能使用 org.springframework.messaging.handler.invocation.InvocableHandlerMethod。这种技术通常比解释性的 SpEL 执行速度更快,并且与 Spring 的其他消息传递项目保持一致。InvocableHandlerMethod 类似于 Spring MVC 中用于调用控制器方法的技术。在某些情况下,仍然总是使用 SpEL 调用某些方法。例如,如前所述,带有取消引用属性的带注解参数。这是因为 SpEL 能够导航属性路径。
可能还有一些我们没有考虑到的其他特殊情况,它们也无法与 InvocableHandlerMethod 实例一起使用。因此,在这些情况下,我们会自动回退到使用 SpEL。
如果您愿意,您还可以设置您的 POJO 方法,使其始终使用 SpEL,使用 UseSpelInvoker 注解,如以下示例所示:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
如果省略了 compilerMode 属性,则 spring.expression.compiler.mode 系统属性决定了编译器模式。有关编译后的 SpEL 的更多信息,请参阅 SpEL 编译。