聚合器
聚合器基本上是拆分器的镜像,它是一种消息处理器,接收多个消息并将它们组合成单个消息。实际上,聚合器通常是包含拆分器的管道中的下游消费者。
从技术上讲,聚合器比拆分器更复杂,因为它是有状态的。它必须保存要聚合的消息,并确定何时准备好聚合完整的组消息。为此,它需要一个MessageStore
。
功能
聚合器通过关联和存储相关消息组,直到该组被认为已完成。此时,聚合器通过处理整个组来创建单个消息,并将聚合的消息作为输出发送。
实现聚合器需要提供执行聚合的逻辑(即从多个消息创建单个消息)。两个相关的概念是关联和释放。
关联确定如何将消息分组以进行聚合。在 Spring Integration 中,关联默认情况下基于IntegrationMessageHeaderAccessor.CORRELATION_ID
消息头进行。具有相同IntegrationMessageHeaderAccessor.CORRELATION_ID
的消息将分组在一起。但是,您可以自定义关联策略以允许其他方式指定如何将消息分组在一起。为此,您可以实现一个CorrelationStrategy
(本章稍后介绍)。
为了确定消息组何时准备好进行处理,会咨询一个ReleaseStrategy
。聚合器的默认释放策略在基于IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
头部的序列中存在所有消息时,释放一个组。您可以通过提供对自定义ReleaseStrategy
实现的引用来覆盖此默认策略。
编程模型
聚合 API 由多个类组成
-
接口
MessageGroupProcessor
及其子类:MethodInvokingAggregatingMessageGroupProcessor
和ExpressionEvaluatingMessageGroupProcessor
-
ReleaseStrategy
接口及其默认实现:SimpleSequenceSizeReleaseStrategy
-
CorrelationStrategy
接口及其默认实现:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler
(AbstractCorrelatingMessageHandler
的子类)是一个MessageHandler
实现,封装了聚合器(和其他关联用例)的通用功能,这些功能如下
-
将消息关联到一个要聚合的组中
-
在
MessageStore
中维护这些消息,直到该组可以被释放 -
决定何时可以释放该组
-
将释放的组聚合到单个消息中
-
识别和响应已过期的组
决定如何将消息分组在一起的责任委托给一个CorrelationStrategy
实例。决定消息组是否可以释放的责任委托给一个ReleaseStrategy
实例。
以下列表简要介绍了基本AbstractAggregatingMessageGroupProcessor
(实现aggregatePayloads
方法的责任留给开发人员)
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
请参阅DefaultAggregatingMessageGroupProcessor
、ExpressionEvaluatingMessageGroupProcessor
和MethodInvokingMessageGroupProcessor
,它们是AbstractAggregatingMessageGroupProcessor
的开箱即用实现。
从 5.2 版本开始,AbstractAggregatingMessageGroupProcessor
提供了一个Function<MessageGroup, Map<String, Object>>
策略,用于合并和计算(聚合)输出消息的标头。DefaultAggregateHeadersFunction
实现提供了一种逻辑,该逻辑返回所有在组中没有冲突的标头;组内一个或多个消息上缺少的标头不被视为冲突。冲突的标头将被省略。除了新引入的DelegatingMessageGroupProcessor
之外,此函数还用于任何任意(非AbstractAggregatingMessageGroupProcessor
)MessageGroupProcessor
实现。本质上,框架将提供的函数注入到AbstractAggregatingMessageGroupProcessor
实例中,并将所有其他实现包装到DelegatingMessageGroupProcessor
中。AbstractAggregatingMessageGroupProcessor
和DelegatingMessageGroupProcessor
之间的逻辑差异在于,后者不会在调用委托策略之前预先计算标头,并且如果委托返回Message
或AbstractIntegrationMessageBuilder
,则不会调用该函数。在这种情况下,框架假设目标实现已负责生成一组适当的标头,这些标头填充到返回的结果中。Function<MessageGroup, Map<String, Object>>
策略可作为 XML 配置的headers-function
引用属性、Java DSL 的AggregatorSpec.headersFunction()
选项以及普通 Java 配置的AggregatorFactoryBean.setHeadersFunction()
使用。
CorrelationStrategy
由AbstractCorrelatingMessageHandler
拥有,并根据IntegrationMessageHeaderAccessor.CORRELATION_ID
消息标头具有默认值,如下例所示
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
}
至于消息组的实际处理,默认实现是DefaultAggregatingMessageGroupProcessor
。它创建一个单一的Message
,其有效负载是为给定组接收的有效负载的List
。这对于使用拆分器、发布-订阅通道或接收者列表路由器上游的简单散布-收集实现非常有效。
在使用发布-订阅通道或收件人列表路由器时,请确保启用 apply-sequence 标志。这样做会添加必要的标头:CORRELATION_ID 、SEQUENCE_NUMBER 和 SEQUENCE_SIZE 。这种行为在 Spring Integration 中默认情况下对拆分器启用,但它没有为发布-订阅通道或收件人列表路由器启用,因为这些组件可能在各种上下文中使用,在这些上下文中不需要这些标头。
|
在为应用程序实现特定聚合器策略时,可以扩展 AbstractAggregatingMessageGroupProcessor
并实现 aggregatePayloads
方法。但是,有更好的解决方案,与 API 的耦合度更低,用于实现聚合逻辑,可以通过 XML 或注释进行配置。
通常,任何 POJO 都可以实现聚合算法,如果它提供一个接受单个 java.util.List
作为参数的方法(也支持参数化列表)。此方法用于按如下方式聚合消息
-
如果参数是
java.util.Collection<T>
并且参数类型 T 可分配给Message
,则将为聚合累积的消息列表发送到聚合器。 -
如果参数是非参数化的
java.util.Collection
或参数类型不可分配给Message
,则该方法将接收累积消息的有效负载。 -
如果返回类型不可分配给
Message
,则将其视为框架自动创建的Message
的有效负载。
为了代码简单性和促进最佳实践(如低耦合、可测试性等),实现聚合逻辑的首选方法是通过 POJO 并使用 XML 或注释支持在应用程序中进行配置。 |
从 5.3 版本开始,在处理消息组后,AbstractCorrelatingMessageHandler
会对消息标头执行 MessageBuilder.popSequenceDetails()
修改,以实现具有多个嵌套级别的正确拆分器-聚合器场景。这仅在消息组释放结果不是消息集合时才执行。在这种情况下,目标 MessageGroupProcessor
负责在构建这些消息时调用 MessageBuilder.popSequenceDetails()
。
如果 MessageGroupProcessor
返回 Message
,则仅当 sequenceDetails
与组中的第一条消息匹配时,才会在输出消息上执行 MessageBuilder.popSequenceDetails()
。(以前,只有在从 MessageGroupProcessor
返回普通有效负载或 AbstractIntegrationMessageBuilder
时才会执行此操作。)
此功能可以通过一个新的popSequence
boolean
属性来控制,因此MessageBuilder.popSequenceDetails()
可以在某些情况下被禁用,此时相关性详细信息尚未由标准拆分器填充。本质上,此属性撤销了最近的上游AbstractMessageSplitter
中applySequence = true
所做的操作。有关更多信息,请参阅拆分器。
SimpleMessageGroup.getMessages() 方法返回一个unmodifiableCollection 。因此,如果聚合 POJO 方法具有Collection<Message> 参数,则传入的参数正是该Collection 实例,并且当您对聚合器使用SimpleMessageStore 时,该原始Collection<Message> 将在释放组后被清除。因此,如果 POJO 中的Collection<Message> 变量被传递出聚合器,它也会被清除。如果您希望简单地按原样释放该集合以供进一步处理,则必须构建一个新的Collection (例如,new ArrayList<Message>(messages) )。从 4.3 版开始,框架不再将消息复制到新的集合中,以避免不必要的额外对象创建。
|
在 4.2 版之前,无法通过使用 XML 配置来提供MessageGroupProcessor
。只有 POJO 方法可用于聚合。现在,如果框架检测到引用的(或内部的)bean 实现了MessageProcessor
,则它将用作聚合器的输出处理器。
如果您希望从自定义MessageGroupProcessor
中释放一组对象作为消息的有效负载,您的类应该扩展AbstractAggregatingMessageGroupProcessor
并实现aggregatePayloads()
。
此外,从 4.2 版开始,提供了一个SimpleMessageGroupProcessor
。它返回来自组的消息集合,如前所述,这会导致释放的消息被单独发送。
这使得聚合器可以作为消息屏障工作,到达的消息将被保留,直到释放策略触发并以单个消息序列的形式释放该组。
从 6.0 版本开始,上述拆分行为仅在组处理器为 SimpleMessageGroupProcessor
时有效。否则,对于返回 Collection<Message>
的任何其他 MessageGroupProcessor
实现,只会发出一个包含整个消息集合作为其有效载荷的回复消息。这种逻辑由聚合器的典型目的决定 - 按某个键收集请求消息并生成单个分组消息。
ReleaseStrategy
ReleaseStrategy
接口定义如下
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}
一般来说,任何 POJO 都可以实现完成决策逻辑,只要它提供一个接受单个 java.util.List
作为参数(支持参数化列表)并返回布尔值的方法。此方法在每条新消息到达后被调用,以决定组是否已完成,如下所示
-
如果参数是
java.util.List<T>
且参数类型T
可分配给Message
,则将组中累积的整个消息列表发送到该方法。 -
如果参数是非参数化的
java.util.List
或参数类型不可分配给Message
,则该方法接收累积消息的有效载荷。 -
如果消息组已准备好进行聚合,则该方法必须返回
true
,否则返回false
。
以下示例展示了如何为 Message
类型的 List
使用 @ReleaseStrategy
注解
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
}
以下示例展示了如何为 String
类型的 List
使用 @ReleaseStrategy
注解
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
}
根据前面两个示例中的签名,基于 POJO 的发布策略传递一个尚未发布的消息的 Collection
(如果您需要访问整个 Message
)或一个有效载荷对象的 Collection
(如果类型参数不是 Message
)。这满足了大多数用例。但是,如果出于某种原因,您需要访问完整的 MessageGroup
,则应提供 ReleaseStrategy
接口的实现。
在处理可能很大的组时,您应该了解这些方法是如何调用的,因为发布策略可能在组发布之前被多次调用。最有效的是 出于这些原因,对于大型组,我们建议您实现 |
当组被释放以进行聚合时,其所有尚未发布的消息都会被处理并从组中删除。如果该组也已完成(即,如果来自序列的所有消息都已到达或没有定义序列),则该组将被标记为已完成。该组的任何新消息都将发送到丢弃通道(如果已定义)。将expire-groups-upon-completion
设置为true
(默认值为false
)将删除整个组,任何新消息(与已删除组具有相同的关联 ID)将形成一个新组。您可以使用MessageGroupStoreReaper
以及将send-partial-result-on-expiry
设置为true
来释放部分序列。
为了便于丢弃迟到的消息,聚合器必须在组被释放后维护有关该组的状态。这最终会导致内存不足的情况。为了避免这种情况,您应该考虑配置MessageGroupStoreReaper 来删除组元数据。应将过期参数设置为在达到某个点后过期组,在此之后预计不会收到迟到的消息。有关配置收割器的信息,请参见在聚合器中管理状态:MessageGroupStore 。
|
Spring Integration 为ReleaseStrategy
提供了一个实现:SimpleSequenceSizeReleaseStrategy
。此实现会查询每个到达消息的SEQUENCE_NUMBER
和SEQUENCE_SIZE
标头,以决定何时消息组已完成并准备进行聚合。如前所述,它也是默认策略。
在 5.0 版之前,默认的发布策略是SequenceSizeReleaseStrategy ,它在处理大型组时表现不佳。使用该策略,会检测到重复的序列号并被拒绝。此操作可能很昂贵。
|
如果您正在聚合大型组,您不需要释放部分组,也不需要检测/拒绝重复序列,请考虑使用SimpleSequenceSizeReleaseStrategy
- 它在这些用例中效率更高,并且是自5.0 版以来的默认策略,此时未指定部分组发布。
聚合大型组
4.3 版本更改了 `SimpleMessageGroup` 中消息的默认 `Collection` 为 `HashSet`(之前是 `BlockingQueue`)。当从大型组中删除单个消息时,这将非常昂贵(需要进行 O(n) 线性扫描)。虽然哈希集通常在删除方面快得多,但对于大型消息来说,它可能很昂贵,因为必须在插入和删除时都计算哈希值。如果您有哈希值昂贵的消息,请考虑使用其他集合类型。如 使用 `MessageGroupFactory` 中所述,提供了 `SimpleMessageGroupFactory`,以便您可以选择最适合您需求的 `Collection`。您也可以提供自己的工厂实现来创建其他 `Collection<Message<?>>`。
以下示例展示了如何使用之前的实现和 `SimpleSequenceSizeReleaseStrategy` 配置聚合器。
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
如果过滤器端点参与聚合器上游的流,则序列大小释放策略(固定或基于 `sequenceSize` 标头)将无法发挥作用,因为来自序列的一些消息可能会被过滤器丢弃。在这种情况下,建议选择其他 `ReleaseStrategy`,或者使用从丢弃子流发送的补偿消息,这些消息在内容中携带一些信息,以便在自定义完成组函数中跳过。有关更多信息,请参阅 过滤器。 |
关联策略
`CorrelationStrategy` 接口定义如下
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
}
该方法返回一个 `Object`,它表示用于将消息与消息组关联的关联键。该键必须满足 `Map` 中键的标准,关于 `equals()` 和 `hashCode()` 的实现。
通常,任何 POJO 都可以实现关联逻辑,将消息映射到方法参数(或参数)的规则与 `ServiceActivator` 相同(包括对 `@Header` 注释的支持)。该方法必须返回值,并且该值不能为 `null`。
Spring Integration 为 `CorrelationStrategy` 提供了一个实现:`HeaderAttributeCorrelationStrategy`。此实现将消息标头之一的值(其名称由构造函数参数指定)作为关联键返回。默认情况下,关联策略是 `HeaderAttributeCorrelationStrategy`,它返回 `CORRELATION_ID` 标头属性的值。如果您想使用自定义标头名称进行关联,您可以在 `HeaderAttributeCorrelationStrategy` 的实例上配置它,并将其作为聚合器关联策略的引用提供。
锁注册表
对组的更改是线程安全的。因此,当您并发地为相同的关联 ID 发送消息时,它们中只有一个将在聚合器中处理,使其有效地成为 **每个消息组的单线程**。`LockRegistry` 用于获取已解析关联 ID 的锁。默认情况下使用 `DefaultLockRegistry`(内存中)。为了在使用共享 `MessageGroupStore` 的服务器之间同步更新,您必须配置共享锁注册表。
避免死锁
如上所述,当消息组被修改(添加或释放消息)时,会持有锁。
考虑以下流程
...->aggregator1-> ... ->aggregator2-> ...
如果有多个线程,并且聚合器共享一个公共锁注册表,则可能会发生死锁。这会导致线程挂起,并且jstack <pid>
可能会显示类似的结果
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"
有几种方法可以避免此问题
-
确保每个聚合器都有自己的锁注册表(这可以在应用程序实例之间共享注册表,但流程中的两个或多个聚合器必须分别拥有不同的注册表)
-
使用
ExecutorChannel
或QueueChannel
作为聚合器的输出通道,以便下游流程在新的线程上运行 -
从 5.1.1 版本开始,将
releaseLockBeforeSend
聚合器属性设置为true
如果由于某种原因,单个聚合器的输出最终被路由回同一个聚合器,也会导致此问题。当然,上述第一个解决方案在这种情况下不适用。 |
在 Java DSL 中配置聚合器
有关如何在 Java DSL 中配置聚合器的信息,请参阅聚合器和重新排序器。
使用 XML 配置聚合器
Spring Integration 支持通过<aggregator/>
元素使用 XML 配置聚合器。以下示例显示了一个聚合器示例
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 | 聚合器的 ID 是可选的。 |
2 | 生命周期属性表示聚合器是否应该在应用程序上下文启动期间启动。可选(默认值为“true”)。 |
3 | 聚合器接收消息的通道。必需。 |
4 | 聚合器发送聚合结果的通道。可选(因为传入消息本身可以在“replyChannel”消息头中指定回复通道)。 |
5 | 聚合器发送超时消息的通道(如果send-partial-result-on-expiry 为false )。可选。 |
6 | 对MessageGroupStore 的引用,用于在消息完成之前根据其关联键存储消息组。可选。默认情况下,它是一个易失性的内存内存储。有关更多信息,请参阅消息存储。 |
7 | 当多个处理程序订阅同一个 DirectChannel 时,此聚合器的顺序(用于负载均衡)。可选。 |
8 | 指示已过期的消息应在它们包含的 MessageGroup 过期后聚合并发送到“output-channel”或“replyChannel”(参见 MessageGroupStore.expireMessageGroups(long) )。过期 MessageGroup 的一种方法是配置 MessageGroupStoreReaper 。但是,您也可以通过调用 MessageGroupStore.expireMessageGroups(timeout) 来过期 MessageGroup 。您可以通过控制总线操作来实现这一点,或者,如果您有对 MessageGroupStore 实例的引用,则可以通过调用 expireMessageGroups(timeout) 来实现。否则,此属性本身不会执行任何操作。它仅作为是否丢弃或发送到输出或回复通道即将过期的 MessageGroup 中的任何消息的指示器。可选(默认值为 false )。注意:此属性可能更恰当地称为 send-partial-result-on-timeout ,因为如果 expire-groups-upon-timeout 设置为 false ,则该组可能实际上不会过期。 |
9 | 将回复 Message 发送到 output-channel 或 discard-channel 时等待的超时间隔。默认为 30 秒。它仅在输出通道存在一些“发送”限制时应用,例如具有固定“容量”的 QueueChannel 。在这种情况下,将抛出 MessageDeliveryException 。对于 AbstractSubscribableChannel 实现,send-timeout 将被忽略。对于 group-timeout(-expression) ,来自计划的过期任务的 MessageDeliveryException 会导致此任务重新计划。可选。 |
10 | 对实现消息关联(分组)算法的 Bean 的引用。该 Bean 可以是 CorrelationStrategy 接口的实现或 POJO。在后一种情况下,还必须定义 correlation-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 标头)。 |
11 | 在 correlation-strategy 引用的 Bean 上定义的方法。它实现了关联决策算法。可选,有限制(correlation-strategy 必须存在)。 |
12 | 表示关联策略的 SpEL 表达式。示例:"headers['something']" 。correlation-strategy 或 correlation-strategy-expression 只能使用一个。 |
13 | 在应用程序上下文中定义的 Bean 的引用。该 Bean 必须实现前面描述的聚合逻辑。可选(默认情况下,聚合消息的列表将成为输出消息的有效负载)。 |
14 | 在 ref 属性引用的 Bean 上定义的方法。它实现了消息聚合算法。可选(它取决于 ref 属性是否已定义)。 |
15 | 对实现发布策略的 Bean 的引用。该 Bean 可以是 ReleaseStrategy 接口的实现或 POJO。在后一种情况下,还必须定义 release-strategy-method 属性。可选(默认情况下,聚合器使用 IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 标头属性)。 |
16 | 由release-strategy 属性引用的 Bean 上定义的方法。它实现完成决策算法。可选,但有限制(release-strategy 必须存在)。 |
17 | 表示发布策略的 SpEL 表达式。表达式的根对象是MessageGroup 。例如:"size() == 5" 。只能使用release-strategy 或release-strategy-expression 中的一个。 |
18 | 当设置为true 时(默认值为false ),已完成的组将从消息存储中删除,让后续具有相同相关性的消息形成一个新组。默认行为是将具有与已完成组相同相关性的消息发送到discard-channel 。 |
19 | 仅在为<aggregator> 的MessageStore 配置了MessageGroupStoreReaper 时适用。默认情况下,当配置MessageGroupStoreReaper 以使部分组过期时,空组也会被删除。空组在组正常释放后存在。空组允许检测和丢弃迟到的消息。如果您希望以比过期部分组更长的计划来过期空组,请设置此属性。然后,空组不会从MessageStore 中删除,直到它们至少在该毫秒数内未被修改。请注意,过期空组的实际时间也会受到收割机timeout 属性的影响,它可能与该值加上超时时间一样长。 |
20 | 对org.springframework.integration.util.LockRegistry Bean 的引用。它用于根据groupId 获取Lock ,以便对MessageGroup 进行并发操作。默认情况下,使用内部DefaultLockRegistry 。使用分布式LockRegistry (例如ZookeeperLockRegistry )可确保聚合器的单个实例可以同时操作一个组。有关更多信息,请参见Redis 锁注册表或Zookeeper 锁注册表。 |
21 | 当 `ReleaseStrategy` 在当前消息到达时未释放分组时,强制 `MessageGroup` 完成的超时时间(以毫秒为单位)。此属性为聚合器提供了一个内置的基于时间的释放策略,当需要在 `MessageGroup` 在超时时间内(从最后一条消息到达的时间开始计算)没有收到新消息时发出部分结果(或丢弃分组)时。要设置从 `MessageGroup` 创建时开始计算的超时时间,请参阅 `group-timeout-expression` 信息。当聚合器收到新消息时,其 `MessageGroup` 的任何现有 `ScheduledFuture>` 都将被取消。如果 `ReleaseStrategy` 返回 `false`(表示不释放)并且 `groupTimeout > 0`,则会安排一个新任务来使分组过期。我们建议不要将此属性设置为零(或负值)。这样做实际上会禁用聚合器,因为每个消息分组都会立即完成。但是,您可以使用表达式有条件地将其设置为零(或负值)。有关信息,请参阅 `group-timeout-expression`。此属性与 `group-timeout-expression` 属性互斥。有关更多信息,请参阅 聚合器和分组超时。 |
22 | 计算为 `groupTimeout` 的 SpEL 表达式,其中 `MessageGroup` 作为 `#root` 评估上下文对象。用于安排 `MessageGroup` 强制完成。如果表达式计算结果为 `null`,则不会安排完成。如果它计算结果为零,则分组将在当前线程上立即完成。实际上,这提供了一个动态的 `group-timeout` 属性。例如,如果您希望在分组创建后 10 秒强制完成 `MessageGroup`,您可以考虑使用以下 SpEL 表达式:`timestamp + 10000 - T(System).currentTimeMillis()`,其中 `timestamp` 由 `MessageGroup.getTimestamp()` 提供,因为这里的 `MessageGroup` 是 `#root` 评估上下文对象。但是请记住,根据其他分组过期属性的配置,分组创建时间可能与第一条到达消息的时间不同。有关更多信息,请参阅 `group-timeout`。此属性与 `group-timeout` 属性互斥。 |
23 | 当分组由于超时(或由 `MessageGroupStoreReaper`)完成时,默认情况下分组会过期(完全删除)。到达的延迟消息将启动一个新的分组。您可以将其设置为 `false` 以完成分组,但保留其元数据,以便丢弃到达的延迟消息。可以使用 `MessageGroupStoreReaper` 以及 `empty-group-min-timeout` 属性,稍后过期空分组。默认值为 `true`。 |
24 | 一个TaskScheduler bean 引用,用于调度MessageGroup ,如果在groupTimeout 内没有新的消息到达MessageGroup ,则强制完成。如果没有提供,则使用在ApplicationContext (ThreadPoolTaskScheduler )中注册的默认调度程序(taskScheduler )。如果未指定group-timeout 或group-timeout-expression ,则此属性不适用。 |
25 | 从版本 4.1 开始。它允许为forceComplete 操作启动事务。它由group-timeout(-expression) 或MessageGroupStoreReaper 启动,不应用于正常的add 、release 和discard 操作。只允许此子元素或<expire-advice-chain/> 。 |
26 | 从版本 4.1开始。它允许为forceComplete 操作配置任何Advice 。它由group-timeout(-expression) 或MessageGroupStoreReaper 启动,不应用于正常的add 、release 和discard 操作。只允许此子元素或<expire-transactional/> 。也可以使用 Spring tx 命名空间在此处配置事务Advice 。 |
过期组
有两个属性与过期(完全删除)组相关。当组过期时,没有关于它的记录,并且如果新的消息到达具有相同的关联,则会启动一个新的组。当组完成(没有过期)时,空组会保留,并且延迟到达的消息会被丢弃。空组可以使用
如果组没有正常完成,而是由于超时而被释放或丢弃,则该组通常会过期。从版本 4.1 开始,可以使用
从版本 5.0 开始,空组也会在 从 5.4 版本开始,聚合器(和重排序器)可以配置为过期孤立组(持久消息存储中可能无法释放的组)。 |
我们通常建议使用 ref
属性,如果自定义聚合器处理程序实现可能在其他 <aggregator>
定义中被引用。但是,如果自定义聚合器实现只被 <aggregator>
的单个定义使用,则可以使用内部 bean 定义(从 1.0.3 版本开始)在 <aggregator>
元素中配置聚合 POJO,如下面的示例所示
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
在同一个 <aggregator> 配置中同时使用 ref 属性和内部 bean 定义是不允许的,因为它会创建一个模棱两可的情况。在这种情况下,会抛出异常。
|
以下示例显示了聚合器 bean 的实现
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}
前面示例的完成策略 bean 的实现可能如下所示
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}
在有意义的情况下,释放策略方法和聚合器方法可以合并到一个 bean 中。 |
上面示例的相关性策略 bean 的实现可能如下所示
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}
前面示例中的聚合器将根据某些标准(在本例中,是除以十后的余数)对数字进行分组,并将该组保留,直到有效负载提供的数字之和超过某个值。
在合理的情况下,发布策略方法、关联策略方法和聚合器方法可以组合在一个 bean 中。(实际上,所有方法或任意两个方法都可以组合。) |
聚合器和 Spring 表达式语言 (SpEL)
从 Spring Integration 2.0 开始,您可以使用 SpEL 处理各种策略(关联、发布和聚合),如果这种发布策略背后的逻辑相对简单,我们建议您使用这种方法。假设您有一个遗留组件,该组件被设计为接收对象数组。我们知道默认的发布策略会将所有聚合的消息组装到 List
中。现在我们有两个问题。首先,我们需要从列表中提取单个消息。其次,我们需要提取每个消息的有效负载并组装对象数组。以下示例解决了这两个问题
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<String>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}
但是,使用 SpEL,这样的需求实际上可以通过一行表达式轻松处理,从而免去您编写自定义类并将其配置为 bean 的麻烦。以下示例展示了如何做到这一点
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>
在前面的配置中,我们使用 集合投影 表达式从列表中所有消息的有效负载组装一个新的集合,然后将其转换为数组,从而实现与之前 Java 代码相同的结果。
在处理自定义发布和关联策略时,您可以应用相同的基于表达式的方案。
您无需在 correlation-strategy
属性中定义一个自定义 CorrelationStrategy
bean,而是可以将简单的关联逻辑实现为 SpEL 表达式,并在 correlation-strategy-expression
属性中配置它,如下面的示例所示
correlation-strategy-expression="payload.person.id"
在前面的示例中,我们假设有效负载具有一个 person
属性,该属性具有一个 id
,将用于关联消息。
同样,对于 ReleaseStrategy
,您可以将发布逻辑实现为 SpEL 表达式,并在 release-strategy-expression
属性中配置它。评估上下文的根对象是 MessageGroup
本身。可以使用表达式中组的 message
属性引用消息列表。
在 5.0 版本之前的版本中,根对象是 Message<?> 的集合,如前面的示例所示。
|
release-strategy-expression="!messages.?[payload==5].empty"
在前面的示例中,SpEL 评估上下文的根对象是 MessageGroup
本身,并且您正在声明,一旦该组中存在有效负载为 5
的消息,该组就应该被释放。
聚合器和组超时
从 4.0 版本开始,引入了两个新的互斥属性:group-timeout
和 group-timeout-expression
。请参阅 使用 XML 配置聚合器。在某些情况下,您可能需要在超时后发出聚合器结果(或丢弃组),如果 ReleaseStrategy
在当前消息到达时未释放。为此,groupTimeout
选项允许调度 MessageGroup
强制完成,如下面的示例所示。
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
通过此示例,如果聚合器按 release-strategy-expression
定义的顺序接收最后一条消息,则正常释放是可能的。如果该特定消息未到达,则 groupTimeout
会强制该组在十秒后完成,只要该组包含至少两条消息。
强制组完成的结果取决于 ReleaseStrategy
和 send-partial-result-on-expiry
。首先,再次咨询发布策略以查看是否要进行正常发布。虽然组没有改变,但 ReleaseStrategy
可以决定此时发布该组。如果发布策略仍然没有发布该组,则该组已过期。如果 send-partial-result-on-expiry
为 true
,则(部分)MessageGroup
中的现有消息将作为正常聚合器回复消息发布到 output-channel
。否则,它将被丢弃。
groupTimeout
行为和 MessageGroupStoreReaper
之间存在差异(请参阅 使用 XML 配置聚合器)。Reaper 定期为 MessageGroupStore
中的所有 MessageGroup
启动强制完成。如果在 groupTimeout
期间没有到达新消息,则 groupTimeout
会针对每个 MessageGroup
单独执行此操作。此外,Reaper 可用于删除空组(如果 expire-groups-upon-completion
为 false,则保留空组以丢弃延迟消息)。
从 5.5 版本开始,groupTimeoutExpression
可以评估为 java.util.Date
实例。这在确定基于组创建时间(MessageGroup.getTimestamp()
)的计划任务时间而不是当前消息到达时间(如 groupTimeoutExpression
评估为 long
时计算的那样)的情况下很有用。
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
使用注释配置聚合器
以下示例显示了一个使用注释配置的聚合器。
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}
1 | 一个注解,表示此方法应作为聚合器使用。如果此类用作聚合器,则必须指定此注解。 |
2 | 一个注解,表示此方法用作聚合器的发布策略。如果任何方法上都没有此注解,则聚合器使用 SimpleSequenceSizeReleaseStrategy 。 |
3 | 一个注解,表示此方法应作为聚合器的关联策略使用。如果没有指定关联策略,则聚合器使用基于 CORRELATION_ID 的 HeaderAttributeCorrelationStrategy 。 |
XML 元素提供的全部配置选项也适用于 @Aggregator
注解。
聚合器可以从 XML 中显式引用,或者如果在类上定义了 @MessageEndpoint
,则可以通过类路径扫描自动检测。
聚合器组件的注解配置(@Aggregator
及其他)仅涵盖简单用例,其中大多数默认选项就足够了。如果您需要在使用注解配置时对这些选项进行更多控制,请考虑使用 @Bean
定义 AggregatingMessageHandler
并将其 @Bean
方法标记为 @ServiceActivator
,如下例所示
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
}
有关更多信息,请参阅 编程模型 和 @Bean
方法上的注解。
从 4.2 版本开始,AggregatorFactoryBean 可用于简化 AggregatingMessageHandler 的 Java 配置。
|
在聚合器中管理状态:MessageGroupStore
聚合器(以及 Spring Integration 中的一些其他模式)是一种有状态模式,需要根据一段时间内到达的一组消息(所有消息都具有相同的关联键)做出决策。有状态模式(例如 ReleaseStrategy
)中接口的设计遵循以下原则:组件(无论是框架定义的还是用户定义的)应该能够保持无状态。所有状态都由 MessageGroup
携带,其管理委托给 MessageGroupStore
。MessageGroupStore
接口定义如下
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}
有关更多信息,请参阅 Javadoc。
MessageGroupStore
在等待释放策略触发时累积 MessageGroups
中的状态信息,而该事件可能永远不会发生。因此,为了防止过时消息持续存在,并且为了使易失性存储提供在应用程序关闭时清理的挂钩,MessageGroupStore
允许您注册回调以在 MessageGroups
过期时应用于它们。如以下列表所示,该接口非常简单
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
回调可以直接访问存储和消息组,以便它可以管理持久状态(例如,通过完全从存储中删除组)。
MessageGroupStore
保持这些回调的列表,它按需将这些回调应用于所有时间戳早于作为参数提供的时间的消息(请参阅前面描述的 registerMessageGroupExpiryCallback(..)
和 expireMessageGroups(..)
方法)。
重要的是,当您打算依赖 expireMessageGroups 功能时,不要在不同的聚合器组件中使用相同的 MessageGroupStore 实例。每个 AbstractCorrelatingMessageHandler 都根据 forceComplete() 回调注册自己的 MessageGroupCallback 。这样,每个用于过期的组都可以由错误的聚合器完成或丢弃。从 5.0.10 版本开始,AbstractCorrelatingMessageHandler 在 MessageGroupStore 中的注册回调中使用 UniqueExpiryCallback 。MessageGroupStore 反过来检查此类实例的存在,如果在回调集中已经存在此类实例,则记录一条带有适当消息的错误。这样,框架不允许在不同的聚合器/重新排序器中使用 MessageGroupStore 实例,以避免上述过期组未由特定关联处理程序创建的副作用。
|
您可以使用超时值调用 expireMessageGroups
方法。任何比当前时间减去此值更旧的消息都将过期,并应用回调。因此,它是存储的用户定义了消息组“过期”的含义。
为了方便用户,Spring Integration 提供了一个名为 MessageGroupStoreReaper
的消息过期包装器,如下例所示。
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
Reaper 是一个 Runnable
。在上面的例子中,消息组存储的过期方法每十秒调用一次。超时时间本身为 30 秒。
重要的是要理解 MessageGroupStoreReaper 的 'timeout' 属性是一个近似值,会受到任务调度器速率的影响,因为此属性仅在 MessageGroupStoreReaper 任务的下一个计划执行时才会被检查。例如,如果超时时间设置为十分钟,但 MessageGroupStoreReaper 任务计划每小时运行一次,并且 MessageGroupStoreReaper 任务的上次执行发生在超时前一分钟,则 MessageGroup 不会在接下来的 59 分钟内过期。因此,我们建议将速率设置为至少等于超时值或更短。
|
除了 reaper 之外,当应用程序通过 AbstractCorrelatingMessageHandler
中的生命周期回调关闭时,也会调用过期回调。
AbstractCorrelatingMessageHandler
注册自己的过期回调,这是与聚合器 XML 配置中的布尔标志 send-partial-result-on-expiry
之间的联系。如果该标志设置为 true
,则当调用过期回调时,任何尚未释放的组中未标记的消息都可以发送到输出通道。
由于 MessageGroupStoreReaper 是从计划任务中调用的,并且可能会导致消息(取决于 sendPartialResultOnExpiry 选项)到下游集成流的生产,因此建议提供一个带有 MessagePublishingErrorHandler 的自定义 TaskScheduler 来通过 errorChannel 处理异常,因为这可能是常规聚合器释放功能所期望的。相同的逻辑适用于也依赖于 TaskScheduler 的组超时功能。有关更多信息,请参见 错误处理。
|
当对不同的关联端点使用共享的 一些 有关 |
Flux 聚合器
在 5.2 版本中,引入了FluxAggregatorMessageHandler
组件。它基于 Project Reactor 的Flux.groupBy()
和Flux.window()
操作符。传入的消息被发射到由该组件构造函数中的Flux.create()
启动的FluxSink
中。如果未提供outputChannel
或它不是ReactiveStreamsSubscribableChannel
的实例,则对主Flux
的订阅将从Lifecycle.start()
实现中完成。否则,它将推迟到由ReactiveStreamsSubscribableChannel
实现完成的订阅。消息通过使用CorrelationStrategy
进行分组,该策略用于分组键。默认情况下,会查询消息的IntegrationMessageHeaderAccessor.CORRELATION_ID
标头。
默认情况下,每个关闭的窗口都会作为Flux
在要生产的消息的有效负载中释放。此消息包含窗口中第一条消息的所有标头。输出消息有效负载中的此Flux
必须在 downstream 订阅和处理。这种逻辑可以通过FluxAggregatorMessageHandler
的setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>)
配置选项进行自定义(或覆盖)。例如,如果我们希望在最终消息中有一个List
类型的有效负载,我们可以像这样配置一个Flux.collectList()
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));
FluxAggregatorMessageHandler
中有多种选项可用于选择合适的窗口策略
-
setBoundaryTrigger(Predicate<Message<?>>)
- 传播到Flux.windowUntil()
操作符。有关更多信息,请参见其 JavaDocs。优先于所有其他窗口选项。 -
setWindowSize(int)
和setWindowSizeFunction(Function<Message<?>, Integer>)
- 传播到Flux.window(int)
或windowTimeout(int, Duration)
。默认情况下,窗口大小从组中的第一条消息及其IntegrationMessageHeaderAccessor.SEQUENCE_SIZE
标头计算得出。 -
setWindowTimespan(Duration)
- 传播到Flux.window(Duration)
或windowTimeout(int, Duration)
,具体取决于窗口大小配置。 -
setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)
- 一个函数,用于将转换应用于分组的 Flux,以执行未被公开选项涵盖的任何自定义窗口操作。
由于此组件是 MessageHandler
的实现,因此它可以简单地用作 @Bean
定义,并与 @ServiceActivator
消息传递注释一起使用。使用 Java DSL,它可以从 .handle()
EIP 方法中使用。以下示例演示了如何在运行时注册 IntegrationFlow
以及如何将 FluxAggregatorMessageHandler
与上游拆分器相关联。
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);