事务
Spring Rabbit 框架支持在同步和异步用例中进行自动事务管理,并提供多种声明式可选择的语义,这与 Spring 事务的现有用户所熟悉的模式一致。这使得许多(如果不是大多数)常见的消息传递模式易于实现。
有两种方法可以向框架发出所需的交易语义信号。在 RabbitTemplate
和 SimpleMessageListenerContainer
中,都存在一个标志 channelTransacted
,如果该标志为 true
,则表示框架将使用事务通道,并以提交或回滚(取决于结果)结束所有操作(发送或接收),其中异常表示回滚。另一个信号是使用 Spring 的 PlatformTransactionManager
实现之一提供外部事务,作为正在进行操作的上下文。如果在框架发送或接收消息时,当前正在进行事务,并且 channelTransacted
标志为 true
,则消息传递事务的提交或回滚将推迟到当前事务结束时。如果 channelTransacted
标志为 false
,则消息传递操作不适用任何事务语义(自动确认)。
channelTransacted
标志是配置时设置。它在创建 AMQP 组件时声明并处理一次,通常是在应用程序启动时。外部事务在原则上更加动态,因为系统在运行时响应当前线程状态。但是,在实践中,它通常也是一个配置设置,当事务以声明方式分层到应用程序中时。
对于使用 RabbitTemplate
的同步用例,外部事务由调用者提供,可以根据喜好以声明方式或命令方式提供(通常的 Spring 事务模型)。以下示例展示了声明式方法(通常更受欢迎,因为它是非侵入式的),其中模板已配置为 channelTransacted=true
@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
在前面的示例中,一个 String
负载被接收、转换并作为消息体发送,该消息体位于标记为 @Transactional
的方法内部。如果数据库处理因异常而失败,则传入消息将返回到代理,而传出消息将不会发送。这适用于 RabbitTemplate
内的任何操作,这些操作位于事务方法链中(除非例如直接操作 Channel
以提前提交事务)。
对于使用 SimpleMessageListenerContainer
的异步用例,如果需要外部事务,则容器在设置监听器时必须请求该事务。为了发出需要外部事务的信号,用户在配置容器时向其提供 PlatformTransactionManager
的实现。以下示例展示了如何做到这一点
@Configuration
public class ExampleExternalTransactionAmqpConfiguration {
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}
}
在前面的示例中,事务管理器作为依赖项从另一个 bean 定义(未显示)中注入,并且 channelTransacted
标志也设置为 true
。其效果是,如果侦听器因异常而失败,则事务将回滚,并且消息也将返回到代理。重要的是,如果事务无法提交(例如,由于数据库约束错误或连接问题),则 AMQP 事务也将回滚,并且消息将返回到代理。这有时被称为“尽力而为的一阶段提交”,是一种用于可靠消息传递的非常强大的模式。如果在前面的示例中将 channelTransacted
标志设置为 false
(默认值),则外部事务仍将为侦听器提供,但所有消息传递操作将自动确认,因此其效果是即使在业务操作回滚时也提交消息传递操作。
条件回滚
在 1.6.6 版本之前,在使用外部事务管理器(如 JDBC)时,向容器的 transactionAttribute
添加回滚规则无效。异常始终回滚事务。
此外,在容器的建议链中使用 事务建议 时,条件回滚不太有用,因为所有侦听器异常都包装在 ListenerExecutionFailedException
中。
第一个问题已得到纠正,现在规则已正确应用。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute
。它是 RuleBasedTransactionAttribute
的子类,唯一的区别是它知道 ListenerExecutionFailedException
并使用此类异常的原因作为规则。此事务属性可以直接在容器中使用,也可以通过事务建议使用。
以下示例使用此规则
@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
关于接收消息回滚的说明
AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务回滚并且消息已接收时,Spring AMQP 不仅需要回滚事务,还需要手动拒绝消息(类似于 nack,但规范中没有这样称呼)。对消息拒绝采取的操作独立于事务,并取决于 defaultRequeueRejected
属性(默认值:true
)。有关拒绝失败消息的更多信息,请参阅 消息侦听器和异步情况。
有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ 代理语义。
在 RabbitMQ 2.7.0 之前,此类消息(以及在通道关闭或中止时未确认的消息)会回到 Rabbit 代理上的队列末尾。从 2.7.0 版本开始,拒绝的消息会进入队列的开头,类似于 JMS 回滚消息。 |
以前,消息在事务回滚时的重新入队在本地事务和提供 TransactionManager 时是不一致的。在前一种情况下,会应用正常的重新入队逻辑(AmqpRejectAndDontRequeueException 或 defaultRequeueRejected=false )(参见 消息监听器和异步情况)。使用事务管理器时,消息会在回滚时无条件地重新入队。从 2.0 版本开始,行为一致,两种情况下都会应用正常的重新入队逻辑。要恢复到之前的行为,可以将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true 。参见 消息监听器容器配置。
|
使用 RabbitTransactionManager
RabbitTransactionManager
是在外部事务中执行 Rabbit 操作的替代方案,并与外部事务同步。此事务管理器是 PlatformTransactionManager
接口的实现,应与单个 Rabbit ConnectionFactory
一起使用。
此策略无法提供 XA 事务,例如,无法在消息传递和数据库访问之间共享事务。 |
应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)
检索事务性 Rabbit 资源,而不是使用标准的 Connection.createChannel()
调用以及随后的通道创建。使用 Spring AMQP 的 RabbitTemplate
时,它会自动检测线程绑定的通道并自动参与其事务。
使用 Java 配置,可以使用以下 bean 设置新的 RabbitTransactionManager
@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
如果您更喜欢 XML 配置,可以在 XML 应用程序上下文文件中声明以下 bean
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
事务同步
将 RabbitMQ 事务与其他(例如 DBMS)事务同步提供“尽力而为的一阶段提交”语义。RabbitMQ 事务可能在事务同步的完成后的阶段无法提交。spring-tx
基础设施会将其记录为错误,但不会向调用代码抛出异常。从 2.3.10 版本开始,可以在处理事务的同一线程上事务提交后调用 ConnectionUtils.checkAfterCompletion()
。如果未发生异常,它将简单地返回;否则,它将抛出 AfterCompletionFailedException
,该异常将具有一个表示完成同步状态的属性。
通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)
启用此功能;这是一个全局标志,适用于所有线程。