【问题标题】:MongoDB Reactive Streams hangs when performing a query执行查询时 MongoDB Reactive Streams 挂起
【发布时间】:2021-11-27 15:31:00
【问题描述】:

我正在使用我在this example 之后实现的 MongoDB Reactive Streams Java API,但我遇到了一个严重的问题:有时,当我尝试查询集合时,await 方法不起作用,它会一直挂起,直到超时。

onSubscribe 方法被正确调用,但是onNextonErroronComplete 都没有被调用。

似乎没有导致此问题的特定情况。

这是我的代码

MongoDatabase database = MongoDBConnector.getClient().getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));    
SettingSubscriber tagSub = new SettingSubscriber(finder); 
//SettingsSubscriber is a subclass of ObservableSubscriber which calls publisher.subscribe(this)
tagSub.await(); //this is where it hangs
return tagSub.getWrappedData();

【问题讨论】:

  • publisher#subscribe(subscriber),不见了。请参阅Quick Tour - MongoDB Reactive Streams Java Driver。另请参阅org.reactivestreams.Publisher's subscribe()
  • @prasad_ 实际上它就在那里(参见第 5 行代码的注释)。另外,正如我所说,onSubscribe 总是被调用。
  • @gscaparrotti myCollection 集合中有多少文档? myField 的存在检查将执行完整的集合扫描。如果集合中有很多记录,则查询可能超时
  • 文档不足100个,超时1分钟,所以我觉得不是这样。

标签: java mongodb reactive-streams


【解决方案1】:

我编写了一个我认为SettingSubscriber 看起来像的简单实现,并尝试使用常规脚本重新创建问题。我不能——我的代码运行时没有挂起,打印每个输出记录并退出。以下代码供参考:

@Grab(group = 'org.mongodb', module = 'mongodb-driver-reactivestreams', version = '4.3.3')
@Grab(group = 'org.slf4j', module = 'slf4j-api', version = '1.7.32')
@Grab(group = 'ch.qos.logback', module = 'logback-classic', version = '1.2.6')

import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoDatabase;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.FindPublisher;
import com.mongodb.client.model.Filters;
import org.bson.Document;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;

MongoClientSettings.Builder clientSettingsBuilder = MongoClientSettings.builder()
        .applyToClusterSettings { clusterSettingsBuilder ->
            clusterSettingsBuilder.hosts( Arrays.asList(new ServerAddress("localhost", 27017)))
        };
MongoClient mongoClient = MongoClients.create(clientSettingsBuilder.build());
MongoDatabase database = mongoClient.getDatabase("myDb");
MongoCollection<Document> collection = database.getCollection("myCollection");
FindPublisher<Document> finder = collection.find(Filters.exists("myField"));
SettingSubscriber tagSub = new SettingSubscriber(finder);
tagSub.await();

class SettingSubscriber implements Subscriber<Document> {
    private final CountDownLatch latch = new CountDownLatch(1);
    private Subscription subscription;
    private List<Document> data = new ArrayList<>();

    public SettingSubscriber(FindPublisher<Document> finder) {
        finder.subscribe(this);
    }

    @Override
    public void onSubscribe(final Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(final Document document) {
        System.out.println("Received: " + document);
        data.add(document);
        subscription.request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        throwable.printStackTrace();
        latch.countDown();
    }

    @Override
    public void onComplete() {
        System.out.println("Completed");
        latch.countDown();
    }
    
    public List<Document> getWrappedData() {
        return data;
    }

    public void await() throws Throwable {
        await(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public void await(final long timeout, final TimeUnit unit) throws Throwable {
        if (!latch.await(timeout, unit)) {
            System.out.println("Publish timed out");
        }
    }
}

您能否将SettingSubscriber 的此实现与您的进行比较,看看是否遗漏了什么?

【讨论】:

  • 我看到的唯一区别是我没有在onNext中调用subscription.request(1)
  • 您是否在 onSubscribe 方法中执行过类似 subscription.request(Long.MAX_VALUE) 的操作? subscription.request 需要在某个地方进行调用,或者像subscription.request(Long.MAX_VALUE) 那样一次性获得所有结果,或者像subscription.request(1) 那样一次获得更少结果
  • 我使用onSubscribe 方法。如果我一次只请求少量项目并且发布者想要发送更多项目会怎样?
  • 这就是onNext方法中的subscription.request调用的目的,在我们收到一个物品后请求更多物品
  • 我明白了。我想知道如果 Publisher 的可用项目多于请求的数量会发生什么。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-12-08
  • 1970-01-01
  • 2018-01-30
相关资源
最近更新 更多