事务支持
理解消息流中的事务
Spring Integration 提供了几个挂钩来解决消息流的事务需求。为了更好地理解这些挂钩以及如何从中获益,我们必须首先回顾一下您可以用来启动消息流的六种机制,并了解如何在每种机制中解决这些流的事务需求。
以下六种机制启动消息流(每种机制的详细信息在整本手册中提供)
-
网关代理:一个基本的报文网关。
-
消息通道:直接与
MessageChannel
方法交互(例如,channel.send(message)
)。 -
消息发布者:作为 Spring Bean 上方法调用的副产品启动消息流的方式。
-
入站通道适配器和网关:基于将第三方系统连接到 Spring Integration 消息系统来启动消息流的方式(例如,
[JmsMessage] → Jms 入站适配器[SI Message] → SI 通道
)。 -
调度程序:基于预配置的调度程序分发的调度事件启动消息流的方式。
-
轮询器:与调度程序类似,这是基于预配置的轮询器分发的调度或基于间隔的事件启动消息流的方式。
我们可以将这六种机制分成两大类
-
由用户进程启动的消息流:此类别中的示例场景包括调用网关方法或显式地将
Message
发送到MessageChannel
。换句话说,这些消息流依赖于第三方进程(例如您编写的某些代码)来启动。 -
由守护进程启动的消息流:此类别中的示例场景包括轮询器轮询消息队列以使用轮询到的消息启动新的消息流,或者调度程序通过创建新消息并在预定义的时间启动消息流来调度进程。
显然,网关代理、MessageChannel.send(…)
和MessagePublisher
都属于第一类,而入站适配器和网关、调度程序和轮询器属于第二类。
那么,您如何在每个类别中的各种场景中解决事务需求,并且 Spring Integration 是否需要针对特定场景提供与事务相关的显式内容?或者您可以使用 Spring 的事务支持吗?
Spring 本身提供了对事务管理的一流支持。因此,我们的目标不是提供新的东西,而是使用 Spring 从其现有的事务支持中获益。换句话说,作为一个框架,我们必须将挂钩公开到 Spring 的事务管理功能。但是,由于 Spring Integration 配置基于 Spring 配置,因此我们并不总是需要公开这些挂钩,因为 Spring 已经公开了它们。毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
牢记这一目标,我们可以再次考虑这两种场景:由用户进程启动的消息流和由守护进程启动的消息流。
由用户进程启动并在 Spring 应用程序上下文中配置的消息流受此类进程的常规事务配置的约束。因此,Spring Integration 不需要显式地配置它们来支持事务。可以通过 Spring 的标准事务支持启动事务,并且应该这样做。Spring Integration 消息流自然地遵循组件的事务语义,因为它本身是由 Spring 配置的。例如,网关或服务激活器方法可以使用@Transactional
进行注释,或者可以在 XML 配置中定义一个TransactionInterceptor
,并使用指向应为事务性特定方法的切入点表达式。底线是您可以在这些场景中完全控制事务配置和边界。
但是,当涉及到由守护进程启动的消息流时,情况就有点不同了。尽管由开发人员配置,但这些流不会直接涉及人类或其他需要启动的进程。这些是基于触发器的流,由触发器进程(守护进程)根据进程的配置启动。例如,我们可以让调度程序在每周五晚上启动消息流。我们还可以配置一个触发器,每秒启动一个消息流,等等。因此,我们需要一种方法让这些基于触发器的进程知道我们希望使生成的邮件流成为事务性的,以便在启动新的邮件流时可以创建事务上下文。换句话说,我们需要公开一些事务配置,但只需要足够委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。
轮询器事务支持
Spring Integration 为轮询器提供了事务支持。轮询器是一种特殊的组件类型,因为在轮询器任务中,我们可以对本身就是事务性的资源调用receive()
,从而将receive()
调用包含在事务的边界内,这使得它可以在任务失败时回滚。如果我们要对通道添加相同的支持,则添加的事务将影响从send()
调用开始的所有下游组件。这为事务分界提供了一个相当广泛的范围,没有任何充分的理由,尤其是在 Spring 已经提供了多种方法来解决任何下游组件的事务需求的情况下。但是,receive()
方法包含在事务边界内是轮询器的“充分理由”。
任何时候您配置轮询器,都可以使用transactional
子元素及其属性提供事务配置,如下例所示
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
前面的配置看起来类似于本机 Spring 事务配置。您仍然必须提供对事务管理器的引用,并指定事务属性或依赖于默认值(例如,如果未指定'transaction-manager'属性,则默认为名为'transactionManager'的 bean)。在内部,该过程包装在 Spring 的本机事务中,其中TransactionInterceptor
负责处理事务。有关如何配置事务管理器、事务管理器类型(如 JTA、Datasource 等)以及与事务配置相关的其他详细信息,请参阅Spring 框架参考指南。
使用前面的配置,由该轮询器启动的所有消息流都是事务性的。有关轮询器事务配置的更多信息和详细信息,请参阅轮询和事务。
除了事务之外,在运行轮询器时,您可能还需要解决更多跨领域问题。为了帮助解决这个问题,轮询器元素接受一个<advice-chain>
子元素,它允许您定义要应用于轮询器的自定义建议实例链。(有关更多详细信息,请参阅可轮询消息源)。在 Spring Integration 2.0 中,轮询器经历了重构工作,现在使用代理机制来解决事务问题以及其他跨领域问题。由此产生的一个重大变化是我们使<transactional>
和<advice-chain>
元素相互排斥。其背后的原理是,如果您需要多个建议并且其中一个建议是事务建议,则可以将其包含在<advice-chain>
中,与以前一样方便,但具有更好的控制力,因为您现在可以选择按所需顺序放置建议。以下示例显示了如何执行此操作
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
前面的示例显示了 Spring 事务建议(txAdvice
)的基本基于 XML 的配置,并将其包含在轮询器定义的<advice-chain>
中。如果您只需要解决轮询器的事务问题,则仍可以使用<transactional>
元素作为一种方便的方法。
事务边界
另一个重要因素是消息流中事务的边界。当事务启动时,事务上下文将绑定到当前线程。因此,无论您的消息流中有多少个端点和通道,只要您确保流在同一线程上继续,您的事务上下文就会被保留。一旦您通过引入可轮询通道或执行器通道或在某个服务中手动启动新线程来中断它,事务边界也将被中断。从本质上讲,事务将在此处结束,如果线程之间发生了成功的传递,则该流将被视为成功,并且将发送COMMIT信号,即使流将继续并且可能仍然会导致下游某个地方出现异常。如果这样的流是同步的,则该异常可以抛回到消息流的发起者,该发起者也是事务上下文的发起者,并且事务将导致回滚。中间地带是在任何线程边界被中断的地方使用事务性通道。例如,您可以使用将委托给事务性MessageStore策略的基于队列的通道,或者可以使用基于JMS的通道。
事务同步
在某些环境中,将操作与包含整个流的事务同步会有所帮助。例如,考虑在流的开头使用一个<file:inbound-channel-adapter/>
,该流执行许多数据库更新。如果事务提交,我们可能希望将文件移动到success
目录,而如果事务回滚,我们可能希望将其移动到failure
目录。
Spring Integration 2.2 引入了将这些操作与事务同步的功能。此外,如果您没有“真实”事务但仍希望在成功或失败时执行不同的操作,则可以配置PseudoTransactionManager
。有关更多信息,请参阅伪事务。
以下列表显示了此功能的关键策略接口
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建一个TransactionSynchronization
对象。您可以实现自己的对象或使用框架提供的对象:DefaultTransactionSynchronizationFactory
。此实现返回一个TransactionSynchronization
,它委托给TransactionSynchronizationProcessor
的默认实现:ExpressionEvaluatingTransactionSynchronizationProcessor
。此处理器支持三个 SpEL 表达式:beforeCommitExpression
、afterCommitExpression
和afterRollbackExpression
。
对于熟悉事务的人来说,这些操作应该是不言自明的。在每种情况下,#root
变量都是原始的 Message
。在某些情况下,根据轮询器轮询的 MessageSource
,还会提供其他 SpEL 变量。例如,MongoDbMessageSource
提供了 #mongoTemplate
变量,该变量引用消息源的 MongoTemplate
。类似地,RedisStoreMessageSource
提供了 #store
变量,该变量引用轮询创建的 RedisStore
。
要为特定轮询器启用此功能,您可以使用 synchronization-factory
属性在轮询器的 <transactional/>
元素上提供对 TransactionSynchronizationFactory
的引用。
从 5.0 版本开始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory
,当没有配置 TransactionSynchronizationFactory
但建议链中存在类型为 TransactionInterceptor
的建议时,默认将其应用于轮询端点。当使用任何现成的 TransactionSynchronizationFactory
实现时,轮询端点会将轮询的消息绑定到当前事务上下文中,并在事务建议后抛出异常时将其作为 MessagingException
中的 failedMessage
提供。当使用未实现 TransactionInterceptor
的自定义事务建议时,您可以显式配置 PassThroughTransactionSynchronizationFactory
来实现此行为。在这两种情况下,MessagingException
都会成为发送到 errorChannel
的 ErrorMessage
的有效负载,并且其原因是建议抛出的原始异常。以前,ErrorMessage
的有效负载是建议抛出的原始异常,并且没有提供对 failedMessage
信息的引用,这使得难以确定事务提交问题的原因。
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例展示了如何使用命名空间配置文件入站通道适配器
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 表达式的结果作为有效负载发送到 committedChannel
或 rolledBackChannel
(在本例中,这将是 Boolean.TRUE
或 Boolean.FALSE
——java.io.File.renameTo()
方法调用的结果)。
如果希望发送整个有效负载以进行进一步的 Spring Integration 处理,请使用“payload”表达式。
务必了解,这会将操作与事务同步。它不会使本身不是事务性的资源实际上成为事务性的。相反,事务(无论是 JDBC 还是其他事务)在轮询之前启动,并在流程完成后提交或回滚,然后执行同步操作。 如果提供自定义的 |
除了 after-commit
和 after-rollback
表达式之外,还支持 before-commit
。在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚而不是提交。
伪事务
在阅读了事务同步部分后,您可能会认为在流程完成后采取这些“成功”或“失败”操作会很有用,即使轮询器下游没有“真实”的事务资源(例如 JDBC)。例如,考虑一个“<file:inbound-channel-adapter/>”后面跟着一个“<ftp:outbout-channel-adapter/>”。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。
为了提供此功能,框架提供了一个 PseudoTransactionManager
,即使没有涉及真实的事务资源,也可以启用上述配置。如果流程正常完成,则会调用 beforeCommit
和 afterCommit
同步。如果发生故障,则会调用 afterRollback
同步。因为它不是真实的事务,所以不会发生实际的提交或回滚。伪事务是用于启用同步功能的一种手段。
要使用 PseudoTransactionManager
,您可以将其定义为 <bean/>,就像配置真实的事务管理器一样。以下示例展示了如何执行此操作
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />