事务 Binder

通过将 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 设置为非空值(例如 tx-)来启用事务。在处理器应用程序中使用时,使用者将启动事务;使用者线程上发送的任何记录都将参与同一事务。当侦听器正常退出时,侦听器容器将偏移量发送到事务并提交它。使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性配置的所有生产者绑定都使用一个公共生产者工厂;忽略各个绑定 Kafka 生产者属性。

由于重试将在原始事务中运行,该事务可能会回滚,并且任何已发布的记录也将回滚,因此不支持使用事务进行常规绑定重试(和死信)。当启用重试时(公共属性 maxAttempts 大于零),重试属性用于配置 DefaultAfterRollbackProcessor 以启用容器级别的重试。同样,此功能已移至侦听器容器,同样通过在主事务回滚后运行的 DefaultAfterRollbackProcessor,而不是在事务中发布死信记录。

如果您希望在源应用程序中使用事务,或者从某个任意线程进行仅生产者事务(例如 @Scheduled 方法),则必须获取对事务生产者工厂的引用,并使用它定义一个 KafkaTransactionManager bean。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

请注意,我们使用 BinderFactory 获取对绑定器的引用;当仅配置了一个绑定器时,在第一个参数中使用 null。如果配置了多个绑定器,请使用绑定器名称获取引用。一旦我们获得对绑定器的引用,我们就可以获得对 ProducerFactory 的引用并创建一个事务管理器。

然后,您可以使用正常的 Spring 事务支持,例如 TransactionTemplate@Transactional,例如

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

如果您希望将仅生产者事务与来自其他事务管理器的那些事务同步,请使用 ChainedTransactionManager

如果您部署多个应用程序实例,则每个实例都需要一个唯一的 transactionIdPrefix