消息存储

企业集成模式》(EIP)一书中识别出几种能够缓冲消息的模式。例如,聚合器会缓冲消息,直到它们可以被释放,而 QueueChannel 会缓冲消息,直到消费者明确地从该通道接收这些消息。由于在消息流中的任何点都可能发生故障,因此缓冲消息的 EIP 组件也会引入一个消息可能丢失的点。

为了降低消息丢失的风险,EIP 定义了 消息存储 模式,该模式允许 EIP 组件存储消息,通常存储在某种类型的持久存储中(例如 RDBMS)。

Spring Integration 通过以下方式为消息存储模式提供支持:

  • 定义 org.springframework.integration.store.MessageStore 策略接口

  • 提供该接口的多个实现

  • 在所有具有缓冲消息功能的组件上公开 message-store 属性,以便您可以注入任何实现 MessageStore 接口的实例。

有关如何配置特定消息存储实现以及如何将 MessageStore 实现注入特定缓冲组件的详细信息,请参阅整本手册(请参阅特定组件,例如 QueueChannel聚合器延迟器 等)。以下示例对展示了如何为 QueueChannel 和聚合器添加对消息存储的引用。

QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
聚合器
<int:aggregator message-store="refToMessageStore"/>

默认情况下,消息使用 o.s.i.store.SimpleMessageStoreMessageStore 的一个实现)存储在内存中。这可能适用于开发或简单的低流量环境,在这些环境中,非持久消息的潜在丢失并不令人担忧。但是,典型的生产应用程序需要更强大的选项,不仅是为了降低消息丢失的风险,而且为了避免潜在的内存不足错误。因此,我们还为各种数据存储提供了 MessageStore 实现。以下是支持的实现的完整列表:

但是,请注意使用 MessageStore 的持久化实现时的一些限制。

消息数据(有效负载和标头)使用不同的序列化策略进行序列化和反序列化,具体取决于 MessageStore 的实现。例如,使用 JdbcMessageStore 时,默认情况下只持久化 Serializable 数据。在这种情况下,非可序列化标头会在序列化之前被移除。此外,请注意传输适配器(如 FTP、HTTP、JMS 等)注入的协议特定标头。例如,<http:inbound-channel-adapter/> 将 HTTP 标头映射到消息标头,其中之一是 org.springframework.http.MediaType 实例的非可序列化 ArrayList。但是,您可以将 SerializerDeserializer 策略接口的自定义实现注入到某些 MessageStore 实现(如 JdbcMessageStore)中,以更改序列化和反序列化的行为。

请特别注意表示某些类型数据的标头。例如,如果其中一个标头包含某个 Spring Bean 的实例,在反序列化时,您最终可能会得到该 Bean 的不同实例,这会直接影响框架创建的一些隐式标头(如 REPLY_CHANNELERROR_CHANNEL)。目前,它们不可序列化,但即使它们可序列化,反序列化的通道也不会代表预期的实例。

从 Spring Integration 3.0 版本开始,您可以使用配置为用名称替换这些标头的标头富集器来解决此问题,方法是在将通道注册到 HeaderChannelRegistry 后进行操作。

此外,请考虑以下情况:当您将消息流配置为以下方式时:网关 → 队列通道(由持久化消息存储支持)→ 服务激活器。该网关会创建一个临时回复通道,该通道在服务激活器的轮询器从队列中读取时会丢失。同样,您可以使用标头富集器将标头替换为 String 表示。

有关更多信息,请参阅 Header Enricher

Spring Integration 4.0 引入了两个新接口

  • ChannelMessageStore:用于实现针对 QueueChannel 实例的特定操作

  • PriorityCapableChannelMessageStore:用于标记要用于 PriorityChannel 实例的 MessageStore 实现,并为持久化消息提供优先级顺序。

实际行为取决于实现。框架提供了以下实现,可作为 QueueChannelPriorityChannel 的持久 MessageStore 使用

关于 SimpleMessageStore 的注意事项

