【问题标题】:Google App Engine: Using Big Query on datastore?Google App Engine:在数据存储上使用 Big Query?
【发布时间】:2012-06-13 13:48:17
【问题描述】:

拥有一个 GAE 数据存储类型,其中包含数十万个对象。想做几个涉及到的查询(涉及计数查询)。 Big Query 似乎非常适合这样做。

目前是否有一种使用 Big Query 查询实时 AppEngine 数据存储区的简便方法?

【问题讨论】:

    标签: google-app-engine google-bigquery


    【解决方案1】:

    您不能直接在 DataStore 实体上运行 BigQuery,但您可以编写一个 Mapper Pipeline,从 DataStore 中读取实体,将它们写入 Google Cloud Storage 中的 CSV,然后将它们提取到 BigQuery - 您甚至可以自动化过程。下面是一个使用 Mapper API 类的示例,仅用于 DataStore 到 CSV 步骤:

    import re
    import time
    from datetime import datetime
    import urllib
    import httplib2
    import pickle
    
    from google.appengine.ext import blobstore
    from google.appengine.ext import db
    from google.appengine.ext import webapp
    
    from google.appengine.ext.webapp.util import run_wsgi_app
    from google.appengine.ext.webapp import blobstore_handlers
    from google.appengine.ext.webapp import util
    from google.appengine.ext.webapp import template
    
    from mapreduce.lib import files
    from google.appengine.api import taskqueue
    from google.appengine.api import users
    
    from mapreduce import base_handler
    from mapreduce import mapreduce_pipeline
    from mapreduce import operation as op
    
    from apiclient.discovery import build
    from google.appengine.api import memcache
    from oauth2client.appengine import AppAssertionCredentials
    
    
    #Number of shards to use in the Mapper pipeline
    SHARDS = 20
    
    # Name of the project's Google Cloud Storage Bucket
    GS_BUCKET = 'your bucket'
    
    # DataStore Model
    class YourEntity(db.Expando):
      field1 = db.StringProperty() # etc, etc
    
    ENTITY_KIND = 'main.YourEntity'
    
    
    class MapReduceStart(webapp.RequestHandler):
      """Handler that provides link for user to start MapReduce pipeline.
      """
      def get(self):
        pipeline = IteratorPipeline(ENTITY_KIND)
        pipeline.start()
        path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
        logging.info('Redirecting to: %s' % path)
        self.redirect(path)
    
    
    class IteratorPipeline(base_handler.PipelineBase):
      """ A pipeline that iterates through datastore
      """
      def run(self, entity_type):
        output = yield mapreduce_pipeline.MapperPipeline(
          "DataStore_to_Google_Storage_Pipeline",
          "main.datastore_map",
          "mapreduce.input_readers.DatastoreInputReader",
          output_writer_spec="mapreduce.output_writers.FileOutputWriter",
          params={
              "input_reader":{
                  "entity_kind": entity_type,
                  },
              "output_writer":{
                  "filesystem": "gs",
                  "gs_bucket_name": GS_BUCKET,
                  "output_sharding":"none",
                  }
              },
              shards=SHARDS)
    
    
    def datastore_map(entity_type):
      props = GetPropsFor(entity_type)
      data = db.to_dict(entity_type)
      result = ','.join(['"%s"' % str(data.get(k)) for k in props])
      yield('%s\n' % result)
    
    
    def GetPropsFor(entity_or_kind):
      if (isinstance(entity_or_kind, basestring)):
        kind = entity_or_kind
      else:
        kind = entity_or_kind.kind()
      cls = globals().get(kind)
      return cls.properties()
    
    
    application = webapp.WSGIApplication(
                                         [('/start', MapReduceStart)],
                                         debug=True)
    
    def main():
      run_wsgi_app(application)
    
    if __name__ == "__main__":
      main()
    

    如果您将其附加到 IteratorPipeline 类的末尾:yield CloudStorageToBigQuery(output),您可以将生成的 csv 文件句柄通过管道传输到 BigQuery 提取管道中...如下所示:

    class CloudStorageToBigQuery(base_handler.PipelineBase):
      """A Pipeline that kicks off a BigQuery ingestion job.
      """
      def run(self, output):
    
    # BigQuery API Settings
    SCOPE = 'https://www.googleapis.com/auth/bigquery'
    PROJECT_ID = 'Some_ProjectXXXX'
    DATASET_ID = 'Some_DATASET'
    
    # Create a new API service for interacting with BigQuery
    credentials = AppAssertionCredentials(scope=SCOPE)
    http = credentials.authorize(httplib2.Http())
    bigquery_service = build("bigquery", "v2", http=http)
    
    jobs = bigquery_service.jobs()
    table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
        '%m%d%Y_%H%M%S')
    files = [str(f.replace('/gs/', 'gs://')) for f in output]
    result = jobs.insert(projectId=PROJECT_ID,
                        body=build_job_data(table_name,files)).execute()
    logging.info(result)
    
    def build_job_data(table_name, files):
      return {"projectId": PROJECT_ID,
              "configuration":{
                  "load": {
                      "sourceUris": files,
                      "schema":{
                          # put your schema here
                          "fields": fields
                          },
                      "destinationTable":{
                          "projectId": PROJECT_ID,
                          "datasetId": DATASET_ID,
                          "tableId": table_name,
                          },
                      }
                  }
              }
    

    【讨论】:

      【解决方案2】:

      使用新的(自 2013 年 9 月起)streaming inserts api,您可以将记录从您的应用导入 BigQuery。

      数据会立即在 BigQuery 中可用,因此应该可以满足您的实时需求。

      虽然这个问题现在有点老了,但对于遇到这个问题的人来说,这可能是一个更简单的解决方案

      目前,虽然从本地开发服务器上让它工作充其量是不完整的。

      【讨论】:

        【解决方案3】:

        我们正在开发一个 Trusted Tester 程序,通过两个简单的操作从 Datastore 迁移到 BigQuery:

        1. 使用数据存储管理员的备份功能备份数据存储
        2. 将备份直接导入 BigQuery

        它会自动为您处理架构。

        更多信息(申请):https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ

        【讨论】:

        • 这到底是怎么回事?关于 TTP 命运的任何更新?
        • 是的,已经有一段时间了
        • 这个也申请了两次,还没有“票”。
        • 现在可以公开获得。不知道从什么时候开始
        【解决方案4】:

        对于 BigQuery,您必须将这些 Kind 导出为 CSV 或分隔记录结构,然后加载到 BigQuery 中,然后您就可以进行查询了。据我所知,没有任何设施可以查询实时 GAE 数据存储。

        Biquery 是分析查询引擎,这意味着您无法更改记录。不允许更新或删除,只能追加。

        【讨论】:

        【解决方案5】:

        不,BigQuery 是一种不同的产品,需要将数据上传到其中。它不能在数据存储上工作。您可以使用 GQL 查询数据存储区。

        【讨论】:

          【解决方案6】:

          截至 2016 年,现在这很有可能!您必须执行以下操作:

          1. 在谷歌存储中创建一个新存储桶
          2. 使用 console.developers.google.com 上的数据库管理员备份实体我有一个完整的教程
          3. 前往 bigquery Web UI,并导入在步骤 1 中生成的文件。

          有关此工作流程的完整示例,请参阅 this post

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2011-04-11
            • 2010-11-03
            • 2011-05-26
            • 2013-05-17
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多