【问题标题】:MongoDB Java API slow reading peformanceMongoDB Java API 读取速度慢
【发布时间】:2019-02-05 15:43:25
【问题描述】:

我们正在从本地 MongoDB 读取集合中的所有文档,性能不是很出色。

我们需要转储所有数据,不要担心为什么,只要相信它确实需要并且没有解决办法。

我们有 4mio 文档,看起来像:

{
    "_id":"4d094f58c96767d7a0099d49",
    "exchange":"NASDAQ",
    "stock_symbol":"AACC",
    "date":"2008-03-07",
    "open":8.4,
    "high":8.75,
    "low":8.08,
    "close":8.55,
    "volume":275800,
    "adj close":8.55
}

我们现在使用这个简单的代码来阅读:

MongoClient mongoClient = MongoClients.create();
MongoDatabase database = mongoClient.getDatabase("localhost");
MongoCollection<Document> collection = database.getCollection("test");

MutableInt count = new MutableInt();
long start = System.currentTimeMillis();
collection.find().forEach((Block<Document>) document -> count.increment() /* actually something more complicated */ );
long start = System.currentTimeMillis();

我们以 16 秒(250k 行/秒)的速度读取整个集合,这对于小型文档来说真的一点也不令人印象深刻。请记住,我们要加载 800mio 行。不能进行聚合、map reduce 或类似操作。

这是否与 MongoDB 一样快,或者是否有其他方法可以更快地加载文档(其他技术、移动 Linux、更多 RAM、设置...)?

【问题讨论】:

  • 我认为this article about MongoDB bulk operations 有助于了解如何提高性能。要获取可以找到多少对象,您应该使用db.collection.stats()This 是如何从 java 调用它。我会选择网络和数据密集程度较低的方式在 MongoDB 上进行任何操作。
  • 您真的需要翻阅 800mio 文档还是只看一个子集?如果可能的话,这将是我建议改变的第一件事。此外,投影将有助于限制需要反序列化的数据量。如果不了解您到底想在循环中做什么,这是一个非常广泛的问题......
  • 您想对您的文档进行什么操作?您应该描述您的要求,以便有人可以帮助您。例如,如果只想像您的示例一样计数,则使用内置的计数功能会更好。我认为你想做的远不止这些
  • 我在其他问题中看到过类似的情况。出于某种原因,即使您想要集合中的所有文档,Mongo 也会使用索引和随机磁盘操作。你使用压缩吗?加密?
  • that is really not impressive at all.. 这也取决于您的硬件。请提供有关此的更多信息。您使用哪种硬件?操作系统?我认为 16 秒内 8 亿行对于 Raspberry Pi 来说非常棒!

标签: java mongodb performance mongodb-java mongo-java


【解决方案1】:

collection.find().forEach((Block&lt;Document&gt;) document -&gt; count.increment());

由于您在内存中迭代超过 250k 条记录,这一行可能会增加很多时间。

要快速检查是不是这样,你可以试试这个 -

long start1 = System.currentTimeMillis();
List<Document> documents = collection.find();
System.out.println(System.currentTimeMillis() - start1);

long start2 = System.currentTimeMillis();
documents.forEach((Block<Document>) document -> count.increment());
System.out.println(System.currentTimeMillis() - start2);

这将帮助您了解从数据库中获取文档实际需要多少时间以及迭代需要多少时间。

【讨论】:

  • 嗨,最终我对两者的总和感兴趣,它不会改变,因为我的代码只调用一次 find()
  • 为什么要使用count?你不能通过documents.size() 得到那个吗?
  • 这只是一个没有转换答案的测试,但是您可以想象那里需要在服务器端完成的任何代码
【解决方案2】:

您没有指定用例,因此很难告诉您如何调整查询。 (即:谁愿意一次加载 8 亿行只是为了计数?)。

鉴于您的架构,我认为您的数据几乎是只读的,并且您的任务与数据聚合有关。

您当前的工作只是读取数据,(很可能您的驱动程序将批量读取),然后停止,然后执行一些计算(地狱是的,一个 int 包装器用于更多地增加处理时间),然后重复。这不是一个好方法。如果您没有以正确的方式访问数据库,它就不会神奇地快速。

如果计算不是太复杂,我建议你使用aggregation framework而不是全部加载到你的RAM中。

您应该考虑改进聚合:

  1. 将您的数据集分成更小的集合。 (例如:date 分区,exchange 分区...)。添加索引以支持该分区并在分区上操作聚合,然后组合结果(典型的分而治之方法)
  2. 项目仅需要字段
  3. 过滤掉不必要的文档(如果可能的话)
  4. 如果您无法在内存上执行聚合(如果您达到每个管道 100MB 的限制),请允许使用磁盘。
  5. 使用内置管道加快计算速度(例如:$count 用于您的示例)

如果您的计算过于复杂而无法用聚合框架表达,请使用mapReduce。它在mongod 进程上运行,数据不需要通过网络传输到您的内存中。

