消息存储
《企业集成模式》(EIP)一书识别了多种具有消息缓冲能力的模式。例如,聚合器会缓冲消息直到它们可以被释放,而 QueueChannel 会缓冲消息直到消费者显式地从该通道接收这些消息。由于消息流中任何点都可能发生故障,因此缓冲消息的 EIP 组件也引入了消息可能丢失的风险点。
为了降低消息丢失的风险,EIP 定义了消息存储模式,该模式允许 EIP 组件存储消息,通常是在某种类型的持久化存储(例如 RDBMS)中。
Spring Integration 通过以下方式支持消息存储模式:
-
定义
org.springframework.integration.store.MessageStore策略接口 -
提供该接口的多个实现
-
在所有具有消息缓冲能力的组件上暴露
message-store属性,以便您可以注入任何实现MessageStore接口的实例。
有关如何配置特定消息存储实现以及如何将 MessageStore 实现注入特定缓冲组件的详细信息在整个手册中都有描述(请参阅特定组件,例如 QueueChannel、Aggregator、Delayer 等)。以下两对示例展示了如何为 QueueChannel 和聚合器添加消息存储的引用。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator message-store="refToMessageStore"/>
默认情况下,消息使用 o.s.i.store.SimpleMessageStore(MessageStore 的一个实现)存储在内存中。这对于开发或简单低容量环境来说可能很好,在这些环境中,非持久化消息的潜在丢失不是问题。然而,典型的生产应用程序需要一个更健壮的选项,不仅要降低消息丢失的风险,还要避免潜在的内存不足错误。因此,我们还为各种数据存储提供了 MessageStore 实现。以下是受支持实现的完整列表:
-
Hazelcast 消息存储:使用 Hazelcast 分布式缓存来存储消息
-
JDBC 消息存储:使用 RDBMS 来存储消息
-
Redis 消息存储:使用 Redis 键/值数据存储来存储消息
-
MongoDB 消息存储:使用 MongoDB 文档存储来存储消息
|
但是,在使用 消息数据(负载和头部)通过不同的序列化策略进行序列化和反序列化,具体取决于 特别注意表示某些类型数据的消息头。例如,如果其中一个消息头包含某个 Spring bean 的实例,则在反序列化时,您可能会得到该 bean 的不同实例,这会直接影响框架创建的一些隐式消息头(例如 从 Spring Integration 3.0 版本开始,您可以通过配置一个消息头增强器来解决此问题,该增强器在将通道注册到 此外,请考虑当您按如下方式配置消息流时会发生什么: 有关更多信息,请参阅消息头增强器。 |
Spring Integration 4.0 引入了两个新接口
-
ChannelMessageStore:用于实现QueueChannel实例特有的操作 -
PriorityCapableChannelMessageStore:用于标记MessageStore实现以用于PriorityChannel实例,并为持久化消息提供优先级排序。
实际行为取决于实现。框架提供以下实现,它们可以作为 QueueChannel 和 PriorityChannel 的持久性 MessageStore 使用
|
关于
SimpleMessageStore 的注意事项从 4.1 版本开始, 现在,在聚合器等组件之外访问组存储的用户将获得对聚合器正在使用的组的直接引用,而不是副本。在聚合器外部操作该组可能会导致不可预测的结果。 因此,您应该避免此类操作,或者将 |
使用 MessageGroupFactory
从 4.3 版本开始,一些 MessageGroupStore 实现可以注入自定义的 MessageGroupFactory 策略,以创建和定制 MessageGroupStore 使用的 MessageGroup 实例。这默认为 SimpleMessageGroupFactory,它根据 GroupType.HASH_SET (LinkedHashSet) 内部集合生成 SimpleMessageGroup 实例。其他可能的选项是 SYNCHRONISED_SET 和 BLOCKING_QUEUE,其中最后一个可用于恢复先前的 SimpleMessageGroup 行为。此外,还提供了 PERSISTENT 选项。更多信息请参阅下一节。从 5.0.1 版本开始,当组中消息的顺序和唯一性无关紧要时,还提供了 LIST 选项。
持久化 MessageGroupStore 和延迟加载
从 4.3 版本开始,所有持久化的 MessageGroupStore 实例都以延迟加载的方式从存储中检索 MessageGroup 实例及其 messages。在大多数情况下,这对于关联 MessageHandler 实例(参见聚合器和重排序器)非常有用,因为在每次关联操作时从存储中加载整个 MessageGroup 会增加开销。
您可以使用 AbstractMessageGroupStore.setLazyLoadMessageGroups(false) 选项来关闭配置中的延迟加载行为。
我们在 MongoDB MessageStore (MongoDB 消息存储) 和 <aggregator> (聚合器) 上的延迟加载性能测试使用了一个类似于以下的自定义 release-strategy:
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
它对 1000 条简单消息产生类似以下的结果:
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
然而,从 5.5 版本开始,所有持久化 MessageGroupStore 实现都提供了一个基于目标数据库流式 API 的 streamMessagesForGroup(Object groupId) 契约。这提高了当组在存储中非常大时的资源利用率。在框架内部,这个新的 API 被用于例如 Delayer 在启动时重新调度持久化消息。返回的 Stream<Message<?>> 必须在处理结束时关闭,例如通过 try-with-resources 自动关闭。只要使用 PersistentMessageGroup,其 streamMessages() 就会委托给 MessageGroupStore.streamMessagesForGroup()。
消息组条件
从 5.5 版本开始,MessageGroup 抽象提供了一个 condition 字符串选项。此选项的值可以是任何可以在以后解析以做出组决定的内容。例如,来自关联消息处理器的 ReleaseStrategy 可能会查阅此组属性,而不是遍历组中的所有消息。MessageGroupStore 暴露了一个 setGroupCondition(Object groupId, String condition) API。为此,AbstractCorrelatingMessageHandler 中添加了一个 setGroupConditionSupplier(BiFunction<Message<?>, String, String>) 选项。此函数在消息添加到组后以及组的现有条件上进行评估。实现可能会决定返回新值、现有值或将目标条件重置为 null。condition 的值可以是 JSON、SpEL 表达式、数字或任何可以序列化为字符串并在之后解析的内容。例如,来自文件聚合器组件的 FileMarkerReleaseStrategy 从 FileSplitter.FileMarker.Mark.END 消息的 FileHeaders.LINE_COUNT 消息头中填充条件到组中,并与组大小与此条件中的值进行比较来咨询其 canRelease()。这样它就不必遍历组中的所有消息来查找带有 FileHeaders.LINE_COUNT 消息头的 FileSplitter.FileMarker.Mark.END 消息。它还允许结束标记在所有其他记录之前到达聚合器;例如,在多线程环境中处理文件时。
此外,为了配置方便,引入了 GroupConditionProvider 契约。AbstractCorrelatingMessageHandler 检查所提供的 ReleaseStrategy 是否实现了此接口,并提取 conditionSupplier 用于组条件评估逻辑。
使用 LockRegistry
从 6.5 版本开始,AbstractMessageGroupStore 抽象通过锁操作消息组的元数据。此锁获取 groupId 并由 LockRegistry 生成。其目的是确保消息和消息组操作的原子性。在多线程中,同时添加或删除消息或更新元数据时,如果缺少锁,某些实现可能会出现消息组错误。默认情况下使用 DefaultLockRegistry,任何 LockRegistry 都可以通过 AbstractMessageGroupStore.setLockRegistry() 注入,通常是针对同一持久存储的实现。有关更多信息,请参阅分布式锁。