【发布时间】:2017-01-10 10:00:13
【问题描述】:
业务案例是我们希望将一个大的 parquet 文件按列拆分为多个小文件作为分区。我们已经使用 dataframe.partition("xxx").write(...) 进行了测试。 100K 条记录花了大约 1 小时。因此,我们将使用 map reduce 在不同的文件夹中生成不同的 parquet 文件。示例代码:
import org.apache.hadoop.io.NullWritable
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
key.asInstanceOf[String]+"/aa"
}
object Split {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SplitTest")
val sc = new SparkContext(conf)
sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt")))
.map(value => (value._1, value._2 + "Test"))
.partitionBy(new HashPartitioner(3))//.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
.saveAsHadoopFile(args(0), classOf[String], classOf[String],
classOf[RDDMultipleTextOutputFormat])
sc.stop()
}
}
上面的示例只是生成了一个文本文件,如何生成多个输出格式的parquet文件?
【问题讨论】:
标签: scala apache-spark parquet