【发布时间】:2012-06-13 13:48:17
【问题描述】:
拥有一个 GAE 数据存储类型,其中包含数十万个对象。想做几个涉及到的查询(涉及计数查询)。 Big Query 似乎非常适合这样做。
目前是否有一种使用 Big Query 查询实时 AppEngine 数据存储区的简便方法?
【问题讨论】:
标签: google-app-engine google-bigquery
拥有一个 GAE 数据存储类型,其中包含数十万个对象。想做几个涉及到的查询(涉及计数查询)。 Big Query 似乎非常适合这样做。
目前是否有一种使用 Big Query 查询实时 AppEngine 数据存储区的简便方法?
【问题讨论】:
标签: google-app-engine google-bigquery
您不能直接在 DataStore 实体上运行 BigQuery,但您可以编写一个 Mapper Pipeline,从 DataStore 中读取实体,将它们写入 Google Cloud Storage 中的 CSV,然后将它们提取到 BigQuery - 您甚至可以自动化过程。下面是一个使用 Mapper API 类的示例,仅用于 DataStore 到 CSV 步骤:
import re
import time
from datetime import datetime
import urllib
import httplib2
import pickle
from google.appengine.ext import blobstore
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app
from google.appengine.ext.webapp import blobstore_handlers
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp import template
from mapreduce.lib import files
from google.appengine.api import taskqueue
from google.appengine.api import users
from mapreduce import base_handler
from mapreduce import mapreduce_pipeline
from mapreduce import operation as op
from apiclient.discovery import build
from google.appengine.api import memcache
from oauth2client.appengine import AppAssertionCredentials
#Number of shards to use in the Mapper pipeline
SHARDS = 20
# Name of the project's Google Cloud Storage Bucket
GS_BUCKET = 'your bucket'
# DataStore Model
class YourEntity(db.Expando):
field1 = db.StringProperty() # etc, etc
ENTITY_KIND = 'main.YourEntity'
class MapReduceStart(webapp.RequestHandler):
"""Handler that provides link for user to start MapReduce pipeline.
"""
def get(self):
pipeline = IteratorPipeline(ENTITY_KIND)
pipeline.start()
path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
logging.info('Redirecting to: %s' % path)
self.redirect(path)
class IteratorPipeline(base_handler.PipelineBase):
""" A pipeline that iterates through datastore
"""
def run(self, entity_type):
output = yield mapreduce_pipeline.MapperPipeline(
"DataStore_to_Google_Storage_Pipeline",
"main.datastore_map",
"mapreduce.input_readers.DatastoreInputReader",
output_writer_spec="mapreduce.output_writers.FileOutputWriter",
params={
"input_reader":{
"entity_kind": entity_type,
},
"output_writer":{
"filesystem": "gs",
"gs_bucket_name": GS_BUCKET,
"output_sharding":"none",
}
},
shards=SHARDS)
def datastore_map(entity_type):
props = GetPropsFor(entity_type)
data = db.to_dict(entity_type)
result = ','.join(['"%s"' % str(data.get(k)) for k in props])
yield('%s\n' % result)
def GetPropsFor(entity_or_kind):
if (isinstance(entity_or_kind, basestring)):
kind = entity_or_kind
else:
kind = entity_or_kind.kind()
cls = globals().get(kind)
return cls.properties()
application = webapp.WSGIApplication(
[('/start', MapReduceStart)],
debug=True)
def main():
run_wsgi_app(application)
if __name__ == "__main__":
main()
如果您将其附加到 IteratorPipeline 类的末尾:yield CloudStorageToBigQuery(output),您可以将生成的 csv 文件句柄通过管道传输到 BigQuery 提取管道中...如下所示:
class CloudStorageToBigQuery(base_handler.PipelineBase):
"""A Pipeline that kicks off a BigQuery ingestion job.
"""
def run(self, output):
# BigQuery API Settings
SCOPE = 'https://www.googleapis.com/auth/bigquery'
PROJECT_ID = 'Some_ProjectXXXX'
DATASET_ID = 'Some_DATASET'
# Create a new API service for interacting with BigQuery
credentials = AppAssertionCredentials(scope=SCOPE)
http = credentials.authorize(httplib2.Http())
bigquery_service = build("bigquery", "v2", http=http)
jobs = bigquery_service.jobs()
table_name = 'datastore_dump_%s' % datetime.utcnow().strftime(
'%m%d%Y_%H%M%S')
files = [str(f.replace('/gs/', 'gs://')) for f in output]
result = jobs.insert(projectId=PROJECT_ID,
body=build_job_data(table_name,files)).execute()
logging.info(result)
def build_job_data(table_name, files):
return {"projectId": PROJECT_ID,
"configuration":{
"load": {
"sourceUris": files,
"schema":{
# put your schema here
"fields": fields
},
"destinationTable":{
"projectId": PROJECT_ID,
"datasetId": DATASET_ID,
"tableId": table_name,
},
}
}
}
【讨论】:
使用新的(自 2013 年 9 月起)streaming inserts api,您可以将记录从您的应用导入 BigQuery。
数据会立即在 BigQuery 中可用,因此应该可以满足您的实时需求。
虽然这个问题现在有点老了,但对于遇到这个问题的人来说,这可能是一个更简单的解决方案
目前,虽然从本地开发服务器上让它工作充其量是不完整的。
【讨论】:
我们正在开发一个 Trusted Tester 程序,通过两个简单的操作从 Datastore 迁移到 BigQuery:
它会自动为您处理架构。
更多信息(申请):https://docs.google.com/a/google.com/spreadsheet/viewform?formkey=dHdpeXlmRlZCNWlYSE9BcE5jc2NYOUE6MQ
【讨论】:
对于 BigQuery,您必须将这些 Kind 导出为 CSV 或分隔记录结构,然后加载到 BigQuery 中,然后您就可以进行查询了。据我所知,没有任何设施可以查询实时 GAE 数据存储。
Biquery 是分析查询引擎,这意味着您无法更改记录。不允许更新或删除,只能追加。
【讨论】:
不,BigQuery 是一种不同的产品,需要将数据上传到其中。它不能在数据存储上工作。您可以使用 GQL 查询数据存储区。
【讨论】:
截至 2016 年,现在这很有可能!您必须执行以下操作:
有关此工作流程的完整示例,请参阅 this post!
【讨论】: