【问题标题】:Can I get metadata of files reading by Spark我可以获取 Spark 读取的文件的元数据吗
【发布时间】:2025-12-24 20:45:06
【问题描述】:

假设我们有 2 个文件,文件#1 在 12:55 创建,文件#2 在 12:58 创建。在阅读这两个文件时,我想添加一个新列“creation_time”。属于文件#1 的行在“creation_time”列中为 12:55,属于文件#2 的行在“creation_time”中为 12:58。

new_data = spark.read.option("header", "true").csv("s3://bucket7838-1/input")

我正在使用上面的代码 sn-p 来读取“输入”目录中的文件。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    使用input_file_name()函数获取文件名,然后使用hdfs file api获取文件时间戳,最后在filename上加入两个数据框强>。

    Example:

    from pyspark.sql.types import *
    from pyspark.sql.functions import *
    URI           = sc._gateway.jvm.java.net.URI
    Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
    FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
    Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
    
    fs = FileSystem.get(URI("hdfs://<namenode_address>:8020"), Configuration())
    
    status = fs.listStatus(Path('<hdfs_directory>'))
    
    filestatus_df=spark.createDataFrame([[str(i.getPath()),i.getModificationTime()/1000] for i in status],["filename","modified_time"]).\
    withColumn("modified_time",to_timestamp(col("modified_time")))
    
    input_df=spark.read.csv("<hdfs_directory>").\
    withColumn("filename",input_file_name())
    
    #join both dataframes on filename to get filetimestamp
    df=input_df.join(filestatus_df,['filename'],"left")
    

    【讨论】:

    • 感谢您的精彩回答。 SO必须可以选择两次投票:-)
    • @Shu,来源是AWS S3。
    • 很好的答案 :) 谢谢舒
    • 有没有像 creation_date_file() 这样的 api 提供文件的创建日期以及我们有 input_file_name()?
    • 好的。无论如何,您的解决方案对我帮助很大。
    【解决方案2】:

    步骤如下

    1. 使用 sparkcontext.wholeTextFiles("/path/to/folder/ contains/all/files")
    2. 上面返回一个RDD,其中key是文件的路径,value是文件的内容
    3. rdd.map(lambda x:x[1]) - 这给你一个只有文件内容的rdd
    4. rdd.map(lambda x: customeFunctionToProcessFileContent(x))
    5. 由于 map 函数是并行工作的,因此您执行的任何操作都会更快且不连续 - 只要您的任务不相互依赖,这是并行性的主要标准
    import os
    import time
    
    import pyspark
    from pyspark.sql.functions import udf
    from pyspark.sql.types import *
    
    # reading all the files to create PairRDD 
    input_rdd = sc.wholeTextFiles("file:///home/user/datatest/*",2)
    
    #convert RDD to DF
    
    input_df=spark.createDataFrame(input_rdd)
    
    input_df.show(truncate=False)
    '''
    +---------------------------------------+------------+
    |_1                                     |_2          |
    +---------------------------------------+------------+
    |file:/home/user/datatest/test.txt      |1,2,3  1,2,3|
    |file:/home/user/datatest/test.txt1     |4,5,6  6,7,6|
    +---------------------------------------+------------+
    '''
    input_df.select("_2").take(2)
    #[Row(_2=u'1,2,3\n1,2,3\n'), Row(_2=u'4,5,6\n6,7,6\n')]
    
    
    # function to get a creation time of a file
    def time_convesion(filename):
        return time.ctime(os.path.getmtime(filename.split(":")[1]))
    
    #udf registration
    time_convesion_udf = udf(time_convesion, StringType())
    
    #udf apply over the DF
    final_df = input_df.withColumn("created_time", time_convesion_udf(input_df['_1']))
    
    final_df.show(2,truncate=False)
    '''
    +---------------------------------------+------------+------------------------+
    |_1                                     |_2          |created_time            |
    +---------------------------------------+------------+------------------------+
    |file:/home/user/datatest/test.txt      |1,2,3  1,2,3|Sat Jul 11 18:31:03 2020|
    |file:/home/user/datatest/test.txt1     |4,5,6  6,7,6|Sat Jul 11 18:32:43 2020|
    +---------------------------------------+------------+------------------------+
    '''
    # proceed with the next steps for the implementation
    
    

    上述方法适用于默认分区。所以你可能不会得到输入文件数等于输出文件数(因为输出是分区数)。

    您可以根据计数或基于您的数据的任何其他唯一值对 RDD 重新分区,因此最终输出文件计数等于输入计数。这种方法仅具有并行性,但不会达到最佳分区数的性能

    【讨论】:

    • 谢谢,我会研究这个解决方案。有没有可能使用数据框的解决方案?
    • 在上面的解决方案中我得到文件的创建时间?
    • 编写一个 UDF 以获得您处理的每一行的创建时间。您可以逐个处理文件并将其传递给 UDF 并进行火花加载。 *.com/questions/9679344/… 或以下代码用于从 NAS 读取它 import os path = '/home/user/datatest/' files = [] # r=root, d=directories, f = files for r, d, f in os.walk(path): for file in f: if '.txt' in file: files.append(os.path.join(r, file)) for f in files: print(f) time.ctime(os.path.getmtime(f))
    • spark.read.option("header", "true").csv("s3://bucket7838-1/input") 这会从目录中读取所有文件,我会知道谁那行属于哪个文件?
    • 你想让我一个一个地读取文件并使用 boto3 获取元数据?