【发布时间】:2016-12-06 22:56:39
【问题描述】:
我在文档中发现了几篇文章、文章、参考资料等,暗示您可以使用foreachPartition 访问特定分区。但是我还没有弄清楚如何处理给定分区中的所有数据。
我的目标是从数据库中选择一些数据,对其进行操作,按列中的唯一值进行分区,然后将每个分区作为一个专门命名的 jsonl 文件写入 s3 以供另一个系统访问。
repartitioned = myDataframe.repartition("processed_date")
repartitioned.foreachPartition(writePartitionToS3)
我尝试了很多方法来解析该数据,但似乎我只能在foreachPartition 中获取单个元组,并且分区本身没有界限以便有效地分离这些数据。
def writePartitionsToS3(partition):
for row in partition:
pprint (row)
产生(为简洁起见删除了几列):
行(entity_id=u'2315183', ... 处理日期=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... 处理日期=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... processes_date=datetime.date(2015, 3, 25)) 行(entity_id=u'2315183', ... processes_date=datetime.date(2015, 3, 25))
也有可能分区没有像我想象的那样定义,但我知道有一个内置的DataFrameWriter 可以按分区写入,但我不能使用它。我真的需要能够生成这样的命名文件,而不是 part-xxx 格式:
s3a://<bucket>/<prefix>/<date processed>.jsonl
我以这样一种方式分块数据,使得分区的大小相对较小(每个处理日期一个,每个实体选择一个作为它自己的 DataFrame),所以这不是问题,但我也不是真的想要到collect() 一个节点上的所有内容来解析分区列表,因为我想将文件并行写入s3。
更新:
我最终通过获取唯一值然后根据这些值过滤原始数据集来实际解决我的问题。请记住,如果数据集非常大,您永远不会想要这样做,但我可以选择,因为我在循环中创建小型数据框(从数据库中选择),然后处理这些块。
# Get a list of the unique values present
# in the processed_date column
uniqueProcessedDates = myDataframe.select('processed_date') \
.distinct().rdd.map(lambda r: r[0]).collect()
# For each unique processed date we want to
# filter records and then write them
for value in uniqueProcessedDates:
sortedRowsThisProcessedDate = myDataframe.filter(postgresDF.processed_date == date)
# some custom function to write the data
writeProcessedDatesToS3(sortedRowsThisProcessedDate.collect())
总而言之,我敢肯定有很多方法会导致效率极低。我正在考虑的一件事是通过需要写入每个文件的确切值集对每个 RDD 重新分区,因为必须以原子方式完成对 s3 的写入。我认为除此之外可能有助于避免在写入数据之前从多个节点收集数据。
【问题讨论】:
标签: apache-spark pyspark apache-spark-sql partitioning pyspark-sql