消息端点

本章的第一部分涵盖了一些背景理论,并揭示了驱动 Spring Integration 各种消息组件的底层 API 的大量信息。如果您想真正了解幕后发生的事情,这些信息可能会有所帮助。但是,如果您想使用各种元素的简化基于命名空间的配置快速上手,可以先跳到Endpoint 命名空间支持

如概述中所述,消息端点负责将各种消息组件连接到通道。在接下来的几章中,我们将介绍许多不同的组件,这些组件会消费消息。其中一些组件也能够发送回复消息。发送消息非常简单。如消息通道中前面所示,您可以将消息发送到消息通道。但是,接收消息稍微复杂一些。主要原因是存在两种类型的消费者:轮询消费者事件驱动消费者

在这两者中,事件驱动消费者要简单得多。它们本质上是带有回调方法的监听器,无需管理和调度单独的轮询线程。连接到 Spring Integration 的可订阅消息通道之一时,此简单选项非常有效。但是,当连接到缓冲、可轮询的消息通道时,某些组件必须调度和管理轮询线程。Spring Integration 提供了两种不同的端点实现来适应这两种类型的消费者。因此,消费者本身只需要实现回调接口。当需要轮询时,端点充当消费者的容器。其好处类似于使用容器来托管消息驱动的 Bean,但是,由于这些消费者是在 ApplicationContext 中运行的 Spring 托管对象,因此它更类似于 Spring 自己的 MessageListener 容器。

消息处理器

Spring Integration 的 MessageHandler 接口由框架中的许多组件实现。换句话说,这不是公共 API 的一部分,您通常不会直接实现 MessageHandler。但是,它被消息消费者用于实际处理已消费的消息,因此了解此策略接口确实有助于理解消费者的整体作用。该接口定义如下

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

尽管该接口很简单,但它为后续章节中介绍的大多数组件(路由器、转换器、拆分器、聚合器、服务激活器等)奠定了基础。这些组件分别对它们处理的消息执行非常不同的功能,但实际接收消息的要求相同,轮询和事件驱动行为之间的选择也相同。Spring Integration 提供了两种端点实现来托管这些基于回调的处理器,并允许它们连接到消息通道。

事件驱动消费者

由于它是两者中比较简单的,因此我们首先介绍事件驱动消费者端点。您可能还记得 SubscribableChannel 接口提供了一个 subscribe() 方法,并且该方法接受 MessageHandler 参数(如SubscribableChannel中所示)。以下列表显示了 subscribe 方法的定义

subscribableChannel.subscribe(messageHandler);

由于订阅通道的处理器不必主动轮询该通道,因此这是一个事件驱动消费者,并且 Spring Integration 提供的实现接受 SubscribableChannelMessageHandler,如下例所示

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

轮询消费者

Spring Integration 还提供了一个 PollingConsumer,并且可以通过相同的方式实例化,只不过通道必须实现 PollableChannel,如下例所示

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
有关轮询消费者的更多信息,请参阅通道适配器通道适配器

轮询消费者还有许多其他配置选项。以下示例显示了如何设置触发器

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger 通常使用简单的间隔 (Duration) 定义,但也支持 initialDelay 属性和布尔值 fixedRate 属性(默认为 false,即没有固定延迟)。以下示例设置这两个属性

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

前面示例中三个设置的结果是一个触发器,它等待五秒钟,然后每秒触发一次。

CronTrigger 需要一个有效的 cron 表达式。有关详细信息,请参阅Javadoc。以下示例设置一个新的 CronTrigger

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

前面示例中定义的触发器结果是,从周一到周五每十秒触发一次的触发器。

轮询端点的默认触发器是 PeriodicTrigger 实例,固定延迟周期为 1 秒。

除了触发器之外,您还可以指定其他两个与轮询相关的配置属性:maxMessagesPerPollreceiveTimeout。以下示例显示了如何设置这两个属性

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll 属性指定在给定轮询操作中接收的消息的最大数量。这意味着轮询程序会继续调用 receive() 而不等待,直到返回 null 或达到最大值。例如,如果轮询程序具有十秒钟间隔的触发器和 maxMessagesPerPoll 设置为 25,并且它正在轮询其队列中有 100 条消息的通道,则可以在 40 秒内检索所有 100 条消息。它获取 25 条,等待十秒钟,然后获取接下来的 25 条,依此类推。如果 maxMessagesPerPoll 配置为负值,则在单个轮询周期内调用 MessageSource.receive(),直到它返回 null。从 5.5 版开始,值 0 具有特殊含义 - 完全跳过 MessageSource.receive() 调用,这可以被认为是暂停此轮询端点,直到以后将 maxMessagesPerPoll 更改为非零值,例如通过控制总线。

