【Posted】: 2017-08-28 04:26:18
【Question】:
I am trying to split a set of S3 files (sample below) into folders keyed by the values of one column. I am not sure what is wrong with the code below.
column 1, column 2
20130401, value1
20130402, value2
20130403, value3
val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
uniq_days.cache()
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
Can you help? Even a pyspark version would be fine. I am looking for the following output structure:
s3://xxxxxx-bucket/partitionedfolder/20130401/part-***
column 1, column 2
20130401, value 1
s3://xxxxxx-bucket/partitionedfolder/20130402/part-***
column 1, column 2
20130402, value 1
s3://xxxxxx-bucket/partitionedfolder/20130403/part-***
column 1, column 2
20130403, value 1
Here is the error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 82.0 failed 4 times, most recent failure: Lost task 22.3 in stage 82.0 (TID 2753
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
Caused by: java.lang.NullPointerException
Update with the current working solution: the original code failed because .distinct.show() only prints the rows and returns Unit, and because referencing newDataDF inside a distributed foreach makes the executors dereference driver-side state, which raises a NullPointerException. Collecting the distinct values to the driver first avoids both problems:
val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/")
newDataDF.cache()
// Collect the distinct values of column1 to the driver as a plain List[String]
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList
// One filtered write per distinct value, each into its own folder
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
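For this use case, Spark's built-in partitioned write can replace the per-value loop entirely. A minimal sketch, assuming the same bucket layout and a Spark version with DataFrameWriter.partitionBy (1.4+); note that partitionBy names the sub-folders column1=20130401/ rather than bare 20130401/, and drops the partition column from the data files themselves (its value is recoverable from the path):

```scala
// Sketch: let Spark split the output by column1 in a single distributed write,
// instead of filtering and writing once per distinct value on the driver.
val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/")

newDataDF.write
  .partitionBy("column1") // one sub-folder per distinct value of column1
  .parquet("s3://xxxxxx-bucket/partitionedfolder/")

// Resulting layout:
//   s3://xxxxxx-bucket/partitionedfolder/column1=20130401/part-***
//   s3://xxxxxx-bucket/partitionedfolder/column1=20130402/part-***
//   ...
```

This avoids one full scan of the cached DataFrame per distinct value and lets Spark parallelize the whole write.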
Tags: scala apache-spark amazon-s3 split pyspark