【问题标题】:how to make saveAsTextFile NOT split output into multiple file?如何使 saveAsTextFile 不将输出拆分为多个文件?
【发布时间】:2014-08-13 19:23:49
【问题描述】:

在 Spark 中使用 Scala 时,每当我使用 saveAsTextFile 转储结果时,它似乎会将输出拆分为多个部分。我只是向它传递一个参数(路径)。

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. 输出的数量是否与它使用的减速器数量相对应?
  2. 这是否意味着输出被压缩?
  3. 我知道我可以使用 bash 将输出组合在一起,但是是否可以选择将输出存储在单个文本文件中,而不进行拆分??我查看了 API 文档,但并没有说明太多。

【问题讨论】:

  • 如果文件很大,在大数据中只使用一个文件通常是不好的做法。
  • 如果输出是排序文件,那么最好的做法是什么?将其保存为文件集合,并使许多输出文件名成为某种索引(即第一个文件命名为“aa”,中间文件命名为“fg”,最后一个文件命名为“zzy”)?
  • 通常情况下,繁重的 spark 作业只会产生非常小的输出(聚合、kpis、流行度等),这些输出是在 hdfs 上产生的,但很可能后者被无关的应用程序使用到大数据。在这种情况下,拥有一个用于传输和使用的命名良好的单个文件更清洁、更容易。

标签: scala apache-spark


【解决方案1】:

将其保存为多个文件的原因是因为计算是分布式的。如果输出足够小,以至于您认为可以将其安装在一台机器上,那么您可以使用

结束您的程序
val arr = year.collect()

然后将生成的数组保存为文件,另一种方法是使用自定义分区器partitionBy,并使其使所有内容都转到一个分区,但不建议这样做,因为您不会获得任何并行化.

如果您需要使用saveAsTextFile 保存文件,您可以使用coalesce(1,true).saveAsTextFile()。这基本上意味着进行计算然后合并到 1 个分区。您还可以使用repartition(1),它只是coalesce 的包装器,并将shuffle 参数设置为true。通过查看RDD.scala 的来源,我发现了大部分内容,你应该看看。

【讨论】:

  • 如何将数组保存为文本文件?数组没有 saveAsTextFile 函数。仅用于 RDD。
  • @user2773013 很好的方法是coalesce 或我建议的partition 方法,但是如果它只在一个节点上,那么存储在hdfs上真的没有意义,这就是为什么使用collect真的是正确的方法
  • 非常有用的答案....在我读过的教程中没有看到 partitionBy 或 coalesce...
【解决方案2】:

对于那些使用更大数据集的人:

  • rdd.collect() 不应在这种情况下使用,因为它会将所有数据收集为驱动程序中的 Array,这是最容易耗尽内存的方法。

  • 也不应使用rdd.coalesce(1).saveAsTextFile(),因为上游阶段的并行性将丢失,无法在存储数据的单个节点上执行。

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() 是最好的简单选项,因为它将保持上游任务的处理并行,然后只对一个节点执行 shuffle(rdd.repartition(1).saveAsTextFile() 是精确的同义词)。

  • rdd.saveAsSingleTextFile()(如下提供)还允许将 rdd 存储在具有特定名称的单个文件中,同时保持rdd.coalesce(1, shuffle = true).saveAsTextFile() 的并行属性。

    李>

rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") 的不便之处在于它实际上生成了一个路径为path/to/file.txt/part-00000 而不是path/to/file.txt 的文件。

下面的解决方案rdd.saveAsSingleTextFile("path/to/file.txt")实际上会产生一个路径为path/to/file.txt的文件:

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

可以这样使用:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

这个sn-p:

  • 首先将带有rdd.saveAsTextFile("path/to/file.txt") 的rdd 存储在一个临时文件夹path/to/file.txt.tmp 中,就好像我们不想将数据存储在一个文件中一样(这使得上游任务的处理保持并行)

    李>
  • 然后,仅使用hadoop file system api,我们继续处理不同输出文件的merge (FileUtil.copyMerge()),以创建我们的最终输出单个文件path/to/file.txt

    李>

【讨论】:

    【解决方案3】:

    您可以先拨打coalesce(1),然后拨打saveAsTextFile() - 但如果您有大量数据,这可能是个坏主意。每次拆分都会生成单独的文件,就像在 Hadoop 中一样,以便让单独的映射器和减速器写入不同的文件。仅当您的数据很少时,只有一个输出文件才是一个好主意,在这种情况下,您也可以像 @aaronman 所说的那样执行 collect()。

    【讨论】:

    • 很好,没有想到 coalesce 比弄乱分区器更干净,话虽如此,我仍然认为如果您的目标是将它放入一个文件 collect 可能是正确的方法做吧
    • 这行得通。但是,如果你使用 coalesce,那意味着你只使用了 1 个 reducer。这不会因为只使用 1 个减速器而减慢进程吗??
    • 是的,但这就是您所要求的。 Spark 每个分区输出一个文件。另一方面,你为什么关心文件的数量?在 spark 中读取文件时,您只需指定父目录,所有分区都被读取为单个 RDD
    • 请不要coalesce(1)unless you know what you are doing
    【解决方案4】:

    正如其他人所提到的,您可以收集或合并您的数据集以强制 Spark 生成单个文件。但这也限制了可以并行处理数据集的 Spark 任务的数量。我更喜欢让它在输出 HDFS 目录中创建一百个文件,然后使用hadoop fs -getmerge /hdfs/dir /local/file.txt 将结果提取到本地文件系统中的单个文件中。当然,当您的输出是相对较小的报告时,这是最有意义的。

    【讨论】:

      【解决方案5】:

      您可以拨打repartition()并按照以下方式进行操作:

      val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
      
      var repartitioned = year.repartition(1)
      repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")
      

      【讨论】:

        【解决方案6】:

        您将能够在下一版本的 Spark 中执行此操作,在当前版本 1.0.0 中,除非您以某种方式手动执行此操作,例如,如您提到的那样,使用 bash 脚本调用,否则这是不可能的。

        【讨论】:

        • Spark 的下一个版本就在这里,具体怎么做还不清楚:(
        【解决方案7】:

        我还想提一下,文档明确指出,在使用真正少量的分区调用 coalesce 时,用户应该小心。这可能会导致上游分区继承此数量的分区。

        除非确实需要,否则我不建议使用 coalesce(1)。

        【讨论】:

          【解决方案8】:

          在 Spark 1.6.1 中,格式如下所示。它创建一个输出文件。如果输出足够小可以处理,最好使用它。基本上它的作用是返回一个新的 RDD,该 RDD 被减少为 numPartitions 分区。如果你正在做一个剧烈的合并,例如对于 numPartitions = 1,这可能会导致您的计算发生在比您喜欢的更少的节点上(例如,在 numPartitions = 1 的情况下为一个节点)

          pair_result.coalesce(1).saveAsTextFile("/app/data/")
          

          【讨论】:

            【解决方案9】:

            这是我输出单个文件的答案。我刚刚加了coalesce(1)

            val year = sc.textFile("apat63_99.txt")
                          .map(_.split(",")(1))
                          .flatMap(_.split(","))
                          .map((_,1))
                          .reduceByKey((_+_)).map(_.swap)
            year.saveAsTextFile("year")
            

            代码:

            year.coalesce(1).saveAsTextFile("year")
            

            【讨论】:

              猜你喜欢
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2021-12-22
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2020-07-29
              相关资源
              最近更新 更多