【问题标题】:MongoDB to BigQueryMongoDB 到 BigQuery
【发布时间】:2017-06-29 06:47:57
【问题描述】:

将数据从托管在 mlab 中的 MongoDB 导出到 google bigquery 的最佳方法是什么?

最初,我尝试从 MongoDB 向 BigQuery 进行一次加载,后来我正在考虑使用 Pub/Sub 将实时数据流到 bigquery。

我需要第一次从 mongodb 加载到 bigquery 的帮助。

【问题讨论】:

    标签: mongodb google-bigquery


    【解决方案1】:

    从对 MongoDB 文档的基本阅读来看,听起来您可以使用mongoexport 将您的数据库转储为 JSON。完成后,请参阅 BigQuery loading data 主题,了解如何在将 JSON 文件复制到 GCS 后从它们创建表。

    【讨论】:

    • 日期字段怎么样?当我将数据导出到日期/时间字段时,会显示为“$date”,这对于 BigQuery 字段名称是不可接受的。有什么解决方法吗?
    • 不幸的是,我认为您需要先以某种方式重命名它们。
    • 谢谢你,Elliott,我找到了修复它的方法,并将它作为这个问题的另一个答案提交,这可能会在未来帮助像我这样的人:-)
    【解决方案2】:

    在我看来,最好的做法是构建自己的提取器。这可以使用您选择的语言来完成,您可以提取为 CSV 或 JSON。

    但是,如果您寻求一种快速的方式,并且如果您的数据不是很大并且可以容纳在一台服务器中,那么我建议使用mongoexport。假设您有一个简单的文档结构,如下所示:

    {
        "_id" : "tdfMXH0En5of2rZXSQ2wpzVhZ",
        "statuses" : [ 
            {
                "status" : "dc9e5511-466c-4146-888a-574918cc2534",
                "score" : 53.24388894
            }
        ],
        "stored_at" : ISODate("2017-04-12T07:04:23.545Z")
    }
    

    然后您需要定义您的 BigQuery Schema (mongodb_schema.json),例如:

    $ cat > mongodb_schema.json <<EOF
    [
        { "name":"_id", "type": "STRING" },
        { "name":"stored_at", "type": "record", "fields": [
            { "name":"date", "type": "STRING" }
        ]},
        { "name":"statuses", "type": "record", "mode": "repeated", "fields": [
            { "name":"status", "type": "STRING" },
            { "name":"score", "type": "FLOAT" }
        ]}
    ]
    EOF
    

    现在,有趣的部分开始了 :-) 从 MongoDB 中以 JSON 格式提取数据。假设您有一个副本集名称为statuses 的集群,您的数据库为sample,您的集合为status

    mongoexport \
        --host statuses/db-01:27017,db-02:27017,db-03:27017 \
        -vv \
        --db "sample" \
        --collection "status" \
        --type "json" \
        --limit 100000 \
        --out ~/sample.json
    

    正如您在上面看到的,我将输出限制为 10 万条记录,因为我建议您先运行示例并加载到 BigQuery,然后再对所有数据执行此操作。运行上述命令后,您的示例数据应该在 sample.json 但有一个字段 $date 会导致您在加载到 BigQuery 时出错。为了解决这个问题,我们可以使用sed 将它们替换为简单的字段名称:

    # Fix Date field to make it compatible with BQ
    sed -i 's/"\$date"/"date"/g' sample.json
    

    现在您可以使用以下命令压缩、上传到 Google Cloud Storage (GCS),然后加载到 BigQuery:

    # Compress for faster load
    gzip sample.json
    
    # Move to GCloud
    gsutil mv ./sample.json.gz gs://your-bucket/sample/sample.json.gz
    
    # Load to BQ
    bq load \
        --source_format=NEWLINE_DELIMITED_JSON \
        --max_bad_records=999999 \
        --ignore_unknown_values=true \
        --encoding=UTF-8 \
        --replace \
        "YOUR_DATASET.mongodb_sample" \
        "gs://your-bucket/sample/*.json.gz" \
        "mongodb_schema.json"
    

    如果一切正常,则返回并从 mongoexport 命令中删除 --limit 100000 并再次重新运行上述命令以加载所有内容而不是 100k 样本。

    替代解决方案:

    如果您想要更多的灵活性并且不关心性能,那么您也可以使用mongo CLI 工具。通过这种方式,您可以在 JavaScript 中编写提取逻辑并针对您的数据执行它,然后将输出发送到 BigQuery。这是我为同一过程所做的,但使用 JavaScript 以 CSV 格式输出,因此我可以更轻松地将其加载到 BigQuery:

    # Export Logic in JavaScript
    cat > export-csv.js <<EOF
    var size = 100000;
    var maxCount = 1;
    for (x = 0; x < maxCount; x = x + 1) {
        var recToSkip = x * size;
        db.entities.find().skip(recToSkip).limit(size).forEach(function(record) {
            var row = record._id + "," + record.stored_at.toISOString();;
            record.statuses.forEach(function (l) {
                print(row + "," + l.status + "," + l.score)
            });
        });
    }
    EOF
    
    # Execute on Mongo CLI
    _MONGO_HOSTS="db-01:27017,db-02:27017,db-03:27017/sample?replicaSet=statuses"
    mongo --quiet \
        "${_MONGO_HOSTS}" \
        export-csv.js \
        | split -l 500000 --filter='gzip > $FILE.csv.gz' - sample_
    
    # Load all Splitted Files to Google Cloud Storage
    gsutil -m mv ./sample_* gs://your-bucket/sample/
    
    # Load files to BigQuery
    bq load \
        --source_format=CSV \
        --max_bad_records=999999 \
        --ignore_unknown_values=true \
        --encoding=UTF-8 \
        --replace \
        "YOUR_DATASET.mongodb_sample" \
        "gs://your-bucket/sample/sample_*.csv.gz" \
        "ID,StoredDate:DATETIME,Status,Score:FLOAT"
    

    提示: 在上面的脚本中,我做了一个小技巧,通过管道输出能够将输出拆分为多个文件,并带有 sample_ 前缀。同样在拆分过程中,它会对输出进行 GZip 压缩,因此您可以更轻松地将其加载到 GCS。

    【讨论】:

    • 感谢您的解决方案。太棒了。但是这种方法存在一个问题。此处 BigQuery 的架构已明确修复。但从逻辑上讲,MongoDB 的架构永远不会固定。所以我们不能依赖固定模式文件来进行 bigquery。其次,新列或嵌套列在 mongoDB 中的新/最近数据中很有可能存在。然后一种方法是每次在 MongoDB 中存储一个全新的 key 时重复更改 bigquery 模式文件的模式,但是您将如何处理 bigquery 中已经上传的数据。很想听听你的想法。
    • 我完全理解你的观点,这就是我开始回答的主要原因,建议你构建一个自定义提取器。 BigQuery 可以支持嵌套字段,因此这不是问题。为了在 BigQuery 中设计灵活的架构,我建议构建包含两部分的架构。将固定且大部分不会更改的公共字段和一个类型为 ARRAY> 的“元”字段。这样,您可以以键/值对的形式推送任何自定义值,并将它们全部以数组形式存储在“元”字段中。我经常使用这种技术,它对你来说可能是一个不错的选择
    【解决方案3】:

    您可以从 MongoDB 和 stream it to BigQuery 读取数据。您可以在 NodeJS here 中找到一个示例。

    这是链接示例的扩展,可防止重复记录(只要它们仍在流式缓冲区中):

        const { BigQuery } = require('@google-cloud/bigquery');
        const bigqueryClient = new BigQuery();
    
        ...
    
        const jsonData = // Array of documents from MongoDB
    
        const inputRows = jsonData.map(row => ({
          insertId: row._id,
          json: row
        }));
        const insertOptions = {
          raw: true
        };
    
        await bigqueryClient
          .dataset(datasetId)
          .table(tableId)
          .insert(inputRows, insertOptions);
    
    

    【讨论】:

    • 这段代码在哪里运行?在mongodb服务器上?
    • 您必须在自己的服务器或计算机上运行它。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-05-21
    • 1970-01-01
    • 2019-04-22
    • 1970-01-01
    • 2023-02-04
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多