消息端点

本章的第一部分介绍了一些背景理论,并揭示了驱动 Spring 集成各种消息组件的底层 API 的一些信息。如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。但是,如果您想使用简化的基于命名空间的配置来运行各种元素,请随时跳到 端点命名空间支持

如概述中所述,消息端点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍许多不同的组件,这些组件会消耗消息。其中一些也能够发送回复消息。发送消息非常简单。如前面在 消息通道 中所示,您可以将消息发送到消息通道。但是,接收消息稍微复杂一些。主要原因是存在两种类型的消费者:轮询消费者事件驱动消费者

在这两者中,事件驱动消费者要简单得多。它们不需要管理和调度单独的轮询线程,本质上是带有回调方法的监听器。当连接到 Spring 集成的可订阅消息通道之一时,这种简单选项非常有效。但是,当连接到缓冲的、可轮询的消息通道时,某些组件必须调度和管理轮询线程。Spring 集成提供两种不同的端点实现来适应这两种类型的消费者。因此,消费者本身只需要实现回调接口。当需要轮询时,端点充当消费实例的容器。这种好处类似于使用容器来托管消息驱动 Bean,但是,由于这些消费者是在 ApplicationContext 中运行的 Spring 管理对象,因此它更类似于 Spring 自己的 MessageListener 容器。

消息处理程序

Spring 集成的 MessageHandler 接口由框架中的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler。但是,它被消息消费者用来实际处理消费的消息,因此了解这种策略接口有助于理解消费者的整体作用。该接口定义如下

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

尽管它很简单,但这个接口为以下章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)提供了基础。这些组件对它们处理的消息执行非常不同的功能,但实际接收消息的要求是相同的,轮询和事件驱动行为之间的选择也是相同的。Spring 集成提供两种端点实现,它们托管这些基于回调的处理程序,并允许它们连接到消息通道。

事件驱动消费者

由于事件驱动消费者更简单,我们先介绍它。您可能还记得,SubscribableChannel 接口提供了一个 subscribe() 方法,该方法接受一个 MessageHandler 参数(如 SubscribableChannel 中所示)。以下清单显示了 subscribe 方法的定义

subscribableChannel.subscribe(messageHandler);

由于订阅了某个通道的处理程序无需主动轮询该通道,因此这是一个事件驱动消费者,Spring Integration 提供的实现接受一个 SubscribableChannel 和一个 MessageHandler,如下例所示

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

轮询消费者

Spring Integration 还提供了一个 PollingConsumer,它可以以相同的方式实例化,只是通道必须实现 PollableChannel,如下例所示

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询消费者的更多信息,请参见 通道适配器通道适配器

轮询消费者还有许多其他配置选项。以下示例显示了如何设置触发器

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger 通常使用一个简单的间隔(Duration)定义,但也支持 initialDelay 属性和一个布尔 fixedRate 属性(默认值为 false,即没有固定延迟)。以下示例设置了这两个属性

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

前面示例中三个设置的结果是一个触发器,它等待五秒钟,然后每秒触发一次。

CronTrigger 需要一个有效的 cron 表达式。有关详细信息,请参见 Javadoc。以下示例设置了一个新的 CronTrigger

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

前面示例中定义的触发器的结果是一个触发器,它每十秒触发一次,周一到周五。

轮询端点的默认触发器是一个 PeriodicTrigger 实例,固定延迟时间为 1 秒。

除了触发器之外,您还可以指定两个其他与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。以下示例显示了如何设置这两个属性

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 属性指定在给定轮询操作中接收的最大消息数。这意味着轮询器会继续调用 receive(),而不等待,直到返回 null 或达到最大值。例如,如果轮询器具有十秒钟的间隔触发器和 maxMessagesPerPoll 设置为 25,并且它正在轮询一个队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。它获取 25 条消息,等待十秒钟,获取接下来的 25 条消息,依此类推。如果 maxMessagesPerPoll 配置为负值,则 MessageSource.receive() 会在单个轮询周期内被调用,直到它返回 null。从 5.5 版本开始,0 值具有特殊含义 - 完全跳过 MessageSource.receive() 调用,这可以被认为是暂停此轮询端点,直到 maxMessagesPerPoll 在以后的时间更改为非零值,例如通过控制总线。

