事务

Spring Rabbit 框架支持在同步和异步用例中进行自动事务管理,并可以选择多种不同的语义进行声明式配置,这对于 Spring 事务的现有用户来说是熟悉的。这使得许多(如果不是大多数)常见的消息传递模式易于实现。

有两种方法可以向框架指示所需的事务语义。在 `RabbitTemplate` 和 `SimpleMessageListenerContainer` 中,都有一个标志 `channelTransacted`,如果为 `true`,则告诉框架使用事务性通道,并以提交或回滚(取决于结果)结束所有操作(发送或接收),其中异常表示回滚。另一个信号是使用 Spring 的 `PlatformTransactionManager` 实现之一提供外部事务作为正在进行的操作的上下文。如果在框架发送或接收消息时已经有事务正在进行,并且 `channelTransacted` 标志为 `true`,则消息事务的提交或回滚将推迟到当前事务结束时。如果 `channelTransacted` 标志为 `false`,则消息操作不适用任何事务语义(它会自动确认)。

`channelTransacted` 标志是配置时间设置。它在创建 AMQP 组件时(通常在应用程序启动时)被声明和处理一次。外部事务原则上更动态,因为系统在运行时响应当前线程状态。但是,实际上,当事务被声明性地分层到应用程序上时,它通常也是一个配置设置。

对于使用 `RabbitTemplate` 的同步用例,外部事务由调用者提供,根据需要可以声明式或命令式地提供(通常的 Spring 事务模型)。以下示例显示了一种声明式方法(通常更可取,因为它是非侵入式的),其中模板已配置为 `channelTransacted=true`

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

在前面的示例中,接收 `String` 类型的有效负载,进行转换,并在标记为 `@Transactional` 的方法内部作为消息体发送。如果数据库处理因异常而失败,则传入消息将返回到代理,并且不会发送传出消息。这适用于事务方法链中使用 `RabbitTemplate` 的任何操作(除非例如直接操作 `Channel` 以提前提交事务)。

对于使用 `SimpleMessageListenerContainer` 的异步用例,如果需要外部事务,则监听器设置时容器必须请求它。为了表明需要外部事务,用户在配置容器时向容器提供 `PlatformTransactionManager` 的实现。以下示例显示了如何操作

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

在前面的示例中,事务管理器作为从另一个 bean 定义(未显示)注入的依赖项添加,并且 `channelTransacted` 标志也设置为 `true`。其效果是,如果监听器因异常而失败,则事务将回滚,并且消息也将返回到代理。重要的是,如果事务未能提交(例如,由于数据库约束错误或连接问题),AMQP 事务也将回滚,并且消息将返回到代理。这有时被称为“尽力而为的一阶段提交”,是一种非常强大的可靠消息传递模式。如果在前面的示例中将 `channelTransacted` 标志设置为 `false`(默认值),则仍然会为监听器提供外部事务,但是所有消息操作都将自动确认,因此即使业务操作回滚,消息操作也会提交。

条件回滚

在1.6.6版本之前,当使用外部事务管理器(例如JDBC)时,向容器的transactionAttribute添加回滚规则无效。异常始终回滚事务。

此外,当在容器的advice链中使用事务advice时,条件回滚并不十分有用,因为所有监听器异常都被包装在ListenerExecutionFailedException中。

第一个问题已得到纠正,规则现在已正确应用。此外,现在提供了ListenerFailedRuleBasedTransactionAttribute。它是RuleBasedTransactionAttribute的子类,唯一的区别在于它知道ListenerExecutionFailedException并使用此类异常的原因作为规则。此事务属性可以直接在容器中使用,也可以通过事务advice使用。

以下示例使用此规则

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

关于接收消息回滚的说明

AMQP事务仅适用于发送到代理的消息和确认。因此,当Spring事务回滚且消息已接收时,Spring AMQP不仅必须回滚事务,还必须手动拒绝消息(类似于nack,但这并非规范中的称呼)。对消息拒绝采取的操作独立于事务,并取决于defaultRequeueRejected属性(默认值:true)。有关拒绝失败消息的更多信息,请参阅消息监听器和异步情况

有关RabbitMQ事务及其限制的更多信息,请参阅RabbitMQ代理语义

在RabbitMQ 2.7.0之前,此类消息(以及通道关闭或中止时未确认的任何消息)会在Rabbit代理的队列末尾。从2.7.0开始,拒绝的消息会进入队列的开头,类似于JMS回滚的消息。
以前,在事务回滚时消息重新入队在本地事务和提供TransactionManager的情况下是不一致的。在前一种情况下,应用正常的重新入队逻辑(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(参见消息监听器和异步情况)。使用事务管理器时,消息会在回滚时无条件地重新入队。从2.0版本开始,行为一致,两种情况下都应用正常的重新入队逻辑。要恢复到之前的行为,可以将容器的alwaysRequeueWithTxManagerRollback属性设置为true。参见消息监听器容器配置

使用RabbitTransactionManager

RabbitTransactionManager是执行Rabbit操作的替代方案,它在外部事务内执行并与之同步。此事务管理器是PlatformTransactionManager接口的实现,应与单个Rabbit ConnectionFactory一起使用。

此策略无法提供XA事务——例如,为了在消息传递和数据库访问之间共享事务。

应用程序代码需要通过ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean)检索事务性Rabbit资源,而不是使用标准的Connection.createChannel()调用以及随后的通道创建。当使用Spring AMQP的RabbitTemplate时,它将自动检测线程绑定的通道并自动参与其事务。

使用Java配置,您可以使用以下bean设置新的RabbitTransactionManager

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

如果您更喜欢XML配置,可以在XML应用程序上下文文件中声明以下bean

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

事务同步

将RabbitMQ事务与其他(例如DBMS)事务同步提供“尽力而为的一阶段提交”语义。在事务同步的完成后期,RabbitMQ事务可能无法提交。spring-tx基础结构会将其记录为错误,但不会向调用代码抛出异常。从2.3.10版本开始,您可以在同一线程上事务提交后调用ConnectionUtils.checkAfterCompletion()。如果没有发生异常,它将简单地返回;否则,它将抛出AfterCompletionFailedException,该异常将具有表示完成的同步状态的属性。

通过调用ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true)启用此功能;这是一个全局标志,适用于所有线程。