receiveTimeout 属性指定如果在调用接收操作时没有可用的消息,轮询程序应等待的时间量。例如,考虑两个表面上看起来相似但实际上截然不同的选项:第一个选项的间隔触发器为 5 秒,接收超时为 50 毫秒,而第二个选项的间隔触发器为 50 毫秒,接收超时为 5 秒。第一个选项可能比它在通道上接受的消息晚 4950 毫秒接收消息(如果该消息在其轮询调用之一返回后立即到达)。另一方面,第二个配置永远不会错过超过 50 毫秒的消息。不同之处在于,第二个选项需要线程等待。但是,因此,它可以更快地响应到达的消息。这种称为“长轮询”的技术可用于在轮询源上模拟事件驱动行为。

轮询消费者还可以委托给 Spring TaskExecutor,如下例所示

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

此外,PollingConsumer 具有一个名为 adviceChain 的属性。此属性允许您指定 AOP 建议的 List,以处理其他横切关注点,包括事务。这些建议应用于 doPoll() 方法周围。有关更深入的信息,请参阅Endpoint 命名空间支持下的 AOP 建议链和事务支持部分。另请参阅 @Poller 注释 Javadoc 和相应的消息传递注释支持部分。Java DSL 还提供了一个.poller()端点配置选项及其相应的 Pollers 工厂。

前面的示例显示了依赖项查找。但是,请记住,这些消费者最常配置为 Spring Bean 定义。实际上,Spring Integration 还提供了一个名为 ConsumerEndpointFactoryBeanFactoryBean,它根据通道的类型创建适当的消费者类型。此外,Spring Integration 具有完整的 XML 命名空间支持,可以进一步隐藏这些细节。本指南在介绍每个组件类型时都会介绍基于命名空间的配置。

许多 MessageHandler 实现可以生成回复消息。如前所述,与接收消息相比,发送消息非常简单。但是,发送回复消息的时间和数量取决于处理程序类型。例如,聚合器等待多个消息到达,并且通常配置为拆分器的下游消费者,拆分器可以为其处理的每个消息生成多个回复。使用命名空间配置时,您无需严格了解所有详细信息。但是,仍然值得了解这些组件中的几个共享一个公共基类 AbstractReplyProducingMessageHandler,并且它提供了一个 setOutputChannel(..) 方法。

Endpoint 命名空间支持

在本参考手册中,您可以找到端点元素(例如路由器、转换器、服务激活器等)的特定配置示例。这些元素中的大多数都支持 input-channel 属性,并且许多都支持 output-channel 属性。解析后,这些端点元素会生成 PollingConsumerEventDrivenConsumer 的实例,具体取决于引用的 input-channel 的类型:分别为 PollableChannelSubscribableChannel。当通道可轮询时,轮询行为基于端点元素的 poller 子元素及其属性。

以下列出了 poller 的所有可用配置选项

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1 提供使用 Cron 表达式配置轮询程序的功能。底层实现使用 org.springframework.scheduling.support.CronTrigger。如果设置了此属性,则必须不指定以下任何属性:fixed-delaytriggerfixed-rateref
2 通过将此属性设置为 true,您可以定义一个全局默认轮询程序。如果在应用程序上下文中定义了多个默认轮询程序,则会引发异常。然后,连接到 PollableChannel (PollingConsumer) 的任何端点或任何未显式配置轮询程序的 SourcePollingChannelAdapter 都将使用全局默认轮询程序。默认为 false。可选。

