事务支持

从5.0版本开始,引入了一个新的TransactionHandleMessageAdvice,借助HandleMessageAdvice实现,使整个下游流程具有事务性。当在<request-handler-advice-chain>元素中使用常规的TransactionInterceptor(例如,通过配置<tx:advice>)时,启动的事务仅应用于内部的AbstractReplyProducingMessageHandler.handleRequestMessage(),不会传播到下游流程。

为了简化XML配置,除了<request-handler-advice-chain>之外,还在所有<outbound-gateway><service-activator>以及相关组件中添加了<transactional>元素。以下示例展示了<transactional>的用法。

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
        <int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

如果您熟悉JPA集成组件,这种配置并不陌生,但现在我们可以从流程中的任何点启动事务——而不仅仅是从<poller>或消息驱动的通道适配器(例如JMS)启动。

可以使用TransactionInterceptorBuilder简化Java配置,结果bean名称可以在消息注解adviceChain属性中使用,如下例所示。

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

请注意TransactionInterceptorBuilder构造函数上的true参数。它会导致创建TransactionHandleMessageAdvice,而不是常规的TransactionInterceptor

Java DSL通过端点配置上的.transactional()选项支持Advice,如下例所示。

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}