事务

本节介绍 Spring for Apache Kafka 如何支持事务。

概述

0.11.0.0 客户端库添加了对事务的支持。Spring for Apache Kafka 通过以下方式添加支持

  • KafkaTransactionManager:与正常的 Spring 事务支持一起使用(@TransactionalTransactionTemplate 等)

  • 事务性 KafkaMessageListenerContainer

  • 使用 KafkaTemplate 进行本地事务

  • 与其他事务管理器进行事务同步

通过为DefaultKafkaProducerFactory提供transactionIdPrefix来启用事务。在这种情况下,工厂会维护一个事务生产者的缓存,而不是管理单个共享的Producer。当用户在生产者上调用close()时,它会被返回到缓存以供重用,而不是真正关闭。每个生产者的transactional.id属性为transactionIdPrefix + n,其中n0开始,并为每个新生产者递增。在以前版本的 Spring for Apache Kafka 中,transactional.id 的生成方式不同,用于由带有基于记录的监听器的监听器容器启动的事务,以支持围栏僵尸进程,这在 3.0 开始后不再需要,因为 EOSMode.V2 是唯一的选择。对于运行多个实例的应用程序,transactionIdPrefix 必须在每个实例中唯一。

另请参阅 Exactly Once 语义

另请参阅 transactionIdPrefix

使用 Spring Boot,只需设置spring.kafka.producer.transaction-id-prefix属性 - Spring Boot 会自动配置一个KafkaTransactionManager bean 并将其连接到监听器容器。

从 2.5.8 版本开始,您现在可以在生产者工厂上配置maxAge属性。当使用可能处于空闲状态的交易生产者时,这很有用,因为代理的transactional.id.expiration.ms。使用当前的kafka-clients,这会导致ProducerFencedException,而不会重新平衡。通过将maxAge设置为小于transactional.id.expiration.ms,工厂将在生产者超过其最大年龄时刷新生产者。

使用KafkaTransactionManager

KafkaTransactionManager 是 Spring 框架的PlatformTransactionManager的实现。它在构造函数中提供对生产者工厂的引用。如果您提供自定义生产者工厂,它必须支持事务。请参阅ProducerFactory.transactionCapable()

您可以将KafkaTransactionManager与正常的 Spring 事务支持(@TransactionalTransactionTemplate 等)一起使用。如果事务处于活动状态,则在事务范围内执行的任何KafkaTemplate操作都使用事务的Producer。管理器根据成功或失败提交或回滚事务。您必须配置KafkaTemplate以使用与事务管理器相同的ProducerFactory

事务同步

本节指的是仅生产者事务(不是由监听器容器启动的事务);有关在容器启动事务时链接事务的信息,请参阅使用消费者启动的事务

如果您想向 kafka 发送记录并执行一些数据库更新,您可以使用正常的 Spring 事务管理,例如使用DataSourceTransactionManager

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

@Transactional 注解的拦截器会启动事务,KafkaTemplate 会与该事务管理器同步事务;每个发送操作都会参与该事务。当方法退出时,数据库事务会提交,然后是 Kafka 事务。如果您希望以相反的顺序执行提交(Kafka 优先),请使用嵌套的 @Transactional 方法,将外部方法配置为使用 DataSourceTransactionManager,将内部方法配置为使用 KafkaTransactionManager

有关在 Kafka 优先或数据库优先配置中同步 JDBC 和 Kafka 事务的应用程序示例,请参见 Kafka 事务与其他事务管理器的示例

从版本 2.5.17、2.6.12、2.7.9 和 2.8.0 开始,如果在同步事务上提交失败(在主事务提交后),异常将被抛出给调用者。以前,这会被静默忽略(在调试级别记录)。应用程序应采取补救措施(如有必要)来补偿已提交的主事务。

使用消费者启动的事务

