事务
本节介绍 Spring for Apache Kafka 如何支持事务。
概述
0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式添加支持:
-
KafkaTransactionManager
:与正常的 Spring 事务支持一起使用(@Transactional
、TransactionTemplate
等) -
事务性
KafkaMessageListenerContainer
-
使用
KafkaTemplate
的本地事务 -
与其他事务管理器的交易同步
通过为 DefaultKafkaProducerFactory
提供 transactionIdPrefix
来启用事务。在这种情况下,工厂维护的是事务性生产者的缓存,而不是管理单个共享的 Producer
。当用户调用生产者的 close()
方法时,它将返回到缓存以供重用,而不是真正关闭。每个生产者的 transactional.id
属性为 transactionIdPrefix
+ n
,其中 n
从 0
开始,并为每个新生产者递增。在早期版本的 Spring for Apache Kafka 中,对于由具有基于记录的监听器的监听器容器启动的事务,transactional.id
的生成方式有所不同,以支持围栏僵尸进程,而从 3.0 开始,EOSMode.V2
是唯一的选择,因此不再需要此功能。对于运行多个实例的应用程序,每个实例的 transactionIdPrefix
必须唯一。
另见 精确一次语义。
使用 Spring Boot,只需设置 spring.kafka.producer.transaction-id-prefix
属性即可——Spring Boot 将自动配置 KafkaTransactionManager
bean 并将其连接到监听器容器。
从 2.5.8 版本开始,您现在可以在生产者工厂上配置 maxAge 属性。当使用可能空闲一段时间(直到代理的 transactional.id.expiration.ms 到期)的事务性生产者时,这非常有用。使用当前的 kafka-clients ,这可能会导致 ProducerFencedException 且无需重新平衡。通过将 maxAge 设置为小于 transactional.id.expiration.ms ,如果生产者超过其最大年龄,工厂将刷新生产者。 |
使用 KafkaTransactionManager
KafkaTransactionManager
是 Spring Framework 的 PlatformTransactionManager
的一个实现。它在其构造函数中提供对生产者工厂的引用。如果提供自定义生产者工厂,则它必须支持事务。请参阅 ProducerFactory.transactionCapable()
。
您可以将 KafkaTransactionManager
与正常的 Spring 事务支持一起使用(@Transactional
、TransactionTemplate
等)。如果事务处于活动状态,则在事务范围内执行的任何 KafkaTemplate
操作都使用事务的 Producer
。管理器根据成功或失败提交或回滚事务。必须将 KafkaTemplate
配置为使用与事务管理器相同的 ProducerFactory
。
事务同步
本节指的是仅生产者事务(不是由监听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅 使用消费者启动的事务。
如果您想将记录发送到 Kafka 并执行一些数据库更新,您可以使用普通的 Spring 事务管理,例如使用 DataSourceTransactionManager
。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
针对@Transactional
注解的拦截器会启动事务,KafkaTemplate
会与该事务管理器同步事务;每个发送操作都将参与该事务。方法退出时,数据库事务将先提交,然后是Kafka事务。如果您希望以相反的顺序提交事务(先Kafka后数据库),请使用嵌套的@Transactional
方法,外部方法配置为使用DataSourceTransactionManager
,内部方法配置为使用KafkaTransactionManager
。
有关同步JDBC和Kafka事务(Kafka优先或数据库优先配置)的应用程序示例,请参阅Kafka事务与其他事务管理器的示例。
从2.5.17、2.6.12、2.7.9和2.8.0版本开始,如果同步事务在(主事务提交后)提交失败,则会将异常抛给调用方。以前,这会被默默忽略(在调试级别记录)。如有必要,应用程序应采取补救措施来补偿已提交的主事务。 |
使用消费者启动的事务
ChainedKafkaTransactionManager
自2.7版本起已弃用;有关更多信息,请参阅其超类ChainedTransactionManager
的JavaDocs。请改用容器中的KafkaTransactionManager
启动Kafka事务,并使用@Transactional
注解监听器方法以启动其他事务。
有关链接JDBC和Kafka事务的示例应用程序,请参阅Kafka事务与其他事务管理器的示例。
KafkaTemplate
本地事务
您可以使用KafkaTemplate
在一个本地事务中执行一系列操作。以下示例演示了如何执行此操作
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
回调中的参数是模板本身(this
)。如果回调正常退出,则提交事务。如果抛出异常,则回滚事务。
如果正在处理KafkaTransactionManager (或同步)事务,则不会使用它。相反,将使用新的“嵌套”事务。 |
TransactionIdPrefix
使用EOSMode.V2
(又名BETA
),这是唯一受支持的模式,即使对于消费者启动的事务,也不再需要使用相同的transactional.id
;事实上,它必须与生产者启动的事务一样,在每个实例上都是唯一的。此属性在每个应用程序实例上必须具有不同的值。
TransactionIdSuffix Fixed
从3.2版本开始,引入了一个新的TransactionIdSuffixStrategy
接口来管理transactional.id
后缀。当设置maxCache
大于零时,默认实现是DefaultTransactionIdSuffixStrategy
,可以在特定范围内重用transactional.id
,否则将通过递增计数器动态生成后缀。当请求事务生产者且所有transactional.id
都在使用时,将抛出NoProducerAvailableException
。然后,用户可以使用配置为重试该异常的RetryTemplate
,并配置合适的回退。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
当将maxCache
设置为5时,transactional.id
为my.txid.
+`{0-4}`。
当将KafkaTransactionManager 与ConcurrentMessageListenerContainer 一起使用并启用maxCache 时,必须将maxCache 设置为大于或等于concurrency 的值。如果MessageListenerContainer 无法获取transactional.id 后缀,它将抛出NoProducerAvailableException 。当在ConcurrentMessageListenerContainer 中使用嵌套事务时,需要调整maxCache设置以处理增加的嵌套事务数量。 |
KafkaTemplate
事务性和非事务性发布
通常,当KafkaTemplate
是事务性的(使用具有事务功能的生产者工厂配置)时,需要事务。事务可以通过TransactionTemplate
、@Transactional
方法、调用executeInTransaction
或通过使用KafkaTransactionManager
配置的监听器容器来启动。任何在事务范围之外使用模板的尝试都会导致模板抛出IllegalStateException
。从2.4.3版本开始,您可以将模板的allowNonTransactional
属性设置为true
。在这种情况下,模板将允许操作在没有事务的情况下运行,方法是调用ProducerFactory
的createNonTransactionalProducer()
方法;生产者将像往常一样被缓存或线程绑定以供重用。请参阅使用DefaultKafkaProducerFactory
。
带有批处理监听器的交易
当使用事务时监听器失败时,将调用AfterRollbackProcessor
在回滚发生后执行某些操作。当使用带有记录监听器的默认AfterRollbackProcessor
时,将执行查找操作,以便重新传递失败的记录。但是,使用批处理监听器时,将重新传递整个批处理,因为框架不知道批处理中的哪个记录失败了。有关更多信息,请参阅回滚后处理器。
在使用批处理监听器时,2.4.2 版本引入了另一种机制来处理处理批处理时的失败:BatchToRecordAdapter
。当将batchListener
设置为 true 的容器工厂配置为BatchToRecordAdapter
时,将使用一个记录一次调用监听器。这使得能够在批处理中进行错误处理,同时仍然可以根据异常类型停止处理整个批处理。提供了一个默认的BatchToRecordAdapter
,可以使用标准的ConsumerRecordRecoverer
(例如DeadLetterPublishingRecoverer
)进行配置。以下测试用例配置片段说明了如何使用此功能
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}