【问题标题】:Reading S3 files from a manifest and processing them in parallel using Pandas从清单中读取 S3 文件并使用 Pandas 并行处理它们
【发布时间】:2020-09-05 02:09:02
【问题描述】:

我有大约 50k 可以使用清单文件从 S3 读取。我必须将每个(JSON)文件的内容读入数据框并处理文件(将它们标准化为数据库表)。现在我有一个工作代码需要大约 15 小时来处理 50k 个文件。我必须将其作为日常工作来运行。有什么方法可以并行处理大量文件或有什么更好的方法来加快处理速度?

用代码更新问题

import json 
import pandas as pd 
import os
import gzip
import boto3
from datetime import datetime,timezone,timedelta


session = boto3.session.Session()
s3 = session.resource('s3')
client = session.client('s3')

#read the S3 inventory report, get the keys of files that are modified on sysdate-1   
dt=(datetime.now(timezone.utc) + timedelta(days=-1)).strftime('%Y-%m-%d') 
dtz=dt+'T00-00Z'
print('reading inventory report for', dtz)
inventory_bucket = 'xxx'
manifest_key='s3-bucket'+dtz+'/manifest.json'
manifest = json.load(s3.Object(inventory_bucket, manifest_key).get()['Body'])
df=pd.DataFrame()
for obj in manifest['files']:
        gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
        print('csv obj:', gzip_obj)
        buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
        reader = pd.read_csv(buffer)
        reader.columns=['id','key','modified_date']
        print('converting csv obj to dataframe')
        df=df.append(reader[(reader.modified_date>dt)])
source_keys=list(df['key'])
s3_bucket_source='yyy'

#download the files to a tmp folder
local='/tmp/'
print("downloading from S3")
for k in source_keys:
  k_path=k.replace('/', '-')
  dest_pathname = os.path.join(local, k_path)
  if not os.path.exists(os.path.dirname(dest_pathname)):
      os.makedirs(os.path.dirname(dest_pathname))
      client.download_file(s3_bucket_source, k, dest_pathname)

#read the latest jsons from tmp folder
path_to_json =  os.path.expanduser('/tmp/') 
json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]

#loop through the jsons and normalize each file contents into df_table1, df_table2, df_table3
for index, js in enumerate(json_files):
    with open(os.path.join(path_to_json, js)) as json_file:
        print('processing file:', js)
        d=json.loads(json_file.read())
        v=d['key1'][0]['key2']
        if isinstance(v, list):
                for i, v2 in enumerate(v): 
                    df_table1, df_table2, df_table3 = normalizeJSON(d,i,v2['id']) 
                    #normalize is the custom function to split the nested json into relational tables 
        else:
            print('invalid json')

我使用 S3 库存报告从清单中获取最近修改文件的列表,将文件下载到 tmp 位置并一一阅读以完成我需要做的事情

【问题讨论】:

  • 您在 AWS 基础设施上运行它,或者您想在您的工作站上并行化?
  • @Marcin 我在 AWS Infrastructure 上运行代码,目前使用 AWS Glue
  • @Smile 用代码 sn-p 更新了问题
  • 最慢的部分是按顺序下载Python中的所有文件。一个简单的改进是使用 gnu parallels 和 awscli 进行下载。另一种选择是使用 gevent 来并行下载许多文件——请参阅此代码以获取类似的示例gist.github.com/pcdinh/3495566
  • 如果您使用 Glue 处理数据,为什么不使用 Spark?带有 EMRFS 的 spark 将是一个不错的选择。您不必手动下载数据

标签: json pandas amazon-web-services amazon-s3 boto3


【解决方案1】:

您使用multiprocessing module 并行下载 JSON 文件。 您的代码包含 3 个 for 块。您可以并行执行其中的每一项。以下是如何为第一个 for 执行此操作的示例:

第一个

df=pd.DataFrame()
for obj in manifest['files']:
        gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
        print('csv obj:', gzip_obj)
        buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
        reader = pd.read_csv(buffer)
        reader.columns=['id','key','modified_date']
        print('converting csv obj to dataframe')
        df=df.append(reader[(reader.modified_date>dt)])

变成:

def get_reader(obj):
    gzip_obj = s3.Object(bucket_name=inventory_bucket, key=obj['key'])
    print('csv obj:', gzip_obj)
    buffer = gzip.open(gzip_obj.get()["Body"], mode='rt')
    reader = pd.read_csv(buffer)
    reader.columns=['id','key','modified_date']
    print('converting csv obj to dataframe')
    return reader[(reader.modified_date>dt)]

num_of_workers = 4
df = pd.DataFrame()
with multiprocessing.Pool(num_of_workers) as p:
    results = p.map(get_reader, manifest['files'])

for result in results:
    df = df.append(result)

你可以对其他 for 块做同样的事情

【讨论】:

  • 当我尝试并行化我的最后一个 for 时,我收到以下错误 TypeError: map() takes from 3 to 4 positional arguments but 5 were given
  • 感谢您的回复@pakallis。我修复了错误并且我能够运行代码。代码中的第三个 for 块返回 8 个数据帧。但 map 函数的输出是一个列表。因此,这种情况下的结果是 8 个数据帧的元组。有什么方法可以让 map 函数输出 8 个数据帧?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-04-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2013-07-17
  • 2017-04-24
相关资源
最近更新 更多