ChainedKafkaTransactionManager 现在已弃用,从版本 2.7 开始;有关更多信息,请参见其超类 ChainedTransactionManager 的 JavaDoc。相反,请在容器中使用 KafkaTransactionManager 启动 Kafka 事务,并使用 @Transactional 注解监听器方法以启动其他事务。

有关链接 JDBC 和 Kafka 事务的示例应用程序,请参见 Kafka 事务与其他事务管理器的示例

非阻塞重试 不能与 容器事务 结合使用。当监听器代码抛出异常时,容器事务提交成功,并且记录将发送到可重试主题。

KafkaTemplate 本地事务

您可以使用 KafkaTemplate 在本地事务中执行一系列操作。以下示例展示了如何执行此操作

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数是模板本身(this)。如果回调正常退出,则提交事务。如果抛出异常,则回滚事务。

如果存在正在进行的 KafkaTransactionManager(或同步)事务,则不会使用它。相反,将使用新的“嵌套”事务。

TransactionIdPrefix

使用EOSMode.V2(也称为BETA),这是唯一支持的模式,不再需要使用相同的transactional.id,即使对于消费者发起的交易也是如此;事实上,它必须在每个实例上都是唯一的,就像生产者发起的交易一样。此属性在每个应用程序实例上必须具有不同的值。

TransactionIdSuffix Fixed

从 3.2 版本开始,引入了新的TransactionIdSuffixStrategy 接口来管理transactional.id 后缀。默认实现是DefaultTransactionIdSuffixStrategy,当设置maxCache 大于零时,可以在特定范围内重用transactional.id,否则将通过递增计数器动态生成后缀。当请求交易生产者并且所有transactional.id 都在使用时,将抛出NoProducerAvailableException。用户可以使用配置为重试该异常的RetryTemplate,并配置适当的回退。

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

当将maxCache 设置为 5 时,transactional.idmy.txid.+{0-4}

当使用KafkaTransactionManagerConcurrentMessageListenerContainer 并且启用maxCache 时,需要将maxCache 设置为大于或等于concurrency 的值。如果MessageListenerContainer 无法获取transactional.id 后缀,它将抛出NoProducerAvailableException。当在ConcurrentMessageListenerContainer 中使用嵌套事务时,需要调整maxCache 设置以处理增加的嵌套事务数量。

KafkaTemplate 事务和非事务发布

通常,当KafkaTemplate 是事务性的(使用支持事务的生产者工厂配置)时,需要事务。事务可以通过TransactionTemplate@Transactional 方法、调用executeInTransaction 或通过监听器容器(当使用KafkaTransactionManager 配置时)启动。任何尝试在事务范围之外使用模板都会导致模板抛出IllegalStateException。从 2.4.3 版本开始,可以将模板的allowNonTransactional 属性设置为true。在这种情况下,模板将允许操作在没有事务的情况下运行,通过调用ProducerFactorycreateNonTransactionalProducer() 方法;生产者将被缓存或线程绑定,以便正常重用。请参阅使用DefaultKafkaProducerFactory

带有批处理监听器的交易

当监听器在使用交易时发生故障时,将调用AfterRollbackProcessor 以在回滚发生后采取一些措施。当使用带有记录监听器的默认AfterRollbackProcessor 时,将执行查找,以便重新传递失败的记录。但是,对于批处理监听器,整个批次将被重新传递,因为框架不知道批次中哪个记录失败。有关更多信息,请参阅回滚后处理器

在使用批处理监听器时,版本 2.4.2 引入了一种处理批处理过程中失败的替代机制:BatchToRecordAdapter。当配置了 batchListener 为 true 的容器工厂以及 BatchToRecordAdapter 时,监听器会一次调用一个记录。这使得在批处理中进行错误处理成为可能,同时仍然可以根据异常类型停止处理整个批处理。提供了一个默认的 BatchToRecordAdapter,它可以使用标准的 ConsumerRecordRecoverer 进行配置,例如 DeadLetterPublishingRecoverer。以下测试用例配置片段说明了如何使用此功能

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}