重排序器
重新排序器与聚合器相关,但其用途不同。聚合器组合消息,而重新排序器则在不更改消息的情况下传递消息。
功能
重新排序器的工作方式类似于聚合器,因为它使用CORRELATION_ID
将消息存储在组中。不同之处在于重新排序器不会以任何方式处理消息。相反,它会按照其SEQUENCE_NUMBER
头值的顺序释放它们。
关于这一点,您可以选择一次性释放所有消息(在整个序列之后,根据SEQUENCE_SIZE
和其他可能性)或在可用有效序列后立即释放。(我们将在本章后面介绍“有效序列”的含义。)
重新排序器旨在重新排序具有少量间隙的相对较短的消息序列。如果您有大量具有许多间隙的不连续序列,则可能会遇到性能问题。 |
配置重新排序器
有关在 Java DSL 中配置重新排序器,请参阅聚合器和重新排序器。
配置重新排序器只需要在 XML 中包含相应的元素。
以下示例显示了重新排序器配置
<int:channel id="inputChannel"/>
<int:channel id="outputChannel"/>
<int:resequencer id="completelyDefinedResequencer" (1)
input-channel="inputChannel" (2)
output-channel="outputChannel" (3)
discard-channel="discardChannel" (4)
release-partial-sequences="true" (5)
message-store="messageStore" (6)
send-partial-result-on-expiry="true" (7)
send-timeout="86420000" (8)
correlation-strategy="correlationStrategyBean" (9)
correlation-strategy-method="correlate" (10)
correlation-strategy-expression="headers['something']" (11)
release-strategy="releaseStrategyBean" (12)
release-strategy-method="release" (13)
release-strategy-expression="size() == 10" (14)
empty-group-min-timeout="60000" (15)
lock-registry="lockRegistry" (16)
group-timeout="60000" (17)
group-timeout-expression="size() ge 2 ? 100 : -1" (18)
scheduler="taskScheduler" /> (19)
expire-group-upon-timeout="false" /> (20)
1 | 重新排序器的 ID 可选。 |
2 | 重新排序器的输入通道。必需。 |
3 | 重新排序器将重新排序的消息发送到的通道。可选。 |
4 | 重新排序器将超时消息发送到的通道(如果send-partial-result-on-timeout 设置为false )。可选。 |
5 | 是否在可用时立即发送有序序列,或者仅在整个消息组到达后发送。可选。(默认值为false 。) |
6 | 对MessageGroupStore 的引用,该存储可用于在关联键下存储消息组,直到它们完成。可选。(默认情况下,为易失性内存存储。) |
7 | 在组过期时,是否应发送有序组(即使缺少某些消息)。可选。(默认值为 false。)请参阅在聚合器中管理状态:MessageGroupStore 。 |
8 | 将回复Message 发送到output-channel 或discard-channel 时等待的超时间隔。仅当输出通道存在一些“发送”限制时才应用此间隔,例如具有固定“容量”的QueueChannel 。在这种情况下,将抛出MessageDeliveryException 。对于AbstractSubscribableChannel 实现,将忽略send-timeout 。对于group-timeout(-expression) ,来自计划的过期任务的MessageDeliveryException 导致此任务重新计划。可选。 |
9 | 对实现消息关联(分组)算法的 Bean 的引用。该 Bean 可以是CorrelationStrategy 接口的实现或 POJO。在后一种情况下,还必须定义correlation-strategy-method 属性。可选。(默认情况下,聚合器使用IntegrationMessageHeaderAccessor.CORRELATION_ID 头。) |
10 | 在correlation-strategy 引用的 Bean 上定义的方法,并实现关联决策算法。可选,并受限制(需要存在correlation-strategy )。 |
11 | 表示关联策略的 SpEL 表达式。例如:"headers['something']" 。仅允许使用correlation-strategy 或correlation-strategy-expression 之一。 |
12 | 对实现释放策略的 Bean 的引用。该 Bean 可以是ReleaseStrategy 接口的实现或 POJO。在后一种情况下,还必须定义release-strategy-method 属性。可选(默认情况下,聚合器将使用IntegrationMessageHeaderAccessor.SEQUENCE_SIZE 头属性)。 |
13 | 在release-strategy 引用的 Bean 上定义的方法,并实现完成决策算法。可选,并受限制(需要存在release-strategy )。 |
14 | 表示释放策略的 SpEL 表达式。表达式的根对象是MessageGroup 。例如:"size() == 5" 。仅允许使用release-strategy 或release-strategy-expression 之一。 |
15 | 仅当为<resequencer> MessageStore 配置了MessageGroupStoreReaper 时才适用。默认情况下,当配置MessageGroupStoreReaper 以使部分组过期时,还会删除空组。在组正常释放后,存在空组。这是为了能够检测和丢弃延迟到达的消息。如果希望以比使部分组过期更长的计划使空组过期,请设置此属性。然后,空组不会从MessageStore 中删除,直到它们至少经过此毫秒数未被修改。请注意,空组的实际过期时间还会受到清理程序的超时属性的影响,并且最多可以达到此值加上超时时间。 |
16 | 请参阅使用 XML 配置聚合器。 |
17 | 请参阅使用 XML 配置聚合器。 |
18 | 请参阅使用 XML 配置聚合器。 |
19 | 请参阅使用 XML 配置聚合器。 |
20 | 默认情况下,当组因超时(或由MessageGroupStoreReaper )完成时,将保留空组的元数据。延迟到达的消息将立即被丢弃。将其设置为true 以完全删除组。然后,延迟到达的消息将启动一个新组,并且不会被丢弃,直到该组再次超时。由于导致超时的序列范围中的“空洞”,新组永远不会正常释放。可以使用MessageGroupStoreReaper 以及empty-group-min-timeout 属性稍后使空组过期(完全删除)。从版本 5.0 开始,空组在empty-group-min-timeout 过去后也会被计划删除。默认为“false”。 |
另请参阅聚合器过期组以获取更多信息。
由于在 Java 类中没有针对重新排序器实现的自定义行为,因此没有对其进行注释支持。 |