更新

所以看起来您想要进行 OLAP 处理,但您停留在 ETL 步骤。

您不需要也必须避免每次都将整个 OLTP 数据加载到 OLAP。只需要将新的更改加载到您的数据仓库。那么第一次数据加载/转储需要更多时间是正常且可以接受的。

首次加载时,应考虑以下几点:

  1. 分而治之,再次将您的数据分解为更小的数据集(使用日期/交易所/股票标签等谓词...)
  2. 进行并行计算,然后合并您的结果(您必须正确划分数据集)
  3. 批量计算而不是 forEach 中的处理:加载数据分区然后计算而不是逐个计算。

【讨论】:

  • 谢谢,我们确实需要获取 800mio 行并将它们发送到另一个实时计算引擎。因此,聚合、map reduce 等不是一种选择,我们需要这些信息。
  • 所以看起来您正在执行 OLAP 操作。为什么每次都必须发送 800mill 行?我认为你应该只发送新文件
  • 是的 :-),我们需要在启动时至少加载一次
  • 我想知道您为什么要在您的应用服务器上进行计算(例如您的计数示例)。使用我上面描述的分而治之的模式。将您的数据集分成较小的数据集,然后进行并行加载(因为那些较小的数据集不重叠)。加载时不要计算每个文档(如您的问题中的 foreach),而是批量计算(IE,读取分区后,做等等,然后继续)
  • 计数只是一个惊人的过度简化。我们需要加载所有数据,然后实时进行一些高级数学运算(在 mongoDB 中无法做到这一点),但这是另一个“故事”。现在我们需要 'dump' 。感谢您的提示
【解决方案3】:

我认为在你的情况下我应该做的是一个简单的解决方案,同时一种有效的方法是通过使用 parallelCollectionScan 来最大化整体吞吐量

允许应用程序在读取所有内容时使用多个并行游标 集合中的文档,从而增加吞吐量。这 parallelCollectionScan 命令返回一个包含 光标信息数组。

每个游标都提供对部分集合的返回的访问 集合中的文档。迭代每个游标返回每个 集合中的文档。游标不包含结果 数据库命令。数据库命令的结果标识 游标,但不包含或构成游标。

parallelCollectionScan 的一个简单示例应该是这样的

 MongoClient mongoClient = MongoClients.create();
 MongoDatabase database = mongoClient.getDatabase("localhost");
 Document commandResult = database.runCommand(new Document("parallelCollectionScan", "collectionName").append("numCursors", 3));

【讨论】:

  • 如果瓶颈是后端 CPU,这很好。但通常是 Mongo 的读取速度。
  • 运气不好,我们只返回一个光标 -> '此命令不会为 WiredTiger 存储引擎返回一个以上的光标。 docs.mongodb.com/manual/reference/command/…
  • 更甚的是,它在 4.2 及更高版本中被完全删除。
【解决方案4】:

据我计算,您正在处理大约 50 MiB/s(250k 行/秒 * 0.2 KiB/行)。这进入了磁盘驱动器和网络瓶颈领域。 MongoDB 使用什么样的存储?客户端和 MongoDB 服务器之间有什么样的带宽?您是否尝试过以最小 (= 10 Gib/s) 网络上?请记住,如果您使用的是 AWS 或 GCP 之类的云计算提供商,他们将遇到物理瓶颈之上的虚拟化瓶颈。

您询问了可能有帮助的设置。您可以尝试更改connectioncollection 上的压缩设置(选项为“无”、snappyzlib)。即使在snappy 上两者都没有改进,查看设置产生(或没有产生)的差异可能有助于确定系统的哪个部分承受的压力最大。

与 C++ 或 Python 相比,Java 在数字运算方面的性能不佳,因此您可以考虑用其中一种语言重写此特定操作,然后将其与您的 Java 代码集成。我建议您在 Python 中循环数据并将其与 Java 中的相同数据进行测试运行。