receiveTimeout 属性指定了轮询器在调用接收操作时,如果没有可用消息,应该等待的时间。例如,考虑两种表面上看起来相似但实际上截然不同的选项:第一个选项的间隔触发器为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发器为 50 毫秒,接收超时为 5 秒。第一个选项可能比它在通道上接受消息的时间晚最多 4950 毫秒接收消息(如果该消息在它的轮询调用之一返回后立即到达)。另一方面,第二个配置永远不会错过超过 50 毫秒的消息。区别在于第二个选项需要一个线程等待。但是,结果是它可以更快地响应到达的消息。这种技术被称为“长轮询”,可以用来模拟轮询源上的事件驱动行为。

轮询消费者也可以委托给 Spring TaskExecutor,如下例所示

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer 具有一个名为 adviceChain 的属性。此属性允许您指定一个 List,其中包含用于处理额外横切关注点的 AOP 建议,包括事务。这些建议应用于 doPoll() 方法周围。有关更深入的信息,请参阅 端点命名空间支持 下的 AOP 建议链和事务支持部分。另请参阅 @Poller 注释 Javadocs 和相应的 消息传递注释支持 部分。Java DSL 还提供了一个 .poller() 端点配置选项,以及它相应的 Pollers 工厂。

前面的示例展示了依赖项查找。但是,请记住,这些消费者通常配置为 Spring bean 定义。事实上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBeanFactoryBean,它根据通道类型创建相应的消费者类型。此外,Spring Integration 具有完整的 XML 命名空间支持,可以进一步隐藏这些细节。本指南中介绍了基于命名空间的配置,因为每个组件类型都被介绍。

许多 MessageHandler 实现可以生成回复消息。如前所述,与接收消息相比,发送消息非常简单。然而,何时以及发送多少条回复消息取决于处理程序类型。例如,聚合器等待一定数量的消息到达,并且通常配置为拆分器的下游消费者,拆分器可以为它处理的每条消息生成多个回复。使用命名空间配置时,您并不严格需要了解所有细节。但是,了解以下内容可能仍然值得:这些组件中的几个共享一个公共基类,AbstractReplyProducingMessageHandler,并且它提供了一个 setOutputChannel(..) 方法。

端点命名空间支持

在本参考手册中,您可以找到针对端点元素的特定配置示例,例如路由器、转换器、服务激活器等等。大多数这些元素支持一个input-channel属性,并且许多元素支持一个output-channel属性。解析后,这些端点元素会生成一个PollingConsumerEventDrivenConsumer实例,具体取决于所引用input-channel的类型:分别是PollableChannelSubscribableChannel。当通道可轮询时,轮询行为基于端点元素的poller子元素及其属性。

以下是poller的所有可用配置选项列表。

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1 提供使用 Cron 表达式配置轮询器的功能。底层实现使用org.springframework.scheduling.support.CronTrigger。如果设置了此属性,则以下属性均不可指定:fixed-delaytriggerfixed-rateref
2 通过将此属性设置为true,您可以定义一个全局默认轮询器。如果在应用程序上下文中定义了多个默认轮询器,则会引发异常。任何连接到PollableChannelPollingConsumer)或任何没有显式配置轮询器的SourcePollingChannelAdapter的端点都会使用全局默认轮询器。默认值为false。可选。
3 标识在该轮询器调用中发生故障时将错误消息发送到的通道。要完全抑制异常,您可以提供对nullChannel的引用。可选。
4 固定延迟触发器在内部使用PeriodicTrigger。数值以time-unit表示,也可以使用持续时间格式(从版本 6.2 开始),例如PT10SP1D。如果设置了此属性,则以下属性均不可指定:fixed-ratetriggercronref
5 固定速率触发器在内部使用PeriodicTrigger。数值以time-unit表示,也可以使用持续时间格式(从版本 6.2 开始),例如PT10SP1D。如果设置了此属性,则以下属性均不可指定:fixed-delaytriggercronref
6 PeriodicTrigger的初始延迟(从版本 6.2 开始)。数值以time-unit表示,也可以使用持续时间格式,例如PT10SP1D
7 引用轮询器底层 bean 定义的 ID,该定义的类型为org.springframework.integration.scheduling.PollerMetadataid属性对于顶级轮询器元素是必需的,除非它是默认轮询器(default="true")。
8 有关更多信息,请参阅 配置入站通道适配器。如果未指定,默认值取决于上下文。如果您使用 PollingConsumer,此属性默认为 -1。但是,如果您使用 SourcePollingChannelAdapter,则 max-messages-per-poll 属性默认为 1。可选。
9 值设置在底层类 PollerMetadata 上。如果未指定,则默认为 1000(毫秒)。可选。
10 指向另一个顶级轮询器的 Bean 引用。ref 属性不能出现在顶级 poller 元素上。但是,如果设置了此属性,则不能指定以下任何属性:fixed-ratetriggercronfixed-delay
11 提供引用自定义任务执行器的能力。有关更多信息,请参阅 TaskExecutor 支持。可选。
12 此属性指定底层 org.springframework.scheduling.support.PeriodicTrigger 上的 java.util.concurrent.TimeUnit 枚举值。因此,此属性只能与 fixed-delayfixed-rate 属性结合使用。如果与 crontrigger 引用属性结合使用,则会导致失败。PeriodicTrigger 支持的最小粒度为毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则任何 fixed-delayfixed-rate 值都将解释为毫秒。基本上,此枚举为基于秒的间隔触发器值提供便利。对于每小时、每天和每月设置,我们建议使用 cron 触发器。
13 指向任何实现 org.springframework.scheduling.Trigger 接口的 Spring 配置 Bean 的引用。但是,如果设置了此属性,则不能指定以下任何属性:fixed-delayfixed-ratecronref。可选。
14 允许指定额外的 AOP 建议来处理额外的横切关注点。有关更多信息,请参阅 事务。可选。
15 轮询器可以进行事务处理。有关更多信息,请参阅 AOP 建议链。可选。