3 如果在此轮询器的调用中发生故障,则标识错误消息发送到的通道。要完全抑制异常,您可以提供对nullChannel的引用。可选。
4 固定延迟触发器在内部使用PeriodicTrigger。数值以time-unit为单位,或者可以采用持续时间格式(从版本6.2开始),例如PT10SP1D。如果设置了此属性,则不能指定以下任何属性:fixed-ratetriggercronref
5 固定速率触发器在内部使用PeriodicTrigger。数值以time-unit为单位,或者可以采用持续时间格式(从版本6.2开始),例如PT10SP1D。如果设置了此属性,则不能指定以下任何属性:fixed-delaytriggercronref
6 内部PeriodicTrigger的初始延迟(从版本6.2开始)。数值以time-unit为单位,或者可以采用持续时间格式,例如PT10SP1D
7 引用轮询器底层bean定义的ID,该定义的类型为org.springframework.integration.scheduling.PollerMetadata。对于顶级轮询器元素,id属性是必需的,除非它是默认轮询器(default="true")。
8 有关更多信息,请参阅配置入站通道适配器。如果未指定,则默认值取决于上下文。如果您使用PollingConsumer,则此属性默认为-1。但是,如果您使用SourcePollingChannelAdapter,则max-messages-per-poll属性默认为1。可选。
9 在底层类PollerMetadata上设置的值。如果未指定,则默认为1000(毫秒)。可选。
10 对另一个顶级轮询器的bean引用。ref属性不能存在于顶级poller元素上。但是,如果设置了此属性,则不能指定以下任何属性:fixed-ratetriggercronfixed-delay
11 提供引用自定义任务执行器的能力。有关更多信息,请参阅TaskExecutor支持。可选。
12 此属性在底层org.springframework.scheduling.support.PeriodicTrigger上指定java.util.concurrent.TimeUnit枚举值。因此,此属性只能与fixed-delayfixed-rate属性结合使用。如果与crontrigger引用属性结合使用,则会导致错误。PeriodicTrigger支持的最小粒度是毫秒。因此,唯一可用的选项是毫秒和秒。如果未提供此值,则任何fixed-delayfixed-rate值都将解释为毫秒。基本上,此枚举为基于秒的间隔触发器值提供便利。对于每小时、每天和每月设置,我们建议使用cron触发器。
13 对实现org.springframework.scheduling.Trigger接口的任何Spring配置的bean的引用。但是,如果设置了此属性,则不能指定以下任何属性:fixed-delayfixed-ratecronref。可选。
14 允许指定额外的AOP建议以处理其他横切关注点。有关更多信息,请参阅事务。可选。
15 轮询器可以设置为事务性的。有关更多信息,请参阅AOP建议链。可选。

示例

一个简单的基于间隔的轮询器,间隔为1秒,可以配置如下

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

作为使用fixed-rate属性的替代方案,您还可以使用fixed-delay属性。

对于基于Cron表达式的轮询器,请改用cron属性,如下例所示

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

如果输入通道是PollableChannel,则需要轮询器配置。具体来说,如前所述,triggerPollingConsumer类的必需属性。因此,如果您省略轮询使用者端点配置的poller子元素,则可能会抛出异常。如果您尝试在连接到非可轮询通道的元素上配置轮询器,也可能会抛出异常。

还可以创建顶级轮询器,在这种情况下,只需要ref属性,如下例所示

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref属性仅允许在内部轮询器定义上使用。在顶级轮询器上定义此属性会导致在应用程序上下文初始化期间抛出配置异常。

全局默认轮询器

为了进一步简化配置,您可以定义一个全局默认轮询器。XML DSL中的单个顶级轮询器组件可以将default属性设置为true。对于Java配置,在这种情况下,必须声明一个名为PollerMetadata.DEFAULT_POLLERPollerMetadata bean。在这种情况下,任何输入通道为PollableChannel的端点(在同一个ApplicationContext中定义),并且没有显式配置的poller,都将使用该默认值。以下示例显示了这样的轮询器和使用它的转换器

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

事务支持

Spring Integration还为轮询器提供事务支持,以便可以将每个接收和转发操作作为原子工作单元执行。要为轮询器配置事务,请添加<transactional/>子元素。以下示例显示了可用的属性

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

有关更多信息,请参阅轮询器事务支持

AOP建议链

由于Spring事务支持依赖于使用TransactionInterceptor(AOP建议)处理轮询器启动的消息流的事务行为的代理机制,因此您有时必须提供额外的建议来处理与轮询器相关的其他横切行为。为此,poller定义了一个advice-chain元素,允许您在实现MethodInterceptor接口的类中添加更多建议。以下示例显示了如何为poller定义advice-chain

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

有关如何实现MethodInterceptor接口的更多信息,请参阅Spring框架参考指南中的AOP部分。建议链也可以应用于没有任何事务配置的轮询器,允许您增强轮询器启动的消息流的行为。

使用建议链时,不能指定<transactional/>子元素。相反,声明一个<tx:advice/> bean并将其添加到<advice-chain/>中。有关完整的配置详细信息,请参阅轮询器事务支持

TaskExecutor支持

轮询线程可以由Spring的TaskExecutor抽象的任何实例执行。这为端点或端点组启用并发。从Spring 3.0开始,核心Spring框架有一个task命名空间,其<executor/>元素支持创建简单的线程池执行器。该元素接受常见并发设置的属性,例如池大小和队列容量。配置线程池执行器可能会对端点在负载下的执行方式产生重大影响。这些设置可用于每个端点,因为端点的性能是需要考虑的主要因素之一(另一个主要因素是端点订阅的通道上的预期容量)。要为使用XML命名空间支持配置的轮询端点启用并发,请在其<poller/>元素上提供task-executor引用,然后提供以下示例中显示的一个或多个属性

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

