重新排序器

重排序器与聚合器相关,但服务于不同的目的。聚合器组合消息,而重排序器在不更改消息的情况下传递消息。

功能

重排序器的工作方式与聚合器类似,它使用CORRELATION_ID将消息存储在组中。不同之处在于重排序器不会以任何方式处理消息。相反,它会按照消息的SEQUENCE_NUMBER头值顺序释放它们。

关于这一点,您可以选择一次性释放所有消息(在整个序列之后,根据SEQUENCE_SIZE和其他可能性),或者在有效序列可用时立即释放。(我们将在本章后面介绍“有效序列”的含义。)

重排序器旨在对具有较小间隙的相对较短的消息序列进行重新排序。如果您有大量具有许多间隙的不相交序列,您可能会遇到性能问题。

配置重排序器

有关在 Java DSL 中配置重排序器的信息,请参见聚合器和重排序器

配置重排序器只需要在 XML 中包含相应的元素。

以下示例显示了重排序器配置

<int:channel id="inputChannel"/>

<int:channel id="outputChannel"/>

<int:resequencer id="completelyDefinedResequencer"  (1)
  input-channel="inputChannel"  (2)
  output-channel="outputChannel"  (3)
  discard-channel="discardChannel"  (4)
  release-partial-sequences="true"  (5)
  message-store="messageStore"  (6)
  send-partial-result-on-expiry="true"  (7)
  send-timeout="86420000"  (8)
  correlation-strategy="correlationStrategyBean"  (9)
  correlation-strategy-method="correlate"  (10)
  correlation-strategy-expression="headers['something']"  (11)
  release-strategy="releaseStrategyBean"  (12)
  release-strategy-method="release"  (13)
  release-strategy-expression="size() == 10"  (14)
  empty-group-min-timeout="60000"  (15)

  lock-registry="lockRegistry"  (16)

  group-timeout="60000"  (17)
  group-timeout-expression="size() ge 2 ? 100 : -1"  (18)
  scheduler="taskScheduler" />  (19)
  expire-group-upon-timeout="false" />  (20)
1 重排序器的 ID 是可选的。
2 重排序器的输入通道。必需。
3 重排序器将重新排序的消息发送到的通道。可选。
4 重排序器将超时消息发送到的通道(如果send-partial-result-on-timeout设置为false)。可选。
5 是否在可用时立即发送排序后的序列,还是只在整个消息组到达后发送。可选。(默认值为false。)
6 MessageGroupStore的引用,该存储可以用于在消息组完成之前将其存储在它们的关联键下。可选。(默认情况下是易失性内存存储。)
7 在组过期时,是否应该发送排序后的组(即使缺少一些消息)。可选。(默认值为 false。)请参见在聚合器中管理状态:MessageGroupStore
8 发送回复Messageoutput-channeldiscard-channel时等待的超时时间间隔。仅当输出通道存在一些“发送”限制时才应用,例如具有固定“容量”的QueueChannel。在这种情况下,将抛出MessageDeliveryException。对于AbstractSubscribableChannel实现,send-timeout将被忽略。对于group-timeout(-expression),来自计划的过期任务的MessageDeliveryException会导致此任务重新计划。可选。
9 对实现消息关联(分组)算法的 Bean 的引用。该 Bean 可以是CorrelationStrategy接口的实现或 POJO。在后一种情况下,还必须定义correlation-strategy-method属性。可选。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID标头。)
10 correlation-strategy引用的 Bean 上定义的方法,该方法实现关联决策算法。可选,有限制(需要correlation-strategy存在)。
11 表示关联策略的 SpEL 表达式。示例:"headers['something']"。仅允许correlation-strategycorrelation-strategy-expression中的一个。
12 对实现释放策略的 Bean 的引用。该 Bean 可以是ReleaseStrategy接口的实现或 POJO。在后一种情况下,还必须定义release-strategy-method属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE标头属性)。
13 release-strategy引用的 Bean 上定义的方法,该方法实现完成决策算法。可选,有限制(需要release-strategy存在)。
14 表示释放策略的 SpEL 表达式。表达式的根对象是MessageGroup。示例:"size() == 5"。仅允许release-strategyrelease-strategy-expression中的一个。
15 仅当为<resequencer>MessageStore配置了MessageGroupStoreReaper时才适用。默认情况下,当配置MessageGroupStoreReaper以使部分组过期时,空组也会被删除。在组正常释放后,空组存在。这是为了能够检测和丢弃迟到的消息。如果您希望以比过期部分组更长的计划来过期空组,请设置此属性。然后,空组不会从MessageStore中删除,直到它们至少在该毫秒数内未被修改。请注意,过期空组的实际时间也会受到收割机超时属性的影响,并且可能与该值加上超时一样多。
16 参见使用 XML 配置聚合器
17 参见使用 XML 配置聚合器
18 参见使用 XML 配置聚合器
19 参见使用 XML 配置聚合器
20 默认情况下,当组因超时(或由MessageGroupStoreReaper)完成时,空组的元数据将被保留。 延迟到达的消息将立即被丢弃。 将此设置为true以完全删除组。 然后,延迟到达的消息将启动一个新组,并且在组再次超时之前不会被丢弃。 新组永远不会因导致超时的序列范围中的“漏洞”而正常释放。 空组可以使用MessageGroupStoreReaper以及empty-group-min-timeout属性稍后过期(完全删除)。 从 5.0 版开始,空组也会在empty-group-min-timeout过期后被安排删除。 默认值为“false”。

另请参阅聚合器过期组以获取更多信息。

由于在 Java 类中没有要为重排序器实现的自定义行为,因此没有对其进行注释支持。