【问题标题】:In MongoDB mapreduce, how can I flatten the values object?在 MongoDB mapreduce 中,如何展平值对象?
【发布时间】:2011-11-07 15:35:52
【问题描述】:

我正在尝试使用 MongoDB 来分析 Apache 日志文件。我从 Apache 访问日志中创建了一个 receipts 集合。以下是我的模型外观的简短摘要:

db.receipts.findOne()
{
    "_id" : ObjectId("4e57908c7a044a30dc03a888"),
    "path" : "/videos/1/show_invisibles.m4v",
    "issued_at" : ISODate("2011-04-08T00:00:00Z"),
    "status" : "200"
}

我编写了一个MapReduce function,它按issued_at 日期字段对所有数据进行分组。它总结了请求的总数,并提供了每个唯一路径的请求数的细分。以下是输出的示例:

db.daily_hits_by_path.findOne()
{
    "_id" : ISODate("2011-04-08T00:00:00Z"),
    "value" : {
        "count" : 6,
        "paths" : {
            "/videos/1/show_invisibles.m4v" : {
                "count" : 2
            },
            "/videos/1/show_invisibles.ogv" : {
                "count" : 3
            },
            "/videos/6/buffers_listed_and_hidden.ogv" : {
                "count" : 1
            }
        }
    }
}

我怎样才能使输出看起来像这样:

{
    "_id" : ISODate("2011-04-08T00:00:00Z"),
    "count" : 6,
    "paths" : {
        "/videos/1/show_invisibles.m4v" : {
            "count" : 2
        },
        "/videos/1/show_invisibles.ogv" : {
            "count" : 3
        },
        "/videos/6/buffers_listed_and_hidden.ogv" : {
            "count" : 1
        }
    }
}

【问题讨论】:

    标签: mongodb mapreduce


    【解决方案1】:

    目前不可能,但我建议对此案投票:https://jira.mongodb.org/browse/SERVER-2517

    【讨论】:

    【解决方案2】:

    充分利用以前的答案和 cmets:

    db.items.find().hint({_id: 1}).forEach(function(item) {
        db.items.update({_id: item._id}, item.value);
    });
    

    来自http://docs.mongodb.org/manual/core/update/#replace-existing-document-with-new-document
    “如果update 参数只包含字段和值对,则update() 方法将现有文档替换为update 参数中的文档,_id 字段除外。”

    因此您既不需要$unset value,也不需要列出每个字段。

    来自https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#cursor-snapshot “在某些情况下,MongoDB 游标可以多次返回同一文档。...在此字段或这些字段上使用唯一索引,以便查询将不超过一次返回每个文档。使用提示()查询以显式强制查询以使用该索引。”

    【讨论】:

    • 这会导致严重的竞争条件。
    • @DerekBrown,请解释或提供证明或指出此代码中的错误
    • db.items.update() 是异步的,因此外部调用(即db.items.find().hint()....foreach())的执行可以在所有更新完成之前终止。
    • @DerekBrown,上面的代码使用标准的同步 JS API。如果您使用异步驱动程序,那么您有责任等待所有异步操作完成。如果你想说更新是最终一致的(这与异步不同),并且默认写关注点不足以满足你的情况,那么请使用更严格的写关注点配置 - docs.mongodb.com/manual/reference/write-concern
    • 1) 官方不支持同步驱动。所以至少你应该提到你的代码需要你使用不同的 mongo 驱动程序。 2)写关注在这里不相关。问题不在于您创建的更新没有传播到其他分片,而是未来的查询(需要完成更新)尚未在 NodeJS 与之交互的单个实例上发出。
    【解决方案3】:

    AFAIK,按照设计,Mongo 的 map reduce 将在“值元组”中输出结果,我还没有看到任何可以配置“输出格式”的东西。也许可以使用 finalize() 方法。

    您可以尝试运行一个后期处理,该处理将使用

    重塑数据
    results.find({}).forEach( function(result) {
      results.update({_id: result._id}, {count: result.value.count, paths: result.value.paths})
    });
    

    是的,看起来很难看。我知道。

    【讨论】:

    • 有没有办法直接修改result对象/文档?
    【解决方案4】:

    您可以使用集合引用来编写 Dan 的代码:

        function clean(collection) { 
          collection.find().forEach( function(result) {
          var value = result.value;
          delete value._id;     
          collection.update({_id: result._id}, value);     
          collection.update({_id: result.id}, {$unset: {value: 1}} ) } )};
    

    【讨论】:

      【解决方案5】:

      类似于@ljonas 的方法,但不需要硬编码文档字段:

      db.results.find().forEach( function(result) {
          var value = result.value;
          delete value._id;
          db.results.update({_id: result._id}, value);
          db.results.update({_id: result.id}, {$unset: {value: 1}} )
      } );
      

      【讨论】:

        【解决方案6】:

        所有提出的解决方案都远非最优。到目前为止,您可以做到的最快速度是:

        var flattenMRCollection=function(dbName,collectionName) {
            var collection=db.getSiblingDB(dbName)[collectionName];
        
            var i=0;
            var bulk=collection.initializeUnorderedBulkOp();
            collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
                print((++i));
                //collection.update({_id: result._id},result.value);
        
                bulk.find({_id: result._id}).replaceOne(result.value);
        
                if(i%1000==0)
                {
                    print("Executing bulk...");
                    bulk.execute();
                    bulk=collection.initializeUnorderedBulkOp();
                }
            });
            bulk.execute();
        };
        

        然后调用它: flattenMRCollection("MyDB","MyMRCollection")

        这比顺序更新要快得多。

        【讨论】:

        • 注意:这是 MongoDB 2.6 中的新功能
        【解决方案7】:

        在尝试文森特的答案时,我发现了几个问题。基本上,如果您在 foreach 循环中执行更新,这会将文档移动到集合的末尾,并且光标将再次到达该文档(example)。如果使用$snapshot,则可以避免这种情况。因此,我在下面提供了一个 Java 示例。

        final List<WriteModel<Document>> bulkUpdate = new ArrayList<>();
        
        // You should enable $snapshot if performing updates within foreach
        collection.find(new Document().append("$query", new Document()).append("$snapshot", true)).forEach(new Block<Document>() {
            @Override
            public void apply(final Document document) {
                // Note that I used incrementing long values for '_id'. Change to String if
                // you used string '_id's
                long docId = document.getLong("_id");
                Document subDoc = (Document)document.get("value");
                WriteModel<Document> m = new ReplaceOneModel<>(new Document().append("_id", docId), subDoc);
                bulkUpdate.add(m);
        
                // If you used non-incrementing '_id's, then you need to use a final object with a counter.
                if(docId % 1000 == 0 && !bulkUpdate.isEmpty()) {
                    collection.bulkWrite(bulkUpdate);
                    bulkUpdate.removeAll(bulkUpdate);
                }
            }
        });
        // Fixing bug related to Vincent's answer.
        if(!bulkUpdate.isEmpty()) {
            collection.bulkWrite(bulkUpdate);
            bulkUpdate.removeAll(bulkUpdate);
        }
        

        注意:这个 sn-p 在我的机器上执行平均需要 7.4 秒,有 10 万条记录和 14 个属性(IMDB 数据集)。如果没有批处理,平均需要 25.2 秒。

        【讨论】:

          猜你喜欢
          • 2023-03-28
          • 2011-08-03
          • 1970-01-01
          • 2019-04-17
          • 1970-01-01
          • 1970-01-01
          • 2017-05-31
          • 1970-01-01
          • 1970-01-01
          相关资源
          最近更新 更多