事务支持
从5.0版本开始,引入了一个新的TransactionHandleMessageAdvice
,借助HandleMessageAdvice
实现,使整个下游流程具有事务性。当在<request-handler-advice-chain>
元素中使用常规的TransactionInterceptor
(例如,通过配置<tx:advice>
)时,启动的事务仅应用于内部的AbstractReplyProducingMessageHandler.handleRequestMessage()
,不会传播到下游流程。
为了简化XML配置,除了<request-handler-advice-chain>
之外,还在所有<outbound-gateway>
和<service-activator>
以及相关组件中添加了<transactional>
元素。以下示例展示了<transactional>
的用法。
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
可以使用TransactionInterceptorBuilder
简化Java配置,结果bean名称可以在消息注解的adviceChain
属性中使用,如下例所示。
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
请注意TransactionInterceptorBuilder
构造函数上的true
参数。它会导致创建TransactionHandleMessageAdvice
,而不是常规的TransactionInterceptor
。
Java DSL通过端点配置上的.transactional()
选项支持Advice
,如下例所示。
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}