示例

一个简单的基于间隔的轮询器,间隔为 1 秒,可以按如下方式配置

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作为使用 fixed-rate 属性的替代方法,您也可以使用 fixed-delay 属性。

对于基于 Cron 表达式的轮询器,请使用 cron 属性,如下例所示

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果输入通道是 PollableChannel,则需要轮询器配置。具体来说,如前所述,triggerPollingConsumer 类的必需属性。因此,如果您省略了轮询消费者端点配置的 poller 子元素,则可能会抛出异常。如果您尝试在连接到不可轮询通道的元素上配置轮询器,也可能会抛出异常。

也可以创建顶层轮询器,在这种情况下,只需要一个ref属性,如下例所示

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref属性仅允许在内部轮询器定义中使用。在顶层轮询器上定义此属性会导致在应用程序上下文初始化期间抛出配置异常。

全局默认轮询器

为了进一步简化配置,您可以定义一个全局默认轮询器。XML DSL 中的单个顶层轮询器组件可以将default属性设置为true。对于 Java 配置,在这种情况下,必须声明一个名为PollerMetadata.DEFAULT_POLLERPollerMetadata bean。在这种情况下,任何输入通道为PollableChannel的端点,如果在同一个ApplicationContext中定义,并且没有显式配置的poller,都会使用该默认值。以下示例展示了这样一个轮询器和一个使用它的转换器

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

事务支持

Spring Integration 还为轮询器提供事务支持,以便每个接收和转发操作都可以作为原子工作单元执行。要为轮询器配置事务,请添加<transactional/>子元素。以下示例展示了可用的属性

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有关更多信息,请参阅轮询器事务支持

AOP 通知链

由于 Spring 事务支持依赖于使用TransactionInterceptor(AOP 通知)处理由轮询器启动的消息流的事务行为的代理机制,因此您有时需要提供额外的通知来处理与轮询器相关的其他横切行为。为此,poller 定义了一个advice-chain 元素,允许您在实现MethodInterceptor 接口的类中添加更多通知。以下示例展示了如何为poller 定义advice-chain

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有关如何实现MethodInterceptor 接口的更多信息,请参阅Spring 框架参考指南中的 AOP 部分。通知链也可以应用于没有事务配置的轮询器,让您增强由轮询器启动的消息流的行为。

使用通知链时,不能指定<transactional/> 子元素。相反,声明一个<tx:advice/> bean 并将其添加到<advice-chain/> 中。有关完整的配置详细信息,请参阅轮询器事务支持

TaskExecutor 支持

