【发布时间】:2020-07-10 07:17:44
【问题描述】:
使用blocking Spring Data Mongo implementation 订阅change streams 时,可以调用await 等待订阅生效:
Subscription subscription = startBlockingMongoChangeStream();
subscription.await(Duration.of(2, SECONDS));
Document someDocument = ..
writeDocumentToMongoDb(someDocument);
startBlockingMongoChangeStream 的实现方式如下:
public Subscription startBlockingMongoChangeStream() {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty());
return container.register(new ChangeStreamRequest<>(listener, options), Document.class);
}
如果在上面的示例中没有使用await,则someDocument 有可能(如果 JVM 很热,则几乎 100% 的可能性)被写入之前订阅处于活动状态,因此 @ 987654329@ 错过了。所以添加await 可以缓解这个问题。
我正在寻找一种在使用反应式实现时实现相同目标的方法。代码现在看起来像这样:
Disposable disposable = startReactiveMongoChangeStream().subscribe(); // (1)
Document someDocument = ..
writeDocumentToMongoDb(someDocument).subscribe(); // (2)
这里的问题再次是,someDocument 是在 startReactiveMongoChangeStream 返回的订阅开始之前写入的,因此文档丢失了。
还请注意,这是一个有些人为的示例,因为在我的实际应用程序中 writeDocumentToMongoDb (2) 不知道 startReactiveMongoChangeStream 订阅 (1) 所以我不能简单地 flatMap (1) 和调用 (2) . startReactiveMongoChangeStream 方法的实现方式如下:
public Flux<ChangeStreamEvent<String>> startReactiveMongoChangeStream() {
return reactiveTemplate.changeStream(String.class)
.watchCollection("user")
.listen();
}
如何在响应式实现中“模拟”阻塞实现中可用的await 功能?
【问题讨论】:
标签: java spring-boot reactive-programming spring-data-mongodb project-reactor