【发布时间】:2019-11-13 20:24:18
【问题描述】:
我的 sparquet 文件是这样的
身份证、姓名、日期
1, a, 1980-09-08
2, b, 1980-09-08
3, c, 2017-09-09
希望输出文件是这样的
文件夹19800908 包含数据
身份证、姓名、日期
1, a, 1980-09-08
2, b, 1980-09-08
并且文件夹20170909 包含数据
身份证、姓名、日期
3, c, 2017-09-09
我知道可以 groupBy key date 但不知道如何输出多个 parquet 文件使用此类 MultipleTextOutputFormat
我不想 foreach 循环键,这会变慢并且需要大量内存
现在是这样的代码
val input = sqlContext.read.parquet(sourcePath)
.persist(StorageLevel.DISK_ONLY)
val keyRows: RDD[(Long, Row)] =
input.mapPartitions { partition =>
partition.flatMap { row =>
val key = format.format(row.getDate(3)).toLong
Option((key, row))
}
}.persist(StorageLevel.DISK_ONLY)
val keys = keyRows.keys.distinct().collect()
for (key <- keys) {
val rows = keyRows.filter { case (_key, _) => _key == key }.map(_._2)
val df = sqlContext.createDataFrame(rows, input.schema)
val path = s"${outputPrefix}/$key"
HDFSUtils.deleteIfExist(path)
df.write.parquet(path)
}
如果我使用 MultipleTextOutputFormat 输出如下,这是我不想要的
keyRows.groupByKey()
.saveAsHadoopFile(conf.getOutputPrefixDirectory, classOf[String], classOf[String],
classOf[SimpleMultipleTextOutputFormat[_, _]])
public class SimpleMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {
@Override
protected String generateFileNameForKeyValue(A key, B value, String name) {
// return super.generateFileNameForKeyValue(key, value, name);
return key.toString();
}
}
【问题讨论】:
-
也许这个功能
saveAsNewAPIHadoopFile有用
标签: scala apache-spark rdd parquet large-data