如果您不提供task-executor,则消费者的处理程序将在调用者的线程中调用。请注意,调用者通常是默认的TaskScheduler(请参阅配置任务调度器)。您还应该记住,task-executor属性可以通过指定bean名称来提供对Spring的TaskExecutor接口的任何实现的引用。前面显示的executor元素是为了方便起见提供的。

轮询使用者背景部分前面所述,您还可以以模拟事件驱动行为的方式配置轮询使用者。使用较长的接收超时和触发器中的较短间隔,您可以确保即使在轮询的消息源上也能及时响应到达的消息。请注意,这仅适用于具有超时阻塞等待调用的源。例如,文件轮询器不会阻塞。每个receive()调用都会立即返回,并且要么包含新文件,要么不包含。因此,即使轮询器包含较长的receive-timeout,在这种情况下该值也永远不会被使用。另一方面,当使用Spring Integration自己的基于队列的通道时,超时值确实有机会参与。以下示例显示了轮询使用者如何几乎立即接收消息

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

使用这种方法不会带来太多开销,因为在内部,它只不过是一个定时等待线程,它不需要像(例如)抖动、无限的while循环那样多的CPU资源使用。

在运行时更改轮询速率

使用fixed-delayfixed-rate属性配置轮询器时,默认实现使用PeriodicTrigger实例。PeriodicTrigger是核心Spring框架的一部分。它只接受间隔作为构造函数参数。因此,它不能在运行时更改。

但是,您可以定义自己的org.springframework.scheduling.Trigger接口实现。您甚至可以使用PeriodicTrigger作为起点。然后,您可以为间隔(周期)添加一个setter,或者您甚至可以在触发器本身中嵌入自己的节流逻辑。period属性与每次调用nextExecutionTime以安排下一次轮询一起使用。要在轮询器中使用此自定义触发器,请在您的应用程序上下文中声明自定义触发器的bean定义,并使用引用自定义触发器bean实例的trigger属性将依赖项注入到您的轮询器配置中。您现在可以获取对触发器bean的引用并在轮询之间更改轮询间隔。

例如,请参阅Spring Integration示例项目。它包含一个名为dynamic-poller的示例,该示例使用自定义触发器并演示了在运行时更改轮询间隔的能力。

此示例提供了一个自定义触发器,它实现了org.springframework.scheduling.Trigger 接口。该示例的触发器基于 Spring 的PeriodicTrigger 实现。但是,自定义触发器的字段不是最终字段,并且属性具有明确的 getter 和 setter,允许您在运行时动态更改轮询周期。

需要注意的是,由于 Trigger 方法是 nextExecutionTime(),因此对动态触发器的任何更改都不会立即生效,而要等到基于现有配置的下一次轮询。无法强制触发器在其当前配置的下次执行时间之前触发。

有效负载类型转换

在本参考手册中,您还可以看到各种端点的特定配置和实现示例,这些端点接受消息或任何任意 Object 作为输入参数。在 Object 的情况下,此类参数将映射到消息有效负载或有效负载的一部分或标头(当使用 Spring 表达式语言时)。但是,端点方法的输入参数类型有时与有效负载或其一部分的类型不匹配。在这种情况下,我们需要执行类型转换。Spring Integration 提供了一种方便的方法来在其自己的转换服务 bean 实例(名为 integrationConversionService)中注册类型转换器(通过使用 Spring ConversionService)。一旦通过使用 Spring Integration 基础设施定义了第一个转换器,就会自动创建该 bean。要注册转换器,您可以实现 org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory

Converter 实现是最简单的,它将一种类型转换为另一种类型。对于更复杂的转换,例如转换为类层次结构,您可以实现 GenericConverter,并可能实现 ConditionalConverter。这些提供了对 fromto 类型描述符的完全访问权限,从而能够进行复杂的转换。例如,如果您有一个名为 Something 的抽象类作为转换的目标(参数类型、通道数据类型等),您有两个名为 Thing1Thing 的具体实现,并且希望根据输入类型转换为其中一个,则 GenericConverter 将是一个不错的选择。有关更多信息,请参阅这些接口的 Javadoc

实现转换器后,您可以使用方便的命名空间支持将其注册,如下例所示

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

或者,您可以使用内部 bean,如下例所示

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

从 Spring Integration 4.0 开始,您可以使用注解来创建前面的配置,如下例所示

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

或者,您可以使用 @Configuration 注解,如下例所示

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

