【发布时间】:2020-09-05 02:09:02
【问题描述】:
我有大约 50k 可以使用清单文件从 S3 读取。我必须将每个(JSON)文件的内容读入数据框并处理文件(将它们标准化为数据库表)。现在我有一个工作代码需要大约 15 小时来处理 50k 个文件。我必须将其作为日常工作来运行。有什么方法可以并行处理大量文件或有什么更好的方法来加快处理速度?
用代码更新问题
import json
import pandas as pd
import os
import gzip
import boto3
from datetime import datetime,timezone,timedelta
session = boto3.session.Session()
s3 = session.resource('s3')
client = session.client('s3')
#read the S3 inventory report, get the keys of files that are modified on sysdate-1
dt=(datetime.now(timezone.utc) + timedelta(days=-1)).strftime('%Y-%m-%d')
dtz=dt+'T00-00Z'
print('reading inventory report for', dtz)
inventory_bucket = 'xxx'
manifest_key='s3-bucket'+dtz+'/manifest.json'
manifest = json.load(s3.Object(inventory_bucket, manifest_key).get()['Body'])
df=pd.DataFrame()
for obj in manifest['files']:
gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
print('csv obj:', gzip_obj)
buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
reader = pd.read_csv(buffer)
reader.columns=['id','key','modified_date']
print('converting csv obj to dataframe')
df=df.append(reader[(reader.modified_date>dt)])
source_keys=list(df['key'])
s3_bucket_source='yyy'
#download the files to a tmp folder
local='/tmp/'
print("downloading from S3")
for k in source_keys:
k_path=k.replace('/', '-')
dest_pathname = os.path.join(local, k_path)
if not os.path.exists(os.path.dirname(dest_pathname)):
os.makedirs(os.path.dirname(dest_pathname))
client.download_file(s3_bucket_source, k, dest_pathname)
#read the latest jsons from tmp folder
path_to_json = os.path.expanduser('/tmp/')
json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]
#loop through the jsons and normalize each file contents into df_table1, df_table2, df_table3
for index, js in enumerate(json_files):
with open(os.path.join(path_to_json, js)) as json_file:
print('processing file:', js)
d=json.loads(json_file.read())
v=d['key1'][0]['key2']
if isinstance(v, list):
for i, v2 in enumerate(v):
df_table1, df_table2, df_table3 = normalizeJSON(d,i,v2['id'])
#normalize is the custom function to split the nested json into relational tables
else:
print('invalid json')
我使用 S3 库存报告从清单中获取最近修改文件的列表,将文件下载到 tmp 位置并一一阅读以完成我需要做的事情
【问题讨论】:
-
您在 AWS 基础设施上运行它,或者您想在您的工作站上并行化?
-
@Marcin 我在 AWS Infrastructure 上运行代码,目前使用 AWS Glue
-
@Smile 用代码 sn-p 更新了问题
-
最慢的部分是按顺序下载Python中的所有文件。一个简单的改进是使用 gnu parallels 和 awscli 进行下载。另一种选择是使用 gevent 来并行下载许多文件——请参阅此代码以获取类似的示例gist.github.com/pcdinh/3495566
-
如果您使用 Glue 处理数据,为什么不使用 Spark?带有 EMRFS 的 spark 将是一个不错的选择。您不必手动下载数据
标签: json pandas amazon-web-services amazon-s3 boto3