【Title】: Spark split a file into multiple folders based on a field
【Posted】: 2017-08-28 04:26:18
【Question】:

I'm trying to split a set of S3 files (sample below) into separate folders based on the value of one column. I'm not sure what's wrong with the code below.

column 1, column 2
20130401, value1
20130402, value2
20130403, value3

val newDataDF = sqlContext.read.parquet("s3://xxxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.show()
uniq_days.cache()
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})

Can you help? Even a pyspark version would be fine. I'm looking for the following structure:

s3://xxxxxx-bucket/partitionedfolder/20130401/part-***

    column 1, column 2
    20130401, value1
s3://xxxxxx-bucket/partitionedfolder/20130402/part-***

    column 1, column 2
    20130402, value2
s3://xxxxxx-bucket/partitionedfolder/20130403/part-***

    column 1, column 2
    20130403, value3

Here is the error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 22 in stage 82.0 failed 4 times, most recent failure: Lost task 22.3 in stage 82.0 (TID 2753

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
Caused by: java.lang.NullPointerException

Update with the current working solution:

val newDataDF = sqlContext.read.parquet("s3://xxxxxx-bucket/basefolder/")
newDataDF.cache()
val uniq_days = newDataDF.select(newDataDF("column1")).distinct.rdd.map(_.getString(0)).collect().toList
uniq_days.foreach(x => {newDataDF.filter(newDataDF("column1") === x).write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
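
For comparison, Spark's DataFrameWriter also has a built-in partitionBy that writes one sub-folder per distinct value in a single pass. A minimal sketch is below; note that the folders come out as column1=20130401/ rather than plain 20130401/, so it only fits if that naming is acceptable to downstream consumers.

    // Single-pass alternative: let Spark partition the output itself.
    // Output folders are named column1=<value>, e.g. .../column1=20130401/part-***
    newDataDF.write
      .partitionBy("column1")
      .parquet("s3://xxxxxx-bucket/partitionedfolder/")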

【Question discussion】:

    Tags: scala apache-spark amazon-s3 split pyspark


    【Solution 1】:

    I think you missed the "s" in front of the string passed to save. :)

    http://docs.scala-lang.org/overviews/core/string-interpolation.html#the-s-string-interpolator

    Change:

    write.save("s3://xxxxxx-bucket/partitionedfolder/$x/")})
    

    To:

    write.save(s"s3://xxxxxx-bucket/partitionedfolder/$x/")})
    

    There is one more problem: show never returns a value (it returns Unit), so uniq_days cannot be cached or iterated.

    Change:

    val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.show()
    uniq_days.cache()
    

    To:

    val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.rdd.map(_.getString(0)).collect().toList
    

    【Discussion】:

    • I don't think so.. I have added it now and still get the same error.
    • Thanks. I tried val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.collect().toList and uniq_days.foreach(x => {newDataDF.filter(newDataDF("mevent_day") === x).write.save(s"s3://xxxxxxxx-bucket/partitionedfolder/$x/")}); the error is java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema [2013-04-02]
    • I guess this might be one reason - stackoverflow.com/questions/32551919/…
    • uniq_days - gives me res78: List[org.apache.spark.sql.Row] = List([2016-08-17], [2014-12-13], [2014-05-27], …
    • @androboy: My bad, edited. Use val uniq_days = newDataDF.select(newDataDF("mevent_day")).distinct.rdd.map(_.getString(0)).collect().toList