【Question title】: Get default HDFS path that Parquet file is saved in
【Posted】: 2018-01-17 18:37:03
【Question description】:

I ran a Spark job that saves a Parquet file, and the job completed successfully. However, I only specified a file name, not an HDFS path. Is there a way to print the default HDFS path that Spark wrote the file to? I looked through `sc._conf.getAll()`, but nothing there seems helpful.

【Discussion】:

    Tags: python scala hadoop apache-spark parquet


    【Solution 1】:

    AFAIK this is one way to do it (besides the simple command-line approach: `hadoop fs -ls -R | grep -i yourfile`)...
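For context: when Spark is given a bare filename, Hadoop resolves it against the writing user's HDFS home directory, so the file typically lands under `<fs.defaultFS>/user/<username>/`. A minimal sketch of that resolution rule (the namenode URI and username below are hypothetical, and the function is my own illustration, not a Hadoop API):

```scala
// Sketch of how HDFS resolves a bare filename against the user's home
// directory, assuming the default /user/<username> layout. Purely
// illustrative; real resolution is done by Hadoop's Path/FileSystem classes.
def resolveDefaultPath(defaultFs: String, user: String, fileName: String): String =
  s"${defaultFs.stripSuffix("/")}/user/$user/$fileName"

// e.g. resolveDefaultPath("hdfs://namenode:8020", "alice", "out.parquet")
//   -> "hdfs://namenode:8020/user/alice/out.parquet"
```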

    Below is a sample Scala code snippet (if you want to do this in Python or Java, you can mimic the same API calls) to get the list of Parquet files and filter them as shown:

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    import org.apache.spark.{SparkConf, SparkContext}
    // other imports here

    lazy val sparkConf = new SparkConf()
    lazy val sc = SparkContext.getOrCreate(sparkConf)
    lazy val fileSystem = FileSystem.get(sc.hadoopConfiguration)

    val allStatuses = listChildStatuses(fileSystem, new Path("yourbasepathofHDFS")) // normally something like hdfs://server/user
    val allParquet = allStatuses.filter(_.getPath.getName.endsWith(".parquet"))
    // now you can print these parquet files; yours will be among them, so you can see its base path


    The supporting methods are below:

    /**
     * Get [[org.apache.hadoop.fs.FileStatus]] objects for all children (files) under the given base path. If the
     * given path points to a file, return a single-element collection containing the
     * [[org.apache.hadoop.fs.FileStatus]] of that file.
     */
    def listChildStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
      listChildStatuses(fs, fs.getFileStatus(basePath))
    }

    /**
     * Get [[FileStatus]] objects for all children (files) under the given base status. If the
     * given status points to a file, return a single-element collection containing that
     * [[FileStatus]].
     */
    def listChildStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
      def recurse(status: FileStatus): Seq[FileStatus] = {
        val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
        leaves ++ directories.flatMap(f => listChildStatuses(fs, f))
      }

      if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
    }
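The recursion above is Hadoop-specific, but the same leaves-then-directories pattern can be sketched against a local filesystem with plain Java NIO (Scala 2.13+; a standalone illustration, not part of the answer's Hadoop code):

```scala
import java.nio.file.{Files, Path, Paths}
import scala.jdk.CollectionConverters._

// Recursively collect all regular files under basePath; if basePath is
// itself a file, return it alone — mirroring listChildStatuses above.
def listFilesRecursively(basePath: Path): Seq[Path] = {
  if (Files.isDirectory(basePath)) {
    val children = Files.list(basePath).iterator().asScala.toSeq
    val (directories, leaves) = children.partition(p => Files.isDirectory(p))
    leaves ++ directories.flatMap(listFilesRecursively)
  } else Seq(basePath)
}

// Usage, mirroring the .endsWith(".parquet") filter:
// listFilesRecursively(Paths.get("/tmp/data"))
//   .filter(_.getFileName.toString.endsWith(".parquet"))
```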
    

    【Discussion】:

    • Please check the above. Hope it helps!