可追踪游标
默认情况下,当客户端用尽光标提供的所有结果时,MongoDB 会自动关闭光标。用尽时关闭光标会将流变成有限流。对于 capped 集合,你可以使用在客户端使用所有最初返回的数据后仍然保持打开状态的 可跟踪光标。
可以通过 MongoOperations.createCollection 创建 capped 集合。为此,提供必需的 CollectionOptions.empty().capped()… 。
|
可通过命令式和响应式 MongoDB API 使用可跟踪游标。强烈建议使用响应式变体,因为它对资源的消耗较少。但是,如果您无法使用响应式 API,仍然可以使用 Spring 生态系统中已普遍存在的消息传递概念。
使用 MessageListener
的可跟踪游标
使用同步驱动程序侦听封顶集合会创建一个长期运行的阻塞任务,需要将其委托给单独的组件。在这种情况下,我们需要先创建一个 MessageListenerContainer
,它将成为运行特定 SubscriptionRequest
的主要入口点。Spring Data MongoDB 已随附一个在 MongoTemplate
上运行的默认实现,并且能够为 TailableCursorRequest
创建和运行 Task
实例。
以下示例展示了如何将可跟踪游标与 MessageListener
实例一起使用
示例 1. 使用
MessageListener
实例的可跟踪游标MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<Document, User> listener = System.out::println; (2)
TailableCursorRequest request = TailableCursorRequest.builder()
.collection("orders") (3)
.filter(query(where("value").lt(100))) (4)
.publishTo(listener) (5)
.build();
container.register(request, User.class); (6)
// ...
container.stop(); (7)
1 | 启动容器会初始化资源,并为已注册的 SubscriptionRequest 实例启动 Task 实例。启动后添加的请求会立即运行。 |
2 | 定义在收到 Message 时调用的侦听器。Message#getBody() 会转换为请求的域类型。使用 Document 来接收未转换的原始结果。 |
3 | 设置要侦听的集合。 |
4 | 提供要接收的文档的可选筛选器。 |
5 | 设置消息侦听器以发布传入的 Message 。 |
6 | 注册请求。返回的 Subscription 可用于检查当前 Task 状态并取消它以释放资源。 |
7 | 一旦确定不再需要容器,请务必停止它。这样做会停止容器内所有正在运行的 Task 实例。 |
响应式可跟踪游标
将可跟踪游标与响应式数据类型一起使用允许构建无限流。可跟踪游标会保持打开状态,直到外部将其关闭。当新文档到达封顶集合时,它会发出数据。
如果查询没有返回匹配项,或者游标返回集合“末尾”处的文档,并且应用程序随后删除该文档,则可跟踪游标可能会失效或无效。以下示例展示了如何创建和使用无限流查询
示例 2. 使用 ReactiveMongoOperations 的无限流查询
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
Spring Data MongoDB 响应式存储库通过使用 @Tailable
注释查询方法来支持无限流。这适用于返回 Flux
和其他能够发出多个元素的响应式类型的函数,如下例所示
示例 3. 使用 ReactiveMongoRepository 的无限流查询
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
@Tailable
Flux<Person> findByFirstname(String firstname);
}
Flux<Person> stream = repository.findByFirstname("Joe");
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();