分隔器
Splitter 组件的作用是将一条消息分割成多个部分,并将结果消息发送出去进行独立处理。它们通常是包含聚合器的管道中的上游生产者。
编程模型
执行分割的 API 包含一个基类 `AbstractMessageSplitter`。它是一个 `MessageHandler` 实现,封装了分割程序的常见功能,例如在生成的 message 中填写相应的 message header(`CORRELATION_ID`、`SEQUENCE_SIZE` 和 `SEQUENCE_NUMBER`)。这种填写可以追踪消息及其处理结果(在一个典型的场景中,这些 header 会被复制到各种转换端点生成的 message 中)。例如,这些值可以被 组合消息处理器 使用。
以下示例显示了 `AbstractMessageSplitter` 的摘录
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
要在应用程序中实现特定的分割器,您可以扩展 `AbstractMessageSplitter` 并实现 `splitMessage` 方法,其中包含分割消息的逻辑。返回值可以是以下之一:
-
消息的 `Collection` 或数组,或者迭代消息的 `Iterable`(或 `Iterator`)。在这种情况下,消息将作为消息发送(在填充 `CORRELATION_ID`、`SEQUENCE_SIZE` 和 `SEQUENCE_NUMBER` 之后)。使用这种方法可以提供更多控制——例如,在分割过程中填充自定义消息 header。
-
非消息对象的 `Collection` 或数组,或者迭代非消息对象的 `Iterable`(或 `Iterator`)。它的工作方式与前一种情况类似,只是每个集合元素都用作消息有效负载。使用这种方法可以让你专注于领域对象,而不必考虑消息系统,从而产生更容易测试的代码。
-
一个 `Message` 或非消息对象(但不是集合或数组)。它的工作方式与前面的情况类似,只是发送单个消息。
在 Spring Integration 中,任何 POJO 都可以实现分割算法,前提是它定义了一个接受单个参数并具有返回值的方法。在这种情况下,方法的返回值将按前面所述进行解释。输入参数可以是 `Message` 或简单的 POJO。在后一种情况下,分割器接收传入消息的有效负载。我们推荐这种方法,因为它将代码与 Spring Integration API 解耦,通常更容易测试。
迭代器
从 4.1 版本开始,`AbstractMessageSplitter` 支持 `Iterator` 类型作为要分割的 `value`。请注意,在 `Iterator`(或 `Iterable`)的情况下,我们无法访问底层项目的数量,并且 `SEQUENCE_SIZE` header 设置为 `0`。这意味着 `<aggregator>` 的默认 `SequenceSizeReleaseStrategy` 将不起作用,并且来自 `splitter` 的 `CORRELATION_ID` 的组将不会被释放;它将保持为 `incomplete`。在这种情况下,您应该使用适当的自定义 `ReleaseStrategy` 或将 `send-partial-result-on-expiry` 与 `group-timeout` 或 `MessageGroupStoreReaper` 一起使用。
从 5.0 版本开始,`AbstractMessageSplitter` 提供了 `protected obtainSizeIfPossible()` 方法,以便如果可能的话,确定 `Iterable` 和 `Iterator` 对象的大小。例如,`XPathMessageSplitter` 可以确定底层 `NodeList` 对象的大小。从 5.0.9 版本开始,此方法还可以正确返回 `com.fasterxml.jackson.core.TreeNode` 的大小。
`Iterator` 对象对于避免在分割之前需要在内存中构建整个集合非常有用。例如,当使用迭代或流从某些外部系统(例如数据库或 FTP `MGET`)填充底层项目时。
Stream 和 Flux
从 5.0 版本开始,`AbstractMessageSplitter` 支持 Java `Stream` 和 Reactive Streams `Publisher` 类型作为要分割的 `value`。在这种情况下,目标 `Iterator` 是基于它们的迭代功能构建的。
此外,如果分割器的输出通道是 `ReactiveStreamsSubscribableChannel` 的实例,则 `AbstractMessageSplitter` 将生成 `Flux` 结果而不是 `Iterator`,并且输出通道将订阅此 `Flux`,以便基于反压的分割在下游流需求上进行。
从 5.2 版本开始,分割器支持 `discardChannel` 选项,用于发送那些分割函数返回空容器(集合、数组、流、`Flux` 等)的请求消息。在这种情况下,没有项目可用于迭代发送到 `outputChannel`。`null` 分割结果仍然是流结束的指示符。
使用 Java、Groovy 和 Kotlin DSL 配置分割器
基于 `Message` 及其可迭代有效负载的简单分割器的 DSL 配置示例
-
Java DSL
-
Kotlin DSL
-
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
有关 DSL 的更多信息,请参见各个章节
使用 XML 配置分割器
分割器可以通过 XML 配置如下:
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | 分割器的 ID 是可选的。 |
2 | 对应用程序上下文中的 bean 的引用。该 bean 必须实现分割逻辑,如前面部分所述。可选。如果未提供对 bean 的引用,则假定到达 `input-channel` 的消息的有效负载是 `java.util.Collection` 的实现,并且默认分割逻辑将应用于该集合,将每个单独的元素合并到消息中并将其发送到 `output-channel`。 |
3 | 实现分割逻辑的方法(在 bean 上定义)。可选。 |
4 | 分割器的输入通道。必需。 |
5 | 分割器将传入消息分割结果发送到的通道。可选(因为传入消息本身可以指定回复通道)。 |
6 | 在分割结果为空的情况下,将请求消息发送到的通道。可选(它们将像 `null` 结果一样停止)。 |
如果自定义分割器实现可以在其他 `<splitter>` 定义中引用,我们建议使用 `ref` 属性。但是,如果自定义分割器处理程序实现应该作用于单个 `<splitter>` 定义,您可以配置内部 bean 定义,如下例所示:
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
不允许在同一个 `<int:splitter>` 配置中同时使用 `ref` 属性和内部处理程序定义,因为它会创建模棱两可的情况并导致抛出异常。 |
如果 `ref` 属性引用扩展 `AbstractMessageProducingHandler` 的 bean(例如框架本身提供的分割器),则通过将输出通道直接注入处理程序来优化配置。在这种情况下,每个 `ref` 必须是单独的 bean 实例(或 `prototype` 作用域的 bean)或使用内部 `<bean/>` 配置类型。但是,此优化仅在您未在分割器 XML 定义中提供任何分割器特定属性的情况下才适用。如果您无意中从多个 bean 引用相同的 message 处理程序,则会得到配置异常。 |
使用注解配置分割器
`@Splitter` 注解适用于期望 `Message` 类型或消息有效负载类型的方法,并且方法的返回值应该是任何类型的 `Collection`。如果返回值不是实际的 `Message` 对象,则每个项目都将作为 `Message` 的有效负载包装在 `Message` 中。每个生成的 `Message` 都将发送到定义 `@Splitter` 的端点的指定输出通道。
以下示例显示了如何使用 `@Splitter` 注解配置分割器:
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}