轮询线程可以由 Spring 的TaskExecutor 抽象的任何实例执行。这为端点或端点组启用并发。从 Spring 3.0 开始,核心 Spring 框架有一个task 命名空间,其<executor/> 元素支持创建简单的线程池执行器。该元素接受用于常见并发设置的属性,例如池大小和队列容量。配置线程池执行器可以对端点在负载下的执行方式产生重大影响。这些设置可用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期流量)。要为使用 XML 命名空间支持配置的轮询端点启用并发,请在其<poller/> 元素上提供task-executor 引用,然后提供以下示例中显示的一个或多个属性

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果您没有提供任务执行器,则消费者的处理程序将在调用者的线程中调用。请注意,调用者通常是默认的 TaskScheduler(请参阅 配置任务调度器)。您还应该记住,task-executor 属性可以通过指定 bean 名称来提供对 Spring 的 TaskExecutor 接口的任何实现的引用。前面显示的 executor 元素是为了方便起见提供的。

轮询消费者背景部分 中所述,您还可以配置轮询消费者以模拟事件驱动的行为。使用较长的接收超时时间和较短的触发器间隔,您可以确保即使在轮询的消息源上也能及时响应到达的消息。请注意,这仅适用于具有超时阻塞等待调用的源。例如,文件轮询器不会阻塞。每次 receive() 调用都会立即返回,并且包含新文件或不包含新文件。因此,即使轮询器包含较长的 receive-timeout,该值在这种情况下也永远不会被使用。另一方面,当使用 Spring Integration 自身的基于队列的通道时,超时值确实有机会参与。以下示例显示了轮询消费者如何几乎立即接收消息

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)一个抖动、无限的 while 循环那样消耗大量的 CPU 资源。

在运行时更改轮询速率

当使用 fixed-delayfixed-rate 属性配置轮询器时,默认实现使用 PeriodicTrigger 实例。PeriodicTrigger 是核心 Spring 框架的一部分。它只接受间隔作为构造函数参数。因此,它不能在运行时更改。

但是,您可以定义自己的 org.springframework.scheduling.Trigger 接口实现。您甚至可以使用 PeriodicTrigger 作为起点。然后,您可以为间隔(周期)添加一个 setter,或者您甚至可以在触发器本身中嵌入自己的节流逻辑。period 属性在每次调用 nextExecutionTime 以调度下一个轮询时使用。要在轮询器中使用此自定义触发器,请在您的应用程序上下文中声明自定义触发器的 bean 定义,并使用 trigger 属性将依赖项注入到您的轮询器配置中,该属性引用自定义触发器 bean 实例。您现在可以获取对触发器 bean 的引用,并在轮询之间更改轮询间隔。

例如,请参阅 Spring Integration 示例 项目。它包含一个名为 dynamic-poller 的示例,该示例使用自定义触发器并演示了在运行时更改轮询间隔的能力。

该示例提供了一个实现 org.springframework.scheduling.Trigger 接口的自定义触发器。该示例的触发器基于 Spring 的 PeriodicTrigger 实现。但是,自定义触发器的字段不是最终的,并且属性具有显式的 getter 和 setter,允许您在运行时动态更改轮询周期。

需要注意的是,由于 Trigger 方法是 nextExecutionTime(),因此对动态触发器的任何更改都不会立即生效,而是在基于现有配置的下次轮询时生效。无法强制触发器在其当前配置的下次执行时间之前触发。

有效负载类型转换

在本参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任何任意 Object 作为输入参数。在 Object 的情况下,此类参数将映射到消息有效负载或有效负载的一部分或标头(当使用 Spring 表达式语言时)。但是,端点方法的输入参数类型有时与有效负载类型或其部分类型不匹配。在这种情况下,我们需要执行类型转换。Spring Integration 提供了一种便捷的方式,可以在其自己的转换服务 bean 实例(名为 integrationConversionService)中注册类型转换器(通过使用 Spring ConversionService)。该 bean 在使用 Spring Integration 基础设施定义第一个转换器时自动创建。要注册转换器,您可以实现 org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter 实现是最简单的,它将从一种类型转换为另一种类型。为了更复杂的操作,例如转换为类层次结构,您可以实现 GenericConverter,甚至可能实现 ConditionalConverter。这些为您提供了对 fromto 类型描述符的完全访问权限,从而能够进行复杂的转换。例如,如果您有一个名为 Something 的抽象类作为转换的目标(参数类型、通道数据类型等),您有两个名为 Thing1Thing 的具体实现,并且您希望根据输入类型转换为其中一个,那么 GenericConverter 将是一个不错的选择。有关更多信息,请参阅这些接口的 Javadoc

实现转换器后,您可以使用方便的命名空间支持进行注册,如下例所示

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,您可以使用内部 bean,如下例所示

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring Integration 4.0 开始,您可以使用注解来创建前面的配置,如下例所示

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,您可以使用 @Configuration 注解,如下例所示

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

