【问题标题】:PySpark and Pandas - Read partitioned csv files from S3 skipping empty onesPySpark 和 Pandas - 从 S3 读取分区的 csv 文件,跳过空文件
【发布时间】:2018-07-17 08:37:43
【问题描述】:

使用 PySpark,我有一些代码通过一堆查询运行。

for index, query in enumerate(query_map):  
  spark_dataframe.filter(
       query).write.csv('s3://OutputBucket/Csvs/Query_{}'.format(index)

我是 spark 新手,但我知道每个分区都将单独的 csv 文件写入一个名为 Query_[index] 的目录。现在我想收集这些文件并将它们放入熊猫数据框中。

import boto3
import pandas
s3 = boto3.resource('s3')
my_bucket = s3.Bucket("OutputBucket")
#Get all csv names
csvs = [
    "s3://OutputBucket/Csvs/"+\
    str(i.key) for i in my_bucket.objects.filter(Prefix='Query/')] 
to_concat = []
#Turn them into a dataframe
for csv in csvs:
    try:
        to_put_in.append(pandas.read_csv(csv))
    except pandas.errors.EmptyDataError:
        pass
#Join dataframe
my_big_dataframe = pandas.concat(to_concat)

问题是 Pyspark 会写出很多空文件。所以我的代码花了很多时间试图读入一个空的 csv 文件,只是为了抛出一个异常。

据我了解,df_spark.toPandas() 函数违背了 spark 的目的,因为它将其放入驱动程序内存并且不利用每个分区的 IO 并行化。这也违背了 spark 使用coalesce 的目的。因此,写入一堆 csv,然后手动读取它们并不是一个糟糕的主意。

tl;博士

我的问题是,是否有办法跳过 pyspark 写入的那些空 csv 文件:

  1. 也许 boto3 可以先按大小对它们进行排序,然后迭代直到我们处理好并清空文件?

  2. PySpark 中是否有任何方法可以在不破坏 pyspark 要点的情况下做到这一点?

【问题讨论】:

  • 您的最终目标是将所有零件文件连接到一个 csv 中吗?如果可以的话,您可以从命令行执行此操作。见this answer
  • 所以只保存到 hdfs 然后从头节点连接?
  • 我通常是这样的,但我不知道你的用例是什么。

标签: apache-spark amazon-s3 pyspark boto3 emr


【解决方案1】:

几个月前我遇到了类似的问题。用过这样的东西

# get the number of non-empty partitions in dataframe df
numNonEmptyPartitions = (df.rdd.glom().map(lambda x: 1 if len(x)>0 else 0).
                                reduce(lambda x,y: x+y))

df = df.coalesce(numNonEmptyPartitions)

现在,您将拥有所有非空分区。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-03-29
    • 2017-09-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-08-19
    • 2019-09-09
    • 2020-09-21
    相关资源
    最近更新 更多