【问题标题】:Spark-Obtaining file name in RDDsSpark-在 RDD 中获取文件名
【发布时间】:2021-10-18 14:28:30
【问题描述】:

我正在尝试处理 4 个每天都在增长的文本文件目录。我需要做的是,如果有人试图搜索发票号码,我应该给他们一个包含它的文件列表。

我能够通过将文本文件中的值加载为 RDD 来映射和减少它们。但是如何获取文件名和其他文件属性呢?

【问题讨论】:

    标签: apache-spark


    【解决方案1】:

    从 Spark 1.6 开始,您可以将text 数据源和input_file_name 函数组合如下:

    斯卡拉

    import org.apache.spark.sql.functions.input_file_name
    
    val inputPath: String = ???
    
    spark.read.text(inputPath)
      .select(input_file_name, $"value")
      .as[(String, String)] // Optionally convert to Dataset
      .rdd // or RDD
    

    Python

    2.x 之前的版本有问题,转换为 RDD 时可能不会保留名称):

    from pyspark.sql.functions import input_file_name
    
    (spark.read.text(input_path)
        .select(input_file_name(), "value"))
        .rdd)
    

    这也可以与其他输入格式一起使用。

    【讨论】:

    • 对我来说,这种方法在 Python 中使用时不起作用。应该是 input_file_name 的字段在执行第一个操作(如 .take(10))时被填充,但每个后续操作(如行上的映射)都会产生一个空字符串。但在 Scala 中它可以工作。 Spark 1.6
    • @ludwigm 仅当您不从 JVM 移动数据时,这才适用于 PySpark。
    • @zero323,为什么我不能从 JVM 中移动 input_file_name。我需要保存文件列表吗?
    • 没错,对于 Dataset 和 DataFrames,您只需添加一列:val content = sqlContext.read.text(inputPath).withColumn("filename", input_file_name)
    • 也适用于流媒体:spark.readStream .option("sep", ",") .schema(someSchema) .option("header", "true") .csv("hdfs://path/") .withColumn("input_file_name", input_file_name)
    【解决方案2】:

    如果你在 pyspark 中可以试试这个:

        test = sc.wholeTextFiles("pathtofile")
    

    您将得到一个结果 RDD,其中第一个元素 = 文件路径,第二个元素 = 内容

    【讨论】:

      【解决方案3】:

      如果您的文本文件足够小,您可以使用SparkContext.wholeTextFiles,它返回的RDD 为(filename,content)

      【讨论】:

      • 在 1.6+ 中实际上可以不读取完整文件。
      【解决方案4】:

      如果您的文本文件对于SparkContext.wholeTextFiles 来说太大,您可以使用(简单)自定义InputFormat,然后调用SparkContext.hadoopRDD

      InputFormat 需要返回一个元组(文件名,行)而不是行,然后您可以使用查看行内容的谓词进行过滤,然后将其唯一并收集文件名。

      在 Spark 中,代码如下所示:

      val ft = classOf[FileNamerInputFormat]
      val kt = classOf[String]
      val vt = classOf[String]
      
      val hadoopConfig = new Configuration(sc.hadoopConfiguration)
      sc.newAPIHadoopFile(path, ft, kt, vt, hadoopConfig)
        .filter { case (f, l) => isInteresting(l) }
        .map { case (f, _) => f } 
        .distinct()
        .collect()
      

      【讨论】:

      • 你能扩展一下吗?也许是一个例子?一旦您处于 MapPartitionsRDD 或另一个不基于文件读取的 RDD 中,这将有何帮助?
      • @JustinPihony 扩展了答案。我希望你不要让我显示 InputFormat... :)
      • 嗨,Alister,非常感谢您的回复。我已经使用了您解决方案的后半部分,并且无需使用自定义输入格式就可以获得(文件,行)。查看此链接:themodernlife.github.io/scala/spark/hadoop/hdfs/2014/09/28/…
      • @VipinBhaskaran 请注意,您正在使用标记为 Developer API 的函数,因此它不稳定。只是要记住的事情
      • 很棒的谷歌搜索,感谢您指出这一点!不过,我同意@JustinPihony 的观点——你用那个甜蜜、甜蜜的 API 调用承担了一些风险。
      【解决方案5】:

      您可以使用WholeTextFile() 来实现此目的。但是,如果输入文件很大,那么使用 WholeTextFile() 会适得其反,因为它将整个文件内容放入单个记录中。

      在这种情况下检索文件名的最佳方法是使用mapPartitionsWithInputSplit()。您可以在 my blog 上找到使用此方案的工作示例。

      【讨论】:

      • 添加了更多细节,我希望它能满足。代码相当大,最好是从博客中删除!
      • 这看起来已经更好了,我已经删除了我之前的评论以避免将来混淆。
      【解决方案6】:

      如果您使用数据帧 API,您可以使用来自 org.apache.spark.sql.functionsinput_file_name 函数从 HDFS 获取文件名。下面的 sn-ps 可能会帮助你理解。

      val df = spark.read.csv("/files/")
      val df2 = df.withColumn("file_name", split(input_file_name(), "/").getItem(7).cast(StringType)) 
      val df3 = df.withColumn("file_name", input_file_name()) 
      

      df2 现在包含名为“file_name”的新字段,其中包含使用split 函数提取的 HDFS 文件名。如果您需要完整的 HDFS 路径,您可以使用 input_file_name() 函数,仅在 df3 中显示。

      【讨论】:

        【解决方案7】:

        直接使用 Spark 似乎有点矫枉过正……如果这些数据要“收集”到驱动程序,为什么不使用 HDFS API? Hadoop 通常与 Spark 捆绑在一起。这是一个例子:

        import org.apache.hadoop.fs._
        import org.apache.hadoop.conf._
        
        val fileSpec = "/data/Invoices/20171123/21"
        val conf = new Configuration()
        val fs = org.apache.hadoop.fs.FileSystem.get(new URI("hdfs://nameNodeEneteredHere"),conf)
        val path = new Path(fileSpec)
        // if(fs.exists(path) && fs.isDirectory(path) == true) ...
        val fileList = fs.listStatus(path)
        

        那么用println(fileList(0)),info(格式化)like this first item(作为例子)可以看成org.apache.hadoop.fs.FileStatus

        FileStatus {
            path=hdfs://nameNodeEneteredHere/Invoices-0001.avro; 
            isDirectory=false; 
            length=29665563;
            replication=3;
            blocksize=134217728;
            modification_time=1511810355666;
            access_time=1511838291440;
            owner=codeaperature;
            group=supergroup;
            permission=rw-r--r--;
            isSymlink=false
        }
        

        fileList(0).getPath 将提供hdfs://nameNodeEneteredHere/Invoices-0001.avro

        我猜这种读取文件的方法主要是使用 HDFS 名称节点,而不是在每个执行程序中。 TLDR;我打赌 Spark 可能会轮询名称节点以获取 RDD。如果底层的 Spark 调用轮询 namenode 来管理 RDD,也许上面是一个有效的解决方案。尽管如此,提出任一方向的贡献的 cmets 都会受到欢迎。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2015-12-04
          • 1970-01-01
          • 2015-06-19
          • 2017-10-07
          • 2020-08-01
          • 1970-01-01
          • 2020-02-10
          • 2021-07-29
          相关资源
          最近更新 更多