【问题标题】:Spark: Writing RDD Results to File System is SlowSpark:将 RDD 结果写入文件系统很慢
【发布时间】:2019-05-07 15:50:33
【问题描述】:

我正在使用 Scala 开发 Spark 应用程序。我的应用程序只包含一个需要改组的操作(即cogroup)。它在合理的时间完美地运行。我面临的问题是当我想将结果写回文件系统时;由于某种原因,它比运行实际程序需要更长的时间。起初,我尝试在不重新分区或合并的情况下编写结果,我意识到生成的文件数量很大,所以我认为这是问题所在。我在编写之前尝试重新分区(和合并),但应用程序需要很长时间才能执行这些任务。我知道重新分区(和合并)代价高昂,但我的做法是否正确?如果不是,请您给我一些正确方法的提示。

注意事项

  • 我的文件系统是 Amazon S3。
  • 我的输入数据大小约为 130GB。
  • 我的集群包含一个驱动节点和五个从节点,每个节点有 16 个内核和 64 GB 的 RAM。
  • 我为我的工作分配了 15 个执行程序,每个执行程序有 5 个内核和 19GB 的 RAM。

P.S.我尝试使用 Dataframes,同样的问题。

这是我的代码示例,以防万一:

val sc = spark.sparkContext

// loading the samples
val samplesRDD = sc
  .textFile(s3InputPath)
  .filter(_.split(",").length > 7)
  .map(parseLine)
  .filter(_._1.nonEmpty) // skips any un-parsable lines


// pick random samples 
val samples1Ids = samplesRDD
  .map(_._2._1) // map to id
  .distinct
  .takeSample(withReplacement = false, 100, 0)

// broadcast it to the cluster's nodes
val samples1IdsBC = sc broadcast samples1Ids

val samples1RDD = samplesRDD
  .filter(samples1IdsBC.value contains _._2._1)

val samples2RDD = samplesRDD
  .filter(sample => !samples1IdsBC.value.contains(sample._2._1))

// compute
samples1RDD
  .cogroup(samples2RDD)
  .flatMapValues { case (left, right) =>
    left.map(sample1 => (sample1._1, right.filter(sample2 => isInRange(sample1._2, sample2._2)).map(_._1)))
  }
  .map {
    case (timestamp, (sample1Id, sample2Ids)) =>
      s"$timestamp,$sample1Id,${sample2Ids.mkString(";")}"
  }

  .repartition(10)
  .saveAsTextFile(s3OutputPath)

更新

这是使用 Dataframes 的相同代码:

// loading the samples
val samplesDF = spark
  .read
  .csv(inputPath)
  .drop("_c1", "_c5", "_c6", "_c7", "_c8")
  .toDF("id", "timestamp", "x", "y")
  .withColumn("x", ($"x" / 100.0f).cast(sql.types.FloatType))
  .withColumn("y", ($"y" / 100.0f).cast(sql.types.FloatType))

// pick random ids as samples 1
val samples1Ids = samplesDF
  .select($"id") // map to the id
  .distinct
  .rdd
  .takeSample(withReplacement = false, 1000)
  .map(r => r.getAs[String]("id"))

// broadcast it to the executor
val samples1IdsBC = sc broadcast samples1Ids

// get samples 1 and 2
val samples1DF = samplesDF
  .where($"id" isin (samples1IdsBC.value: _*))

val samples2DF = samplesDF
  .where(!($"id" isin (samples1IdsBC.value: _*)))

samples2DF
  .withColumn("combined", struct("id", "lng", "lat"))
  .groupBy("timestamp")
  .agg(collect_list("combined").as("combined_list"))
  .join(samples1DF, Seq("timestamp"), "rightouter")
  .map {
    case Row(timestamp: String, samples: mutable.WrappedArray[GenericRowWithSchema], sample1Id: String, sample1X: Float, sample1Y: Float) =>
      val sample2Info = samples.filter {
        case Row(_, sample2X: Float, sample2Y: Float) =>
          Misc.isInRange((sample2X, sample2Y), (sample1X, sample1Y), 20)
        case _ => false
      }.map {
        case Row(sample2Id: String, sample2X: Float, sample2Y: Float) =>
          s"$sample2Id:$sample2X:$sample2Y"
        case _ => ""
      }.mkString(";")

      (timestamp, sample1Id, sample1X, sample1Y, sample2Info)
    case Row(timestamp: String, _, sample1Id: String, sample1X: Float, sample1Y: Float) => // no overlapping samples
      (timestamp, sample1Id, sample1X, sample1Y, "")
    case _ =>
      ("error", "", 0.0f, 0.0f, "")
  }
  .where($"_1" notEqual "error")
  //      .show(1000, truncate = false)
  .write
  .csv(outputPath)

【问题讨论】:

  • 检查您的执行程序处理并确保它们都接收到相同数量的数据。并尝试更改您的文件系统,以防亚马逊出于任何原因对您的流程施加压力。

标签: scala apache-spark amazon-s3 rdd


【解决方案1】:

这里的问题是,通常通过重命名文件触发提交任务、作业以及在 S3 上重命名非常非常慢。您写入的数据越多,作业结束所需的时间就越长。这就是你所看到的。

修复:切换到不进行任何重命名的 the S3A committers

一些调整选项可大量增加 IO 中的线程数、提交和连接池大小 fs.s3a.threads.max from 10 to something bigger fs.s3a.committer.threads -number files committed by a POST in parallel; default is 8 fs.s3a.connection.maximum + try (fs.s3a.committer.threads + fs.s3a.threads.max + 10)

这些都是相当小的,因为许多作业使用多个存储桶,如果每个作业都有大量数字,那么创建一个 s3a 客户端会非常昂贵......但是如果您有数千个文件,那么可能是值得的。

【讨论】:

  • 好的,我能够让它工作,我只需要使用 Dataframes 而不是 RDD,并将 Hadoop 升级到 v3。虽然我现在可以看到在 S3 中没有创建任何临时文件(这意味着 s3 提交者正在按应有的方式工作),但在编写结果时我仍然没有看到任何改进。我想知道是否还有其他问题。
  • 如果 _SUCCESS 文件是一些 JSON 统计信息,而不是 0 字节标记,您可以判断您是否已切换到新的提交者。它说什么
  • 我在 _SUCCESS 文件中得到了一个 JSON 对象,这就是我发现它实际上使用了 s3a 提交程序的方式。
  • 不错。好的,。在这一点上,您已经以最快的速度将工作提交给 S3。一个问题:它列出了所有提交的文件。多少个文件?可能是一些线程池/连接池调优可以加快最终提交
  • 文件数以千计。我尝试重新分区(和合并),但这些操作需要很长时间。
猜你喜欢
  • 2020-09-15
  • 1970-01-01
  • 2012-02-22
  • 1970-01-01
  • 2016-10-29
  • 1970-01-01
  • 2017-03-06
  • 1970-01-01
  • 2015-09-23
相关资源
最近更新 更多