【问题标题】:How to retrieve the output file from Amazon S3 generated from EMR Pyspark back into Flask如何将 EMR Pyspark 生成的 Amazon S3 中的输出文件检索回 Flask
【发布时间】:2021-07-31 02:35:12
【问题描述】:

我目前正在尝试使用 pyspark 将我的 Flask 应用程序连接到 Amazon EMR。我将 AWS (https://docs.aws.amazon.com/code-samples/latest/catalog/code-catalog-python-example_code-emr.html) 中的示例用于 pyspark。我使用以下代码输出文件:

df.write.mode('overwrite').csv('s3://my-bucket/output')

来自 Amazon EMR 的输出文件存储在 Amazon S3 中,名称如下:

  1. part-00003-2e96c921-8459-4dc9-93e7-3c71eccd442f-c000.csv
  2. part-00007-2e96c921-8459-4dc9-93e7-3c71eccd442f-c000.csv
  3. part-00011-2e96c921-8459-4dc9-93e7-3c71eccd442f-c000.csv

我想将 CSV 文件读入我的 Flask 应用程序。由于文件名每次都不同,我应该如何阅读这些文件?有什么更聪明的方法吗?

【问题讨论】:

  • 读取特定前缀的对象名称,从flask中获取每个csv。
  • @Lamanus 如果我稍后再添加一个 EMR 步骤并将其保存在同一位置如何。我无法区分不同步骤的输出,因为它们都是带有随机名称的 part-0xxxxx。

标签: python amazon-web-services apache-spark pyspark amazon-emr


【解决方案1】:

我假设您正在尝试将它们读入一个数据帧。 (另外,根据您的评论,“part”前缀很常见)

s3 = boto3.resource('s3')
bucket = s3.Bucket('my-bucket')

prefix_objs = bucket.objects.filter(Prefix="output/part")

prefix_df = []

for obj in prefix_objs:
    try:
        key = obj.key
        body = obj.get()['Body'].read()
        temp = pd.read_csv(io.BytesIO(body),header=None, encoding='utf8',sep=',')        
        prefix_df.append(temp)
    except:
        continue

这将读取您存储桶中output 文件夹中带有前缀“part”的每个文件并添加到数组中。

之后,您可以将其连接为

pd.concat(prefix_df)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-10-15
    • 1970-01-01
    • 2011-11-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多