【问题标题】:Spark MLLib unable to write out to S3 : path already existsSpark MLLib 无法写入 S3:路径已存在
【发布时间】:2018-06-29 07:23:32
【问题描述】:

我在目录 /data/vw/ 的 S3 存储桶中有数据。每一行的格式为:

| abc:2 def:1 ghi:3 ...

我想把它转换成以下格式:

abc abc def ghi ghi ghi

新转换的行应该转到目录/data/spark中的S3

基本上,每个字符串重复冒号后面的次数。我正在尝试将 VW LDA 输入文件转换为相应的文件以供 Spark 的 LDA 库使用。

代码:

import org.apache.spark.{SparkConf, SparkContext}

object Vw2SparkLdaFormatConverter {

  def repeater(s: String): String = {
      val ssplit = s.split(':')
        (ssplit(0) + ' ')  * ssplit(1).toInt
  }

  def main(args: Array[String]) {
      val inputPath = args(0)
      val outputPath = args(1)

      val conf = new SparkConf().setAppName("FormatConverter")
      val sc = new SparkContext(conf)

      val vwdata = sc.textFile(inputPath)
      val sparkdata = vwdata.map(s => s.trim().split(' ').map(repeater).mkString)

      val coalescedSparkData = sparkdata.coalesce(100)
      coalescedSparkData.saveAsTextFile(outputPath)

      sc.stop()
  }
}

当我运行此(作为 AWS 中的 Spark EMR 作业)时,该步骤失败并出现异常:

18/01/20 00:16:28 ERROR ApplicationMaster: User class threw exception: org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3a://mybucket/data/spark already exists
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory s3a://mybucket/data/spark already exists
    at org.apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:131)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1119)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1096)
    at ...

代码运行为:

spark-submit --class Vw2SparkLdaFormatConverter --deploy-mode cluster --master yarn --conf spark.yarn.submit.waitAppCompletion=true --executor-memory 4g s3a://mybucket/scripts/myscalajar.jar s3a://mybucket/data/vw s3a://mybucket/data/spark

我已尝试指定新的输出路径(/data/spark1 等),确保在步骤运行之前它不存在。即使那样它也不起作用。

我做错了什么?我是 Scala 和 Spark 的新手,所以我可能在这里忽略了一些东西。

【问题讨论】:

  • 顺便说一句,该作业确实在 S3 中创建了目录 /data/spark(我还在那里看到了一个 _temporary),但该作业在此之后不久就失败了。

标签: scala apache-spark amazon-s3 rdd apache-spark-mllib


【解决方案1】:

您可以转换为数据框,然后在启用覆盖的情况下保存。

coalescedSparkData.toDF.write.mode('overwrite').csv(outputPath)

或者如果你坚持使用RDD方法,你可以按照already in this answer描述的那样做

【讨论】:

  • 但为什么一开始就失败了?
  • 上述 sn-p 工作,虽然由于我仍然不知道原始代码的问题,我没有将其标记为解决方案(但赞成你的答案)。谢谢。
猜你喜欢
  • 2021-10-29
  • 2017-01-09
  • 2017-12-12
  • 2016-02-05
  • 2016-12-09
  • 2021-11-17
  • 2015-04-10
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多