会话与事务

从 3.6 版本开始,MongoDB 支持会话的概念。会话的使用使 MongoDB 的 因果一致性 模型成为可能,该模型保证以尊重其因果关系的顺序运行操作。这些操作分为 ServerSession 实例和 ClientSession 实例。在本节中,当我们提到会话时,指的是 ClientSession

客户端会话内的操作不会与会话外的操作隔离。

MongoOperationsReactiveMongoOperations 都提供网关方法用于将 ClientSession 绑定到操作。MongoCollectionMongoDatabase 使用实现 MongoDB 集合和数据库接口的会话代理对象,因此您无需在每次调用时都添加会话。这意味着对 MongoCollection#find() 的潜在调用将委托给 MongoCollection#find(ClientSession)

诸如 (Reactive)MongoOperations#getCollection 之类的方法返回本机 MongoDB Java 驱动程序网关对象(例如 MongoCollection),这些对象本身提供用于 ClientSession 的专用方法。这些方法 **不是** 会话代理的。在直接与 MongoCollectionMongoDatabase 交互时,您应该在需要时提供 ClientSession,而不是通过 MongoOperations 上的 #execute 回调之一提供。

ClientSession 支持

以下示例显示了会话的使用方法

  • 命令式

  • 响应式

ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
    .causallyConsistent(true)
    .build();

ClientSession session = client.startSession(sessionOptions); (1)

template.withSession(() -> session)
    .execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        Person durzo = action.findOne(query, Person.class);  (2)

        Person azoth = new Person("Kylar Stern");
        azoth.setMaster(durzo);

        action.insert(azoth);                                (3)

        return azoth;
    });

session.close()                                              (4)
1 从服务器获取新的会话。
2 像以前一样使用 MongoOperation 方法。ClientSession 会自动应用。
3 确保关闭 ClientSession
4 关闭会话。
在处理 DBRef 实例(尤其是延迟加载的实例)时,必须在加载所有数据之前 **不要** 关闭 ClientSession。否则,延迟获取将失败。
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();

Publisher<ClientSession> session = client.startSession(sessionOptions); (1)

template.withSession(session)
.execute(action -> {

        Query query = query(where("name").is("Durzo Blint"));
        return action.findOne(query, Person.class)
            .flatMap(durzo -> {

                Person azoth = new Person("Kylar Stern");
                azoth.setMaster(durzo);

                return action.insert(azoth);                            (2)
            });
    }, ClientSession::close)                                            (3)
    .subscribe();                                                       (4)
1 获取用于新会话检索的 Publisher
2 像以前一样使用 ReactiveMongoOperation 方法。ClientSession 将自动获取和应用。
3 确保关闭 ClientSession
4 在您订阅之前不会发生任何事情。有关详细信息,请参阅 Project Reactor 参考指南

通过使用提供实际会话的 Publisher,您可以将会话获取延迟到实际订阅时。但是,您仍然需要在完成后关闭会话,以避免服务器被过时的会话污染。使用 execute 上的 doFinally 钩子在不再需要会话时调用 ClientSession#close()。如果您希望对会话本身有更多控制,则可以通过驱动程序获取 ClientSession 并通过 Supplier 提供它。

ClientSession 的响应式使用仅限于模板 API 使用。目前,响应式存储库没有会话集成。

MongoDB 事务

从 4 版本开始,MongoDB 支持 事务。事务构建在 会话 之上,因此需要一个活动的 ClientSession

除非您在应用程序上下文中指定 MongoTransactionManager,否则事务支持将 **被禁用**。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。

要完全以编程方式控制事务,您可能需要在 MongoOperations 上使用会话回调。

以下示例显示了以编程方式控制事务

编程事务
  • 命令式

  • 响应式

ClientSession session = client.startSession(options);                   (1)

template.withSession(session)
    .execute(action -> {

        session.startTransaction();                                     (2)

        try {

            Step step = // ...;
            action.insert(step);

            process(step);

            action.update(Step.class).apply(Update.set("state", // ...

            session.commitTransaction();                                (3)

        } catch (RuntimeException e) {
            session.abortTransaction();                                 (4)
        }
    }, ClientSession::close)                                            (5)
1 获取新的 ClientSession
2 启动事务。
3 如果一切按预期进行,则提交更改。
4 某些内容出现故障,因此回滚所有内容。
5 完成后不要忘记关闭会话。

前面的示例允许您完全控制事务行为,同时在回调中使用会话范围的 MongoOperations 实例来确保会话传递到每个服务器调用。为了避免这种方法带来的某些开销,您可以使用 TransactionTemplate 来消除手动事务流程的一些噪音。

Mono<DeleteResult> result = Mono
    .from(client.startSession())                                                             (1)

    .flatMap(session -> {
        session.startTransaction();                                                          (2)

        return Mono.from(collection.deleteMany(session, ...))                                (3)

            .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))   (4)

            .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))     (5)

            .doFinally(signal -> session.close());                                           (6)
      });
1 首先,我们显然需要启动会话。
2 一旦我们获得了 ClientSession,就启动事务。
3 通过将 ClientSession 传递到操作来在事务中进行操作。
4 如果操作异常完成,我们需要停止事务并保留错误。
5 或者当然,在成功的情况下提交更改。仍然保留操作结果。
6 最后,我们需要确保关闭会话。

上述操作的罪魁祸首是保留主流程 DeleteResult 而不是通过 commitTransaction()abortTransaction() 发布的事务结果,这导致设置相当复杂。

除非您在应用程序上下文中指定 ReactiveMongoTransactionManager,否则事务支持将 **被禁用**。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。

使用 TransactionTemplate/TransactionalOperator 进行事务

Spring Data MongoDB 事务支持 TransactionTemplateTransactionalOperator

使用 TransactionTemplate/TransactionalOperator 进行事务
  • 命令式

  • 响应式

template.setSessionSynchronization(ALWAYS);                                     (1)

// ...

TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);         (2)

txTemplate.execute(new TransactionCallbackWithoutResult() {

    @Override
    protected void doInTransactionWithoutResult(TransactionStatus status) {     (3)

        Step step = // ...;
        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    }
});
1 在模板 API 配置期间启用事务同步。
2 使用提供的 PlatformTransactionManager 创建 TransactionTemplate
3 在回调中,ClientSession 和事务已注册。
在运行时更改 MongoTemplate 的状态(您可能认为在前面列表的项目 1 中是可能的)会导致线程和可见性问题。
template.setSessionSynchronization(ALWAYS);                                          (1)

// ...

TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                   new DefaultTransactionDefinition());              (2)


Step step = // ...;
template.insert(step);

Mono<Void> process(step)
    .then(template.update(Step.class).apply(Update.set("state", …))
    .as(rxtx::transactional)                                                         (3)
    .then();
1 启用事务同步以参与事务。
2 使用提供的 ReactiveTransactionManager 创建 TransactionalOperator
3 TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。

使用 MongoTransactionManager & ReactiveMongoTransactionManager 进行事务

MongoTransactionManager / ReactiveMongoTransactionManager 是通往众所周知的 Spring 事务支持的桥梁。它允许应用程序使用Spring 的受管事务功能MongoTransactionManagerClientSession 绑定到线程,而 ReactiveMongoTransactionManager 使用 ReactorContext 来实现此目的。MongoTemplate 检测会话并在与事务相关联的这些资源上进行操作。MongoTemplate 也可以参与其他正在进行的事务。以下示例展示了如何使用 MongoTransactionManager 创建和使用事务

使用 MongoTransactionManager / ReactiveMongoTransactionManager 进行事务
  • 命令式

  • 响应式

@Configuration
static class Config extends AbstractMongoClientConfiguration {

    @Bean
    MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {  (1)
        return new MongoTransactionManager(dbFactory);
    }

    // ...
}

@Component
public class StateService {

    @Transactional
    void someBusinessFunction(Step step) {                                        (2)

        template.insert(step);

        process(step);

        template.update(Step.class).apply(Update.set("state", // ...
    };
});
1 在应用程序上下文中注册 MongoTransactionManager
2 将方法标记为事务性的。
@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求中。
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {

    @Bean
    ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {  (1)
        return new ReactiveMongoTransactionManager(factory);
    }

    // ...
}

@Service
public class StateService {

    @Transactional
    Mono<UpdateResult> someBusinessFunction(Step step) {                                  (2)

        return template.insert(step)
            .then(process(step))
            .then(template.update(Step.class).apply(Update.set("state", …));
    };
});
1 在应用程序上下文中注册 ReactiveMongoTransactionManager
2 将方法标记为事务性的。
@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求中。

控制 MongoDB 特定的事务选项

事务性服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })

默认情况下,使用 mongo: 前缀的标签命名空间由 MongoTransactionOptionsResolver 评估,该解析器默认配置。事务标签由 TransactionAttribute 提供,并可通过 TransactionTemplateTransactionalOperator 用于程序化事务控制。由于其声明性,@Transactional(label = …) 提供了一个良好的起点,也可以作为文档。

目前,支持以下选项

最大提交时间

控制 commitTransaction 操作在服务器上的最大执行时间。值的格式与 Duration.parse(…) 使用的 ISO-8601 持续时间格式相对应。

用法:mongo:maxCommitTime=PT1S

读取关注

设置事务的读取关注。

用法:mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE

读取偏好

设置事务的读取偏好。

用法:mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST

写入关注

设置事务的写入关注。

用法:mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY

加入外部事务的嵌套事务不会影响初始事务选项,因为事务已经启动。只有在启动新事务时才会应用事务选项。

事务内的特殊行为

在事务内,MongoDB 服务器的行为略有不同。

连接设置

MongoDB 驱动程序提供了一个专用的副本集名称配置选项,将驱动程序转换为自动检测模式。此选项有助于在事务期间识别主副本集节点和命令路由。

确保将 replicaSet 添加到 MongoDB URI。有关更多详细信息,请参阅连接字符串选项

集合操作

MongoDB **不支持**在事务中进行集合操作,例如集合创建。这也影响了首次使用时发生的动态集合创建。因此,请确保所有必需的结构都已就位。

瞬态错误

MongoDB 可以在事务操作期间引发的错误中添加特殊标签。这些标签可能指示瞬态故障,这些故障可能仅通过重试操作即可消失。我们强烈建议使用Spring Retry 来实现这些目的。但是,可以覆盖 MongoTransactionManager#doCommit(MongoTransactionObject) 以实现重试提交操作行为,如 MongoDB 参考手册中所述。

计数

MongoDB count 对集合统计信息进行操作,这些统计信息可能无法反映事务中的实际情况。当在多文档事务内发出 count 命令时,服务器会响应错误 50851。一旦 MongoTemplate 检测到活动事务,所有公开的 count() 方法都会被转换并委托给聚合框架,使用 $match$count 运算符,保留 Query 设置,例如 collation

在聚合计数帮助程序内使用地理命令时,适用限制。以下运算符无法使用,必须替换为其他运算符

  • $where$expr

  • $near$geoWithin with $center

  • $nearSphere$geoWithin with $centerSphere

使用 Criteria.near(…)Criteria.nearSphere(…) 的查询必须分别重写为 Criteria.within(…)Criteria.withinSphere(…)。存储库查询方法中的 near 查询关键字也适用,必须更改为 within。有关更多参考,另请参见 MongoDB JIRA 票证DRIVERS-518

以下代码片段显示了在会话绑定闭包内使用 count 的情况

session.startTransaction();

template.withSession(session)
    .execute(action -> {
        action.count(query(where("state").is("active")), Step.class)
        ...

上面的代码片段最终转换为以下命令

db.collection.aggregate(
   [
      { $match: { state: "active" } },
      { $count: "totalEntityCount" }
   ]
)

而不是

db.collection.find( { state: "active" } ).count()