配置应用程序上下文时,Spring 框架允许您添加 conversionService bean(请参阅配置 ConversionService 章节)。在需要时,此服务用于在 bean 创建和配置期间执行适当的转换。

相反,integrationConversionService 用于运行时转换。这些用途大不相同。在连接 bean 构造函数参数和属性时打算使用的转换器,如果在运行时用于 Spring Integration 对数据类型通道中的消息、有效负载类型转换器等进行表达式求值,可能会产生意外的结果。

但是,如果您确实希望使用 Spring conversionService 作为 Spring Integration integrationConversionService,则可以在应用程序上下文中配置别名,如下例所示

<alias name="conversionService" alias="integrationConversionService"/>

在这种情况下,conversionService 提供的转换器可用于 Spring Integration 运行时转换。

内容类型转换

从 5.0 版开始,默认情况下,方法调用机制基于 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 基础设施。其 HandlerMethodArgumentResolver 实现(例如 PayloadArgumentResolverMessageMethodArgumentResolver)可以使用 MessageConverter 抽象将传入的 payload 转换为目标方法参数类型。转换可以基于 contentType 消息标头。为此,Spring Integration 提供了 ConfigurableCompositeMessageConverter,它委托给已注册转换器的列表,直到其中一个返回非空结果。默认情况下,此转换器按严格顺序提供

有关其用途和转换的适当 contentType 值的更多信息,请参阅 Javadoc(在前面的列表中链接)。使用 ConfigurableCompositeMessageConverter 是因为它可以提供任何其他 MessageConverter 实现,包括或排除前面提到的默认转换器。它也可以在应用程序上下文中注册为适当的 bean,覆盖默认转换器,如下例所示

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

这两个新转换器在默认值之前在复合中注册。您也可以不使用 ConfigurableCompositeMessageConverter,而是通过注册一个名为 integrationArgumentResolverMessageConverter 的 bean(通过设置 IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME 属性)来提供您自己的 MessageConverter

基于 MessageConverter(包括 contentType 标头)的转换在使用 SpEL 方法调用时不可用。在这种情况下,只有上面有效负载类型转换中提到的常规类到类的转换可用。

异步轮询

如果希望轮询异步进行,则轮询器可以选择指定一个 task-executor 属性,该属性指向任何 TaskExecutor bean 的现有实例(Spring 3.0 通过 task 命名空间提供了方便的命名空间配置)。但是,在使用 TaskExecutor 配置轮询器时,必须了解某些事项。

问题在于存在两个配置,轮询器和 TaskExecutor。它们必须相互协调。否则,您可能会最终创建人工内存泄漏。

考虑以下配置

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

前面的配置演示了一个不协调的配置。

默认情况下,任务执行器具有无界任务队列。即使所有线程都被阻塞,等待新消息到达或超时过期,轮询器也会继续调度新任务。鉴于有 20 个线程以 5 秒的超时执行任务,它们的执行速度为每秒 4 个。但是,新任务的调度速度为每秒 20 个,因此任务执行器中的内部队列以每秒 16 个的速度增长(在进程空闲时),因此我们存在内存泄漏。

处理此问题的一种方法是设置任务执行器的 queue-capacity 属性。即使是 0 也是一个合理的值。您还可以通过指定对无法排队的消息执行的操作来管理它,方法是设置任务执行器的 rejection-policy 属性(例如,设置为 DISCARD)。换句话说,在配置 TaskExecutor 时,必须了解某些细节。有关此主题的更多详细信息,请参阅 Spring 参考手册中的“任务执行和调度”

端点内部 Bean

许多端点都是复合 bean。这包括所有使用者和所有轮询的入站通道适配器。使用者(轮询或事件驱动)委托给 MessageHandler。轮询适配器通过委托给 MessageSource 来获取消息。通常,获取对委托 bean 的引用很有用,例如在运行时更改配置或用于测试。可以使用众所周知的名称从 ApplicationContext 获取这些 bean。MessageHandler 实例使用类似于 someConsumer.handler 的 bean ID 注册到应用程序上下文(其中“consumer”是端点 id 属性的值)。MessageSource 实例使用类似于 somePolledAdapter.source 的 bean ID 注册,其中“somePolledAdapter”是适配器的 ID。

前面内容仅适用于框架组件本身。您可以改为使用内部 bean 定义,如下例所示

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

该 bean 与声明的任何内部 bean 一样对待,不会注册到应用程序上下文。如果您希望以其他方式访问此 bean,请在顶层使用 id 声明它,并改为使用 ref 属性。有关更多信息,请参阅Spring 文档