从版本 4.1 开始,SimpleMessageStore 在调用 getMessageGroup() 时不再复制消息组。对于大型消息组,这是一个重大的性能问题。4.0.1 引入了布尔值 copyOnGet 属性,可用于控制此行为。当内部由聚合器使用时,此属性设置为 false 以提高性能。现在默认情况下为 false

在聚合器等组件之外访问组存储的用户现在将获得对聚合器正在使用的组的直接引用,而不是副本。在聚合器之外操作组可能会导致不可预测的结果。

因此,您应该要么不执行此类操作,要么将 copyOnGet 属性设置为 true

使用 MessageGroupFactory

从版本 4.3 开始,一些 MessageGroupStore 实现可以注入自定义 MessageGroupFactory 策略,以创建和自定义 MessageGroupStore 使用的 MessageGroup 实例。这默认为 SimpleMessageGroupFactory,它根据 GroupType.HASH_SET (LinkedHashSet) 内部集合生成 SimpleMessageGroup 实例。其他可能的选项是 SYNCHRONISED_SETBLOCKING_QUEUE,其中最后一个选项可用于恢复以前的 SimpleMessageGroup 行为。此外,PERSISTENT 选项也可用。有关更多信息,请参阅下一节。从版本 5.0.1 开始,LIST 选项也可用,适用于组中消息的顺序和唯一性无关紧要的情况。

持久 MessageGroupStore 和延迟加载

从版本 4.3 开始,所有持久 MessageGroupStore 实例都以延迟加载的方式从存储中检索 MessageGroup 实例及其 messages。在大多数情况下,这对相关 MessageHandler 实例(请参阅 AggregatorResequencer)很有用,因为在每次相关操作时从存储中加载整个 MessageGroup 会增加开销。

您可以使用AbstractMessageGroupStore.setLazyLoadMessageGroups(false)选项从配置中关闭延迟加载行为。

我们在 MongoDB MessageStore (MongoDB 消息存储) 和 <aggregator> (聚合器) 上对延迟加载进行的性能测试使用类似于以下内容的自定义release-strategy

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

它针对 1000 条简单消息产生了类似于以下结果的结果

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

但是,从 5.5 版本开始,所有持久化的 MessageGroupStore 实现都提供了一个基于目标数据库流式 API 的 streamMessagesForGroup(Object groupId) 契约。这在存储中的组非常大时提高了资源利用率。在框架内部,这个新的 API 在 延迟器(例如)中使用,当它在启动时重新调度持久化消息时。返回的 Stream<Message<?>> 必须在处理结束时关闭,例如通过 try-with-resources 自动关闭。每当使用 PersistentMessageGroup 时,它的 streamMessages() 会委托给 MessageGroupStore.streamMessagesForGroup()

消息组条件

从 5.5 版本开始,MessageGroup 抽象提供了一个 condition 字符串选项。此选项的值可以是任何可以稍后解析的内容,以便出于任何原因做出有关该组的决定。例如,来自 关联消息处理程序ReleaseStrategy 可以从组中查询此属性,而不是迭代组中的所有消息。MessageGroupStore 公开了一个 setGroupCondition(Object groupId, String condition) API。为此,AbstractCorrelatingMessageHandler 添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 选项。此函数在消息添加到组后以及组的现有条件下针对每条消息进行评估。实现可以决定返回一个新值、现有值或将目标条件重置为 nullcondition 的值可以是 JSON、SpEL 表达式、数字或任何可以作为字符串序列化并在之后解析的内容。例如,来自 文件聚合器 组件的 FileMarkerReleaseStrategy 将条件填充到来自 FileHeaders.LINE_COUNT 标头的组中 FileSplitter.FileMarker.Mark.END 消息,并通过将其与组大小进行比较来查询它 canRelease() 中的值。这样,它就不会迭代组中的所有消息来查找具有 FileHeaders.LINE_COUNT 标头的 FileSplitter.FileMarker.Mark.END 消息。它还允许结束标记在所有其他记录到达聚合器之前到达;例如,在多线程环境中处理文件时。

此外,为了配置方便,引入了GroupConditionProvider契约。AbstractCorrelatingMessageHandler 检查提供的 ReleaseStrategy 是否实现了此接口,并提取一个 conditionSupplier 用于组条件评估逻辑。