【问题标题】:Spark: Manipulate all of a specific RDD or DataFrame partition's dataSpark:操作所有特定 RDD 或 DataFrame 分区的数据
【发布时间】:2016-12-06 22:56:39
【问题描述】:

我在文档中发现了几篇文章、文章、参考资料等,暗示您可以使用foreachPartition 访问特定分区。但是我还没有弄清楚如何处理给定分区中的所有数据。

我的目标是从数据库中选择一些数据,对其进行操作,按列中的唯一值进行分区,然后将每个分区作为一个专门命名的 jsonl 文件写入 s3 以供另一个系统访问。

repartitioned = myDataframe.repartition("processed_date")
repartitioned.foreachPartition(writePartitionToS3)

我尝试了很多方法来解析该数据,但似乎我只能在foreachPartition 中获取单个元组,并且分区本身没有界限以便有效地分离这些数据。

def writePartitionsToS3(partition):
    for row in partition:
        pprint (row)

产生(为简洁起见删除了几列):

行(entity_id=u'2315183', ... 处理日期=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... 处理日期=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... processes_date=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... processes_date=datetime.date(2015, 3, 25))

也有可能分区没有像我想象的那样定义,但我知道有一个内置的DataFrameWriter 可以按分区写入,但我不能使用它。我真的需要能够生成这样的命名文件,而不是 part-xxx 格式:

s3a://<bucket>/<prefix>/<date processed>.jsonl

我以这样一种方式分块数据,使得分区的大小相对较小(每个处理日期一个,每个实体选择一个作为它自己的 DataFrame),所以这不是问题,但我也不是真的想要到collect() 一个节点上的所有内容来解析分区列表,因为我想将文件并行写入s3。


更新:

我最终通过获取唯一值然后根据这些值过滤原始数据集来实际解决我的问题。请记住,如果数据集非常大,您永远不会想要这样做,但我可以选择,因为我在循环中创建小型数据框(从数据库中选择),然后处理这些块。

# Get a list of the unique values present
# in the processed_date column
uniqueProcessedDates = myDataframe.select('processed_date') \
    .distinct().rdd.map(lambda r: r[0]).collect()

# For each unique processed date we want to
# filter records and then write them
for value in uniqueProcessedDates:
    sortedRowsThisProcessedDate = myDataframe.filter(postgresDF.processed_date == date)

    # some custom function to write the data
    writeProcessedDatesToS3(sortedRowsThisProcessedDate.collect())

总而言之,我敢肯定有很多方法会导致效率极低。我正在考虑的一件事是通过需要写入每个文件的确切值集对每个 RDD 重新分区,因为必须以原子方式完成对 s3 的写入。我认为除此之外可能有助于避免在写入数据之前从多个节点收集数据。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql partitioning pyspark-sql


    【解决方案1】:

    访问没有边界。 DataFrame.repartition 使用哈希分区器来打乱数据,因此行的共现没有更广泛的意义。您可以在这里假设特定processed_date 的所有记录都位于特定分区上。

    你可以通过添加sortWithinPartitions来稍微改善一下情况:

    (myDataframe
        .repartition("processed_date")
        .sortWithinPartitions("processed_date"))
    

    能够一一访问单个日期的所有记录。

    另一个可能的改进是使用orderBy方法:

    myDataframe.orderBy("processed_date")
    

    这将导致日期连续,但仍无法访问边界。

    在这两种情况下,您都必须在迭代分区时手动检测边界。

    最后,如果你想要真正的控制使用RDDrepartitionAndSortWithinPartitions 方法。这将为您提供对数据分布的细粒度控制。您可以定义partitionFunc 以特定方式分发数据,因此预先没有分区边界。

    DataFrameWriter.partitionBy 使用了不同的机制,在这里对你没有用处。

    【讨论】:

    • 这确实让我再次行动起来,即使我最终使用了不同的解决方案。我认为repartitionAndSortWithinPartitions 处理数据的方法可能是最灵活的,尽管我花了一些时间来了解如何使用partitionFunc 操作数据。我还没有找到一个使用这种方法的真正清晰和简单的例子。
    猜你喜欢
    • 1970-01-01
    • 2019-02-11
    • 1970-01-01
    • 2017-08-16
    • 1970-01-01
    • 1970-01-01
    • 2020-09-18
    • 2020-02-28
    • 1970-01-01
    相关资源
    最近更新 更多