【讨论】:

    【解决方案5】:

    首先,正如@xtreme-biker 所说,性能很大程度上取决于您的硬件。具体来说,我的第一个建议是检查您是在虚拟机还是本机主机上运行。在我的情况下,使用带有 SDD 驱动器的 i7 上的 CentOS VM,我每秒可以读取 123,000 个文档,但在同一驱动器上的 Windows 主机上运行的完全相同的代码每秒可以读取多达 387,000 个文档。

    接下来,假设您确实需要阅读完整的合集。这就是说您必须执行全扫描。假设您不能更改 MongoDB 服务器的配置,只能优化您的代码。

    然后一切都归结为什么

    collection.find().forEach((Block<Document>) document -> count.increment());
    

    确实如此。

    快速展开 MongoCollection.find() 表明它实际上是这样做的:

    ReadPreference readPref = ReadPreference.primary();
    ReadConcern concern = ReadConcern.DEFAULT;
    MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
    Decoder<Document> codec = new DocumentCodec();
    FindOperation<Document> fop = new FindOperation<Document>(ns,codec);
    ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
    QueryBatchCursor<Document> cursor = (QueryBatchCursor<Document>) fop.execute(readBinding);
    AtomicInteger count = new AtomicInteger(0);
    try (MongoBatchCursorAdapter<Document> cursorAdapter = new MongoBatchCursorAdapter<Document>(cursor)) {
        while (cursorAdapter.hasNext()) {
            Document doc = cursorAdapter.next();
            count.incrementAndGet();
        }
    }
    

    这里 FindOperation.execute() 相当快(不到 10 毫秒),大部分时间都花在了 while 循环中,特别是在私有方法 QueryBatchCursor.getMore()

    getMore() 调用DefaultServerConnection.command() 其时间主要用于两个操作:1) 从服务器获取字符串数据和2) 转换字符串数据进入 BsonDocument。

    事实证明,Mongo 非常聪明,它会执行多少次网络往返来获取大型结果集。它将首先使用 firstBatch 命令获取 100 个结果,然后获取更大的批次,其中 nextBatch 是批次大小,具体取决于集合大小达到限制。

    所以,在木头下会发生这样的事情来取第一批。

    ReadPreference readPref = ReadPreference.primary();
    ReadConcern concern = ReadConcern.DEFAULT;
    MongoNamespace ns = new MongoNamespace(databaseName,collectionName);
    FieldNameValidator noOpValidator = new NoOpFieldNameValidator();
    DocumentCodec payloadDecoder = new DocumentCodec();
    Constructor<CodecProvider> providerConstructor = (Constructor<CodecProvider>) Class.forName("com.mongodb.operation.CommandResultCodecProvider").getDeclaredConstructor(Decoder.class, List.class);
    providerConstructor.setAccessible(true);
    CodecProvider firstBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("firstBatch"));
    CodecProvider nextBatchProvider = providerConstructor.newInstance(payloadDecoder, Collections.singletonList("nextBatch"));
    Codec<BsonDocument> firstBatchCodec = fromProviders(Collections.singletonList(firstBatchProvider)).get(BsonDocument.class);
    Codec<BsonDocument> nextBatchCodec = fromProviders(Collections.singletonList(nextBatchProvider)).get(BsonDocument.class);
    ReadWriteBinding readBinding = new ClusterBinding(getCluster(), readPref, concern);
    BsonDocument find = new BsonDocument("find", new BsonString(collectionName));
    Connection conn = readBinding.getReadConnectionSource().getConnection();
    
    BsonDocument results = conn.command(databaseName,find,noOpValidator,readPref,firstBatchCodec,readBinding.getReadConnectionSource().getSessionContext(), true, null, null);
    BsonDocument cursor = results.getDocument("cursor");
    long cursorId = cursor.getInt64("id").longValue();
    
    BsonArray firstBatch = cursor.getArray("firstBatch");
    

    然后cursorId 用于获取每个下一批。

    在我看来,驱动程序实现的“问题”是 String 到 JSON 解码器被注入,但 JsonReader(decode() 方法所依赖的)没有注入。这种方式甚至可以达到com.mongodb.internal.connection.InternalStreamConnection,您已经在套接字通信附近。

    因此,我认为你几乎没有什么可以改进MongoCollection.find() 除非你深入到InternalStreamConnection.sendAndReceiveAsync()

    您无法减少往返次数,也无法更改将响应转换为 BsonDocument 的方式。并非没有绕过驱动程序并编写自己的客户端,我怀疑这是一个好主意。

    P.D.如果您想尝试上面的一些代码,您需要 getCluster() 方法,该方法需要对mongo-java-driver 进行恶意破解。

    private Cluster getCluster() {
        Field cluster, delegate;
        Cluster mongoCluster = null;
        try {
            delegate = mongoClient.getClass().getDeclaredField("delegate");
            delegate.setAccessible(true);
            Object clientDelegate = delegate.get(mongoClient);
            cluster = clientDelegate.getClass().getDeclaredField("cluster");
            cluster.setAccessible(true);
            mongoCluster = (Cluster) cluster.get(clientDelegate);
        } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
            System.err.println(e.getClass().getName()+" "+e.getMessage());
        }
        return mongoCluster;
    }
    

    【讨论】:

    • 我们一直在玩,我们得出了类似的结论,解码占用了很大一部分时间。谢谢
    • @ic3 我认为这仍然支持您应该在 Python 或 C++ 中重写此特定操作的想法,至少作为测试。
    • @OldPro 我们致力于这个想法,并解耦监听端口和解码同一线程,等等。完成并调整后,我将写下我们的发现。一个可怜的 bson 不会将 fieldNames 放在地图中,因此不会发送相同的字段,一遍又一遍。. 一些关于我们正在改进的提示;-)
    猜你喜欢
    • 2013-02-16
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 2016-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多