【问题标题】:How to Split a large parquet file to multiple parquet and save in different hadoop path by time column如何将大型镶木地板文件拆分为多个镶木地板并按时间列保存在不同的hadoop路径中
【发布时间】:2019-11-13 20:24:18
【问题描述】:

我的 sparquet 文件是这样的

身份证、姓名、日期

1, a, 1980-09-08

2, b, 1980-09-08

3, c, 2017-09-09

希望输出文件是这样的

文件夹19800908 包含数据

身份证、姓名、日期

1, a, 1980-09-08

2, b, 1980-09-08

并且文件夹20170909 包含数据

身份证、姓名、日期

3, c, 2017-09-09

我知道可以 groupBy key date 但不知道如何输出多个 parquet 文件使用此类 MultipleTextOutputFormat

我不想 foreach 循环键,这会变慢并且需要大量内存

现在是这样的代码

   val input = sqlContext.read.parquet(sourcePath)
      .persist(StorageLevel.DISK_ONLY)

    val keyRows: RDD[(Long, Row)] =
      input.mapPartitions { partition =>
        partition.flatMap { row =>
          val key =  format.format(row.getDate(3)).toLong
          Option((key, row))
        }
      }.persist(StorageLevel.DISK_ONLY)

    val keys = keyRows.keys.distinct().collect()

    for (key <- keys) {
      val rows = keyRows.filter { case (_key, _) => _key == key }.map(_._2)
      val df = sqlContext.createDataFrame(rows, input.schema)
      val path = s"${outputPrefix}/$key"
      HDFSUtils.deleteIfExist(path)
      df.write.parquet(path)
    }

如果我使用 MultipleTextOutputFormat 输出如下,这是我不想要的

    keyRows.groupByKey()
      .saveAsHadoopFile(conf.getOutputPrefixDirectory, classOf[String], classOf[String],
        classOf[SimpleMultipleTextOutputFormat[_, _]])
public class SimpleMultipleTextOutputFormat<A, B> extends MultipleTextOutputFormat<A, B> {

    @Override
    protected String generateFileNameForKeyValue(A key, B value, String name) {
//        return super.generateFileNameForKeyValue(key, value, name);
        return key.toString();
    }
}

【问题讨论】:

  • 也许这个功能saveAsNewAPIHadoopFile有用

标签: scala apache-spark rdd parquet large-data


【解决方案1】:

可以使用分区列写入:

df.write.partitionBy("dateString").parquet("/path/to/file").

不同之处 - 文件夹名称将类似于“dateString=2017-09-09”,并且必须在保存之前创建新的字符串列“dateString”。

【讨论】:

  • 源文件很大,有25g。运行非常内存的任务导致错误“spark.yarn.executor.memoryoverhead”
  • 也许,内存问题与写入无关,而与处理有关。这可以通过从 write 子句中删除“partitionBy”来检查。
  • 但是我需要“partitionBy”子句对源文件进行分区~
【解决方案2】:

来自这个帖子spark partition data writing by timestamp

    input
      .withColumn("_key", date_format(col(partitionField), format.toPattern))
      .write
      .partitionBy("_key")
      .parquet(conf.getOutputPrefixDirectory)

但是如何去掉文件夹名'_ke='

【讨论】:

    猜你喜欢
    • 2015-05-30
    • 2018-05-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-06-02
    • 2018-12-23
    • 2021-11-02
    • 1970-01-01
    相关资源
    最近更新 更多