对于那些使用更大数据集的人:
rdd.collect() 不应在这种情况下使用,因为它会将所有数据收集为驱动程序中的 Array,这是最容易耗尽内存的方法。
也不应使用rdd.coalesce(1).saveAsTextFile(),因为上游阶段的并行性将丢失,无法在存储数据的单个节点上执行。
rdd.coalesce(1, shuffle = true).saveAsTextFile() 是最好的简单选项,因为它将保持上游任务的处理并行,然后只对一个节点执行 shuffle(rdd.repartition(1).saveAsTextFile() 是精确的同义词)。
-
rdd.saveAsSingleTextFile()(如下提供)还允许将 rdd 存储在具有特定名称的单个文件中,同时保持rdd.coalesce(1, shuffle = true).saveAsTextFile() 的并行属性。
李>
rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") 的不便之处在于它实际上生成了一个路径为path/to/file.txt/part-00000 而不是path/to/file.txt 的文件。
下面的解决方案rdd.saveAsSingleTextFile("path/to/file.txt")实际上会产生一个路径为path/to/file.txt的文件:
package com.whatever.package
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec
object SparkHelper {
// This is an implicit class so that saveAsSingleTextFile can be attached to
// SparkContext and be called like this: sc.saveAsSingleTextFile
implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {
def saveAsSingleTextFile(path: String): Unit =
saveAsSingleTextFileInternal(path, None)
def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
saveAsSingleTextFileInternal(path, Some(codec))
private def saveAsSingleTextFileInternal(
path: String, codec: Option[Class[_ <: CompressionCodec]]
): Unit = {
// The interface with hdfs:
val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
// Classic saveAsTextFile in a temporary folder:
hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
codec match {
case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
case None => rdd.saveAsTextFile(s"$path.tmp")
}
// Merge the folder of resulting part-xxxxx into one file:
hdfs.delete(new Path(path), true) // to make sure it's not there already
FileUtil.copyMerge(
hdfs, new Path(s"$path.tmp"),
hdfs, new Path(path),
true, rdd.sparkContext.hadoopConfiguration, null
)
// Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144
hdfs.delete(new Path(s"$path.tmp"), true)
}
}
}
可以这样使用:
import com.whatever.package.SparkHelper.RDDExtensions
rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
这个sn-p:
-
首先将带有rdd.saveAsTextFile("path/to/file.txt") 的rdd 存储在一个临时文件夹path/to/file.txt.tmp 中,就好像我们不想将数据存储在一个文件中一样(这使得上游任务的处理保持并行)
李>
-
然后,仅使用hadoop file system api,我们继续处理不同输出文件的merge (FileUtil.copyMerge()),以创建我们的最终输出单个文件path/to/file.txt。
李>