[Posted]: 2014-05-23 01:10:05
[Question]:
TL;DR: a query that processes 12.9 MB in BQ takes about 540 MB of memory in Python, and this grows roughly linearly.
I am querying some BigQuery tables. Running the following query at https://bigquery.cloud.google.com/
SELECT * FROM dataset1.table1, dataset1.table2
returns:
Query complete (5.2s elapsed, 12.9 MB processed)
That is roughly 150k rows of data. When I run the same query in Python, it uses up to 540 MB of memory; querying 300k rows doubles the memory usage. Running the same query multiple times does not change the RAM usage, so my best guess is that some buffer is allocated and never freed. I tested whether gc.collect() helps, but it does not. I also dumped my data to JSON, and that file is only about 25 MB. So my question is: why is the memory usage so large, and is there any way to change that?
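Much of the gap between the 25 MB JSON dump and the ~540 MB resident size can be explained by the per-object overhead of the parsed Python dicts and lists. A small sketch (Python 3, with synthetic rows shaped like the BigQuery REST response rather than the actual data) makes that overhead visible with tracemalloc:

```python
import json
import tracemalloc

# Synthetic stand-in for a BigQuery response page: each row is a dict
# with an 'f' list of {'v': value} cells, as the REST API returns them.
rows = [{'f': [{'v': str(i)}, {'v': 'name%d' % i}, {'v': i * 0.5}]}
        for i in range(10000)]

raw = json.dumps(rows)           # size of the serialized form
tracemalloc.start()
parsed = json.loads(raw)         # size of the in-memory Python objects
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print('serialized: %.1f KB' % (len(raw) / 1024))
print('in memory:  %.1f KB' % (current / 1024))
# The parsed dicts and lists typically occupy several times
# the serialized size, before counting any client-library buffers.
```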
My code:
from apiclient.discovery import build
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.tools import run
import httplib2
import sys
projectId = '....'
bqCredentialsFile = 'bigquery_credentials.dat'
clientId = '....' # production
secret = '.......apps.googleusercontent.com ' # production
storage = Storage(bqCredentialsFile)
credentials = storage.get()
if credentials is None or credentials.invalid:
flow = OAuth2WebServerFlow(client_id=clientId, client_secret=secret, scope='https://www.googleapis.com/auth/bigquery')
credentials = run(flow, storage)
http = httplib2.Http()
http = credentials.authorize(http)
svc = build('bigquery', 'v2', http=http)
def getQueryResults(jobId, pageToken):
    req = svc.jobs()
    return req.getQueryResults(projectId=projectId, jobId=jobId, pageToken=pageToken).execute()
def query(queryString, priority='BATCH'):
    req = svc.jobs()
    body = {'query': queryString, 'maxResults': 100000, 'configuration': {'priority': priority}}
    res = req.query(projectId=projectId, body=body).execute()
    if 'rows' in res:
        for row in res['rows']:
            yield row
    for _ in range(int(res['totalRows']) / 100000):
        pageToken = res['pageToken']
        res = getQueryResults(res['jobReference']['jobId'], pageToken=pageToken)
        for row in res['rows']:
            yield row
def querySome(tableKeys):
    queryString = '''SELECT * FROM {0} '''.format(','.join(tableKeys))
    if len(tableKeys) > 0:
        return query(queryString, priority='BATCH')
if __name__ == '__main__':
    import simplejson as json
    tableNames = ['dataset1.table1', 'dataset1.table2']
    output = list(querySome(tableNames))
    fl = open('output.json', 'w')
    fl.write(json.dumps(output))
    fl.close()
    print input('done')
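Note that query() above already yields rows lazily; it is the list() call in __main__ that materializes every row (and its dict overhead) at once. One way to keep memory flat is to consume the generator and write each row to disk as it arrives. A sketch (Python 3; write_rows_incrementally is a hypothetical helper, not part of the code above):

```python
import json

def write_rows_incrementally(rows, path):
    """Stream rows to disk one JSON line at a time instead of
    collecting the whole result set in a Python list first."""
    count = 0
    with open(path, 'w') as fl:
        for row in rows:
            fl.write(json.dumps(row))
            fl.write('\n')
            count += 1
    return count

# Hypothetical usage with the generator above would be:
#   n = write_rows_incrementally(querySome(tableNames), 'output.jsonl')
# Demonstrated here with a stand-in generator:
n = write_rows_incrementally(({'v': i} for i in range(3)), 'output.jsonl')
print(n)  # → 3
```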
[Discussion]: