会话与事务
从 3.6 版本开始,MongoDB 支持会话的概念。会话的使用使 MongoDB 的 因果一致性 模型成为可能,该模型保证以尊重其因果关系的顺序运行操作。这些操作分为 ServerSession
实例和 ClientSession
实例。在本节中,当我们提到会话时,指的是 ClientSession
。
客户端会话内的操作不会与会话外的操作隔离。 |
MongoOperations
和 ReactiveMongoOperations
都提供网关方法用于将 ClientSession
绑定到操作。MongoCollection
和 MongoDatabase
使用实现 MongoDB 集合和数据库接口的会话代理对象,因此您无需在每次调用时都添加会话。这意味着对 MongoCollection#find()
的潜在调用将委托给 MongoCollection#find(ClientSession)
。
诸如 (Reactive)MongoOperations#getCollection 之类的方法返回本机 MongoDB Java 驱动程序网关对象(例如 MongoCollection ),这些对象本身提供用于 ClientSession 的专用方法。这些方法 **不是** 会话代理的。在直接与 MongoCollection 或 MongoDatabase 交互时,您应该在需要时提供 ClientSession ,而不是通过 MongoOperations 上的 #execute 回调之一提供。 |
ClientSession 支持
以下示例显示了会话的使用方法
-
命令式
-
响应式
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); (1)
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); (2)
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); (3)
return azoth;
});
session.close() (4)
1 | 从服务器获取新的会话。 |
2 | 像以前一样使用 MongoOperation 方法。ClientSession 会自动应用。 |
3 | 确保关闭 ClientSession 。 |
4 | 关闭会话。 |
在处理 DBRef 实例(尤其是延迟加载的实例)时,必须在加载所有数据之前 **不要** 关闭 ClientSession 。否则,延迟获取将失败。 |
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); (1)
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); (2)
});
}, ClientSession::close) (3)
.subscribe(); (4)
1 | 获取用于新会话检索的 Publisher 。 |
2 | 像以前一样使用 ReactiveMongoOperation 方法。ClientSession 将自动获取和应用。 |
3 | 确保关闭 ClientSession 。 |
4 | 在您订阅之前不会发生任何事情。有关详细信息,请参阅 Project Reactor 参考指南。 |
通过使用提供实际会话的 Publisher
,您可以将会话获取延迟到实际订阅时。但是,您仍然需要在完成后关闭会话,以避免服务器被过时的会话污染。使用 execute
上的 doFinally
钩子在不再需要会话时调用 ClientSession#close()
。如果您希望对会话本身有更多控制,则可以通过驱动程序获取 ClientSession
并通过 Supplier
提供它。
ClientSession 的响应式使用仅限于模板 API 使用。目前,响应式存储库没有会话集成。 |
MongoDB 事务
除非您在应用程序上下文中指定 MongoTransactionManager ,否则事务支持将 **被禁用**。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。 |
要完全以编程方式控制事务,您可能需要在 MongoOperations
上使用会话回调。
以下示例显示了以编程方式控制事务
-
命令式
-
响应式
ClientSession session = client.startSession(options); (1)
template.withSession(session)
.execute(action -> {
session.startTransaction(); (2)
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); (3)
} catch (RuntimeException e) {
session.abortTransaction(); (4)
}
}, ClientSession::close) (5)
1 | 获取新的 ClientSession 。 |
2 | 启动事务。 |
3 | 如果一切按预期进行,则提交更改。 |
4 | 某些内容出现故障,因此回滚所有内容。 |
5 | 完成后不要忘记关闭会话。 |
前面的示例允许您完全控制事务行为,同时在回调中使用会话范围的 MongoOperations
实例来确保会话传递到每个服务器调用。为了避免这种方法带来的某些开销,您可以使用 TransactionTemplate
来消除手动事务流程的一些噪音。
Mono<DeleteResult> result = Mono
.from(client.startSession()) (1)
.flatMap(session -> {
session.startTransaction(); (2)
return Mono.from(collection.deleteMany(session, ...)) (3)
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) (4)
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) (5)
.doFinally(signal -> session.close()); (6)
});
1 | 首先,我们显然需要启动会话。 |
2 | 一旦我们获得了 ClientSession ,就启动事务。 |
3 | 通过将 ClientSession 传递到操作来在事务中进行操作。 |
4 | 如果操作异常完成,我们需要停止事务并保留错误。 |
5 | 或者当然,在成功的情况下提交更改。仍然保留操作结果。 |
6 | 最后,我们需要确保关闭会话。 |
上述操作的罪魁祸首是保留主流程 DeleteResult
而不是通过 commitTransaction()
或 abortTransaction()
发布的事务结果,这导致设置相当复杂。
除非您在应用程序上下文中指定 ReactiveMongoTransactionManager ,否则事务支持将 **被禁用**。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非原生 MongoDB 事务。 |
使用 TransactionTemplate/TransactionalOperator 进行事务
Spring Data MongoDB 事务支持 TransactionTemplate
和 TransactionalOperator
。
TransactionTemplate
/TransactionalOperator
进行事务-
命令式
-
响应式
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); (2)
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { (3)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | 在模板 API 配置期间启用事务同步。 |
2 | 使用提供的 PlatformTransactionManager 创建 TransactionTemplate 。 |
3 | 在回调中,ClientSession 和事务已注册。 |
在运行时更改 MongoTemplate 的状态(您可能认为在前面列表的项目 1 中是可能的)会导致线程和可见性问题。 |
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); (2)
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) (3)
.then();
1 | 启用事务同步以参与事务。 |
2 | 使用提供的 ReactiveTransactionManager 创建 TransactionalOperator 。 |
3 | TransactionalOperator.transactional(…) 为所有上游操作提供事务管理。 |
使用 MongoTransactionManager & ReactiveMongoTransactionManager 进行事务
MongoTransactionManager
/ ReactiveMongoTransactionManager
是通往众所周知的 Spring 事务支持的桥梁。它允许应用程序使用Spring 的受管事务功能。MongoTransactionManager
将 ClientSession
绑定到线程,而 ReactiveMongoTransactionManager
使用 ReactorContext
来实现此目的。MongoTemplate
检测会话并在与事务相关联的这些资源上进行操作。MongoTemplate
也可以参与其他正在进行的事务。以下示例展示了如何使用 MongoTransactionManager
创建和使用事务
MongoTransactionManager
/ ReactiveMongoTransactionManager
进行事务-
命令式
-
响应式
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { (1)
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { (2)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | 在应用程序上下文中注册 MongoTransactionManager 。 |
2 | 将方法标记为事务性的。 |
@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求中。 |
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { (1)
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { (2)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | 在应用程序上下文中注册 ReactiveMongoTransactionManager 。 |
2 | 将方法标记为事务性的。 |
@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,该事务将 ClientSession 添加到传出请求中。 |
控制 MongoDB 特定的事务选项
事务性服务方法可能需要特定的事务选项来运行事务。Spring Data MongoDB 的事务管理器支持评估事务标签,例如 @Transactional(label = { "mongo:readConcern=available" })
。
默认情况下,使用 mongo:
前缀的标签命名空间由 MongoTransactionOptionsResolver
评估,该解析器默认配置。事务标签由 TransactionAttribute
提供,并可通过 TransactionTemplate
和 TransactionalOperator
用于程序化事务控制。由于其声明性,@Transactional(label = …)
提供了一个良好的起点,也可以作为文档。
目前,支持以下选项
- 最大提交时间
-
控制 commitTransaction 操作在服务器上的最大执行时间。值的格式与
Duration.parse(…)
使用的 ISO-8601 持续时间格式相对应。用法:
mongo:maxCommitTime=PT1S
- 读取关注
-
设置事务的读取关注。
用法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- 读取偏好
-
设置事务的读取偏好。
用法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- 写入关注
-
设置事务的写入关注。
用法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
加入外部事务的嵌套事务不会影响初始事务选项,因为事务已经启动。只有在启动新事务时才会应用事务选项。 |
事务内的特殊行为
在事务内,MongoDB 服务器的行为略有不同。
连接设置
MongoDB 驱动程序提供了一个专用的副本集名称配置选项,将驱动程序转换为自动检测模式。此选项有助于在事务期间识别主副本集节点和命令路由。
确保将 replicaSet 添加到 MongoDB URI。有关更多详细信息,请参阅连接字符串选项。 |
集合操作
MongoDB **不支持**在事务中进行集合操作,例如集合创建。这也影响了首次使用时发生的动态集合创建。因此,请确保所有必需的结构都已就位。
瞬态错误
MongoDB 可以在事务操作期间引发的错误中添加特殊标签。这些标签可能指示瞬态故障,这些故障可能仅通过重试操作即可消失。我们强烈建议使用Spring Retry 来实现这些目的。但是,可以覆盖 MongoTransactionManager#doCommit(MongoTransactionObject)
以实现重试提交操作行为,如 MongoDB 参考手册中所述。
计数
MongoDB count
对集合统计信息进行操作,这些统计信息可能无法反映事务中的实际情况。当在多文档事务内发出 count
命令时,服务器会响应错误 50851。一旦 MongoTemplate
检测到活动事务,所有公开的 count()
方法都会被转换并委托给聚合框架,使用 $match
和 $count
运算符,保留 Query
设置,例如 collation
。
在聚合计数帮助程序内使用地理命令时,适用限制。以下运算符无法使用,必须替换为其他运算符
-
$where
→$expr
-
$near
→$geoWithin
with$center
-
$nearSphere
→$geoWithin
with$centerSphere
使用 Criteria.near(…)
和 Criteria.nearSphere(…)
的查询必须分别重写为 Criteria.within(…)
和 Criteria.withinSphere(…)
。存储库查询方法中的 near
查询关键字也适用,必须更改为 within
。有关更多参考,另请参见 MongoDB JIRA 票证DRIVERS-518。
以下代码片段显示了在会话绑定闭包内使用 count
的情况
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上面的代码片段最终转换为以下命令
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
而不是
db.collection.find( { state: "active" } ).count()