【问题标题】:Wait for reactive change stream subscription to be active with Spring Data MongoDB?等待响应式更改流订阅在 Spring Data MongoDB 中处于活动状态?
【发布时间】:2020-07-10 07:17:44
【问题描述】:

使用blocking Spring Data Mongo implementation 订阅change streams 时,可以调用await 等待订阅生效:

Subscription subscription = startBlockingMongoChangeStream();
subscription.await(Duration.of(2, SECONDS));
Document someDocument = ..
writeDocumentToMongoDb(someDocument);

startBlockingMongoChangeStream 的实现方式如下:

public Subscription startBlockingMongoChangeStream() {

    MessageListenerContainer container = new DefaultMessageListenerContainer(template);
    container.start();                                                                                        

    MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;                     
    ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty()); 

    return container.register(new ChangeStreamRequest<>(listener, options), Document.class);
}

如果在上面的示例中没有使用await,则someDocument 有可能(如果 JVM 很热,则几乎 100% 的可能性)被写入之前订阅处于活动状态,因此 @ 987654329@ 错过了。所以添加await 可以缓解这个问题。

我正在寻找一种在使用反应式实现时实现相同目标的方法。代码现在看起来像这样:

Disposable disposable = startReactiveMongoChangeStream().subscribe(); // (1)
Document someDocument = ..
writeDocumentToMongoDb(someDocument).subscribe(); // (2)

这里的问题再次是,someDocument 是在 startReactiveMongoChangeStream 返回的订阅开始之前写入的,因此文档丢失了。

还请注意,这是一个有些人为的示例,因为在我的实际应用程序中 writeDocumentToMongoDb (2) 不知道 startReactiveMongoChangeStream 订阅 (1) 所以我不能简单地 flatMap (1) 和调用 (2) . startReactiveMongoChangeStream 方法的实现方式如下:

public Flux<ChangeStreamEvent<String>> startReactiveMongoChangeStream() {
    return reactiveTemplate.changeStream(String.class) 
                           .watchCollection("user")
                           .listen();    
}

如何在响应式实现中“模拟”阻塞实现中可用的await 功能?

【问题讨论】:

    标签: java spring-boot reactive-programming spring-data-mongodb project-reactor


    【解决方案1】:

    TL;DR

    反应式 API 中没有同步方法

    说明

    首先,让我们看看这两种实现,以了解这是为什么。

    阻塞实现使用 MongoDB 的游标 API 来获取游标。获得游标包括与服务器的对话。 MessageListenerContainer 获取到游标后,订阅任务切换到活动状态,这意味着您已经等待获取第一个游标的阶段。

    反应式实现在ChangeStreamPublisher 上运行。从响应式流协议中,可以在发出元素、流完成或失败时收到通知。服务器端活动开始或完成时没有可用的通知。因此,您不能等到反应式 API 收到第一个游标。由于游标可能为空,第一个游标可能根本不会发出任何值。

    我认为 MongoDB 驱动程序可以提供回调样式的 API 来获得流处于活动状态的通知。不过,这需要在MongoDB issue tracker 中报告。

    【讨论】:

      猜你喜欢
      • 2020-02-25
      • 1970-01-01
      • 2018-08-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-16
      • 1970-01-01
      • 2023-02-06
      • 1970-01-01
      相关资源
      最近更新 更多