在配置应用程序上下文时,Spring 框架允许您添加 conversionService bean(请参阅 配置 ConversionService 章节)。此服务在需要时用于在 bean 创建和配置期间执行适当的转换。

相反,integrationConversionService 用于运行时转换。这些用途截然不同。旨在用于连接 bean 构造函数参数和属性的转换器,如果在运行时用于 Spring Integration 表达式评估(针对数据类型通道、有效负载类型转换器等中的消息),可能会产生意想不到的结果。

但是,如果您确实想使用 Spring conversionService 作为 Spring Integration integrationConversionService,您可以在应用程序上下文中配置别名,如下例所示

<alias name="conversionService" alias="integrationConversionService"/>

在这种情况下,conversionService 提供的转换器可用于 Spring Integration 运行时转换。

内容类型转换

从 5.0 版本开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础结构。它的 HandlerMethodArgumentResolver 实现(例如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象来将传入的 payload 转换为目标方法参数类型。转换可以基于 contentType 消息头。为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它委托给注册的转换器列表,直到其中一个返回非空结果。默认情况下,此转换器提供(严格顺序)

有关其用途和转换的适当contentType值的更多信息,请参阅 Javadoc(在前面的列表中链接)。ConfigurableCompositeMessageConverter 用于因为它可以与任何其他MessageConverter 实现一起使用,包括或排除前面提到的默认转换器。它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

这两个新的转换器在默认值之前在复合中注册。您也可以不使用ConfigurableCompositeMessageConverter,而是通过注册一个名为integrationArgumentResolverMessageConverter 的 bean 来提供您自己的MessageConverter(通过设置IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)。

当使用 SpEL 方法调用时,基于MessageConverter(包括contentType 标头)的转换不可用。在这种情况下,只有上面在有效负载类型转换中提到的常规类到类转换可用。

异步轮询

如果您希望轮询是异步的,轮询器可以选择指定一个task-executor 属性,该属性指向任何TaskExecutor bean 的现有实例(Spring 3.0 通过task 命名空间提供了一个方便的命名空间配置)。但是,在使用TaskExecutor 配置轮询器时,您必须了解某些事项。

问题在于有两个配置,轮询器和TaskExecutor。它们必须相互协调。否则,您可能会最终创建一个人工内存泄漏。

考虑以下配置

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

前面的配置演示了一个不协调的配置。

默认情况下,任务执行器拥有一个无界任务队列。即使所有线程都处于阻塞状态,等待新消息到达或超时,轮询器也会持续调度新任务。假设有 20 个线程执行任务,每个任务的超时时间为 5 秒,则每秒执行 4 个任务。然而,新任务的调度速度为每秒 20 个,因此任务执行器内部队列的增长速度为每秒 16 个(在进程空闲时),这会导致内存泄漏。

解决此问题的一种方法是设置任务执行器的 `queue-capacity` 属性。即使设置为 0 也是合理的。您也可以通过指定对无法入队的消息的处理方式来管理队列,例如设置任务执行器的 `rejection-policy` 属性为 `DISCARD`。换句话说,在配置 `TaskExecutor` 时,您需要了解一些细节。有关此主题的更多信息,请参阅 Spring 参考手册中的 “任务执行和调度” 部分。

端点内部 Bean

许多端点都是复合 Bean。这包括所有消费者和所有轮询的入站通道适配器。消费者(轮询或事件驱动)委托给 `MessageHandler`。轮询适配器通过委托给 `MessageSource` 来获取消息。通常,获取对委托 Bean 的引用非常有用,例如在运行时更改配置或进行测试。这些 Bean 可以从 `ApplicationContext` 中使用已知名称获取。`MessageHandler` 实例使用类似于 `someConsumer.handler` 的 Bean ID 注册到应用程序上下文(其中 'consumer' 是端点的 `id` 属性的值)。`MessageSource` 实例使用类似于 `somePolledAdapter.source` 的 Bean ID 注册,其中 'somePolledAdapter' 是适配器的 ID。

以上内容仅适用于框架组件本身。您也可以使用内部 Bean 定义,如下例所示

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

该 Bean 被视为任何声明的内部 Bean,不会注册到应用程序上下文。如果您希望以其他方式访问此 Bean,请在顶层声明它并使用 `id`,然后使用 `ref` 属性。有关更多信息,请参阅 Spring 文档