【问题标题】:Hadoop / Spark Read Many CSV filesHadoop / Spark 读取许多 CSV 文件
【发布时间】:2017-03-09 01:36:19
【问题描述】:

我有很多结构化数据以一种非常有意义的方式存储,我希望以同样有意义的、完整和有效的方式对其进行处理。

+- some-hdfs-path/
  +- level-1_var-01/
  |  +- level-2_var-001.csv
  |  +- ...
  |  +- level-2_var-nnn.csv
  +- level-1_var-02/
  |  +- level-2_other-001.csv
  |  +- ...
  |  +- level-2_other-mmm.csv
  +- ... /
  +- level-1_var-nn/
  |  +- ...

每个文件大约 100MB,大约有 1,000,000 行。每个目录的文件数量(通常约为 100 个)各不相同,文件名也各不相同。换句话说,我不知道有多少文件或它们叫什么,但我确实需要它们的名称,当然还有它们的内容。

我无法处理从 sc.textFile("/some-hdfs-path/level-1_var-01/*.csv")sc.wholeTextFiles("/some-hdfs-path/level-1_var-01") 收到的 RDD。

总体目标是实际获取 level-1_var/ 目录中每个文件的第一行和最后一行。合并每个 level-1_var 的结果,然后返回并在 some-other-hdfs-path/level-1-var/ 中为每个 level-1_var/ 写出全新的文件集

我是 Hadoop/Spark 和使用 RDD 的新手。我已经阅读了上述两个函数的documentation,但我仍然对如何迭代我要返回的 RDD 并进行处理感到困惑。

编辑:文件包含时间序列数据,因此不希望将每个目录中的文件内容连接起来。我愿意将文件的内容作为附加列添加到一个巨大的数据框中,而不是作为行。

【问题讨论】:

    标签: python csv hadoop apache-spark hdfs


    【解决方案1】:

    通过替换您的配置和属性,使用此代码在 pySpark 中读取 CSV。

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    
    def get_first_and_last(filename):
        #rdd variable holds the content of file(it's distributed)
        rdd = spark.read.csv(filename, header=True, mode="DROPMALFORMED").rdd
    
        #Here filename holds abs path. Feel free to substring as per your needs 
        return Row(filename, rdd.first, rdd.take(rdd.count()).last())
    
    
    spark = SparkSession \
        .builder \
        .appName("Read CSVs") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    
    # This file list is not distributed one, It holds list of filenames only
    filesList = spark.sparkContext\
        .wholeTextFiles("/some-hdfs-path/level-*_var-*/*.csv")\
        .map(lambda x: x[0])\  
        .collect()
    
    #output array
    records = filesList.map(get_first_and_last)
    
    for record in records:
        print(record)
    

    我已经在 scala 中尝试了等效代码,并且可以根据需要查看结果。

    编辑:根据 cmets 添加了另一种方法。

    注意: 使用sparkContext.wholeTextFiles() 时首选小文件,因为每个文件都将完全加载到内存中。 documentation

    records = spark.sparkContext\
        .wholeTextFiles("/some-hdfs-path/level-*_var-*/*.csv")\
        .map(lambda x : Row(x[0], x[1].split("\\n")[0], x[1].split("\\n")[-1]))\
    
    for record in records.collect():
        print(record)
    

    pySpark - SparkSession

    【讨论】:

    • 在多个CSV文件的情况下df的结构是什么样的?
    • df 结构将取决于 schema 定义。 1) 读取 CSV 或 CSV 文件列表时没有差异火花。 2) 但是,如果我们想分阶段处理,目录中所有 CSV 文件的架构应该相同。
    【解决方案2】:

    您可以使用 spark 2.0 的 SparkSession 对象并给出 csv 的目录

    val df =spark.read.csv(pathOfDirectory)
    

    上面的df将包含目录中所有csv的数据

    【讨论】:

    • 这不是 scala 语法吗? ...并且该行不是 some_rdd = sc.wholeTextFiles("/some-hdfs-path/level-1_var-01") 吗?我不确定
    猜你喜欢
    • 1970-01-01
    • 2021-03-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-11
    • 2019-02-08
    • 1970-01-01
    • 2019-07-10
    相关资源
    最近更新 更多