[Question title]: Spark - How do I read only the latest (highest) path?
[Posted]: 2019-12-05 17:15:36
[Question]:

Suppose we have the following directory structure / data partitioning:

/foo/day=1/lots/of/other/stuff/
/foo/day=2/lots/of/other/stuff/
/foo/day=3/lots/of/other/stuff/
.
.
/foo/day=25/lots/of/other/stuff/

I want to read only the partition with the highest day value, which in this case is /foo/day=25/lots/of/other/stuff/

If day were a column in the data, we could do something like this:

import org.apache.spark.sql.functions.{col, max}
spark.read.parquet("s3a://foo/day=*/")
   .withColumn("latestDay", max(col("day")).over())
   .filter(col("day") === col("latestDay"))

Assuming day is not a column, can you suggest anything smarter?

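The data was not written with write.partitionBy("day") or anything similar. In my case, the schemas under the different subpaths are not even necessarily coherent with each other.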

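Maybe there is a path glob pattern that can do this, or something similar? Or would it be equivalent, performance-wise, to define the day column and hope for predicate pushdown or a similar optimization?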

[Question discussion]:

    Tags: apache-spark hdfs glob


    [Answer 1]:
          import org.apache.hadoop.conf.Configuration
          import org.apache.hadoop.fs.{FileSystem, Path}
          import scala.annotation.tailrec
    
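      // replace this with your file system; FileSystem.get returns the default (fs.defaultFS) one.
      // For S3 paths, e.g.: new Path("s3a://foo/").getFileSystem(spark.sparkContext.hadoopConfiguration)
      val fs: FileSystem = FileSystem.get(new Configuration())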
    
      /**
        * Returns the latest partition (folder) contained in the specified path.
        * By default the latest partition is the one with the highest name in string (lexicographic) order,
        * so e.g. "day=9" sorts after "day=25". Set useModificationTimestamp to pick by modification time instead.
        *
        * @param path      the HDFS folder where to start looking for the latest partition
        * @param recursive if true, descends into the latest sub-folder at each level and returns the deepest one reached
        * @param useModificationTimestamp if true, picks the most recently modified folder instead of the highest name
        * @return String path of the latest partition
        */
          def getLatestPartition(path: String, recursive: Boolean = false,
                                 useModificationTimestamp: Boolean = false): String = {
    
            if (recursive) {
              this.getLatestPartitionRecursive(new Path(path), useModificationTimestamp).toString
            } else {
              this.getLatestPartition(new Path(path), useModificationTimestamp).toString
            }
          }
    
      @tailrec
      private def getLatestPartitionRecursive(path: Path, useModificationTimestamp: Boolean): Path = {
        val latest = getLatestPartition(path, useModificationTimestamp)
        // getLatestPartition returns its input unchanged when there are no sub-folders (leaf reached)
        if (latest == path) path
        else this.getLatestPartitionRecursive(latest, useModificationTimestamp)
      }

      private def getLatestPartition(path: Path, useModificationTimestamp: Boolean): Path = {
        // list the folder only once: each listStatus call is a remote request on HDFS/S3
        val subDirs = fs.listStatus(path).filter(_.isDirectory)
        if (subDirs.isEmpty)
          path
        else if (useModificationTimestamp)
          subDirs.maxBy(_.getModificationTime).getPath
        else
          subDirs.maxBy(_.getPath.getName).getPath // string (lexicographic) comparison of names
      }
    

    Usage:

        val latest = getLatestPartition("s3a://foo/")
        spark.read.parquet(latest)
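One caveat: without useModificationTimestamp the code above compares folder names as strings, so with the question's layout (day=1 … day=25) it would return day=9, because "day=9" sorts after "day=25". Below is a minimal sketch of a numeric comparison instead. It assumes every relevant folder is named day=<integer>; the object LatestDay and the helper latestDayFolder are hypothetical names, not part of the answer or of any library. It works on plain folder names, so you could feed it e.g. fs.listStatus(new Path("s3a://foo/")).filter(_.isDirectory).map(_.getPath.getName).toSeq.

```scala
object LatestDay {
  // Assumed folder naming: "day=<integer>"; the regex captures the number
  private val DayFolder = """day=(\d+)""".r

  /** Returns the folder name with the highest numeric day; names that don't match are ignored. */
  def latestDayFolder(names: Seq[String]): Option[String] = {
    val days = names.collect { case name @ DayFolder(n) => (n.toInt, name) }
    if (days.isEmpty) None
    else Some(days.maxBy(_._1)._2) // compare the parsed Int, not the string
  }

  def main(args: Array[String]): Unit = {
    val names = (1 to 25).map(d => s"day=$d")
    println(names.max)              // day=9 (string comparison)
    println(latestDayFolder(names)) // Some(day=25)
  }
}
```

The chosen name can then be appended to the base path before calling spark.read.parquet.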
    

    To get the latest day itself, parse it out of the latest path string.
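For example, assuming the returned path contains a segment of the form day=<integer> (as in the question's layout), a regular expression can pull the number out. The object ParseDay and the helper dayOf are hypothetical names used only for this sketch.

```scala
object ParseDay {
  // Matches a whole "day=<digits>" path segment (so e.g. "someday=3" is not matched)
  private val DaySegment = """(?:^|/)day=(\d+)(?:/|$)""".r

  /** Returns the day number of the first day=<digits> segment, if any. */
  def dayOf(path: String): Option[Int] =
    DaySegment.findFirstMatchIn(path).map(_.group(1).toInt)

  def main(args: Array[String]): Unit = {
    println(dayOf("s3a://foo/day=25"))                     // Some(25)
    println(dayOf("s3a://foo/day=25/lots/of/other/stuff")) // Some(25)
    println(dayOf("s3a://foo/other"))                      // None
  }
}
```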
