【问题标题】:PySpark Reading Multiple Files in ParallelPySpark 并行读取多个文件
【发布时间】:2019-02-10 06:25:50
【问题描述】:

我的项目中有以下要求,我们正在尝试使用 PySpark 进行数据处理。

我们曾经以 Parquet 文件的形式接收每辆车的传感器数据,并且每辆车都有一个文件。该文件有很多传感器,但其结构化数据采用 Parquet 格式。每个文件的平均文件大小为 200MB。

假设我在一批中收到如下文件并准备好处理。

训练文件大小日期

X1 210MB 05-Sep-18 12:10 AM

X1 280MB 05-Sep-18 05:10 PM

Y1 220MB 05-Sep-18 04:10 AM

Y1 241MB 05-Sep-18 06:10 PM

在处理结束时,我需要从每个源文件或一个包含所有这些车辆的聚合数据的主文件中接收一个聚合的 .csv 文件。

我知道 HDFS 默认块大小为 128MB,每个文件将被分成 2 个块。我可以知道如何使用 PySpark 完成这个要求吗?是否可以并行处理所有这些文件?

请告诉我你的想法

【问题讨论】:

    标签: pyspark apache-spark-sql pyspark-sql parquet


    【解决方案1】:

    我最近也遇到过类似的情况。 您可以传递 CSV 列表及其路径以触发读取 API,例如 spark.read.json(input_file_paths) (source)。这会将所有文件加载到单个数据帧中,最终执行的所有转换将由多个执行程序并行完成,具体取决于您的 spark 配置。

    【讨论】:

      【解决方案2】:

      我也遇到过类似的问题,看来我找到了办法: 1.获取文件列表 2.parallelize这个列表(分布在所有节点之间) 3.编写一个函数,从分发到节点的大列表部分读取所有文件的内容 4.用mapPartition运行它,然后将结果收集为一个列表,每个元素都是每个文件的收集内容。 存储在 AWS s3 和 json 文件上的 Fot 文件:

      def read_files_from_list(file_list):
      #reads files from  list
      #returns content as list of strings, 1 json per string ['{}','{}',...]
         out=[]
         for x in file_list:
            content = sp.check_output([ 'aws', 's3', 'cp', x, '-']) # content of the file. x here is a full path: 's3://bucket/folder/1.json'
            out.append(content)   
         return out #content of all files from the file_list as list of strings, 1 json per string ['{}','{}',...]
      
      
      file_list=['f1.json','f2.json',...]
          ps3="s3://bucket/folder/"
          full_path_chunk=[ps3 + f for f in file_list] #makes list  of strings, with full path for each file
          n_parts = 100
          rdd1 = sc.parallelize(full_path_chunk, n_parts ) #distribute files among nodes
          list_of_json_strings = rdd1.mapPartitions(read_files_from_list).collect()
      

      然后,如果需要,您可以像这样创建 spark 数据框:

      rdd2=sc.parallelize(list_of_json_strings) #this is a trick! via http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets
      df_spark=sqlContext.read.json(rdd2)
      

      函数read_files_from_list只是一个例子,应该改为使用python工具从hdfs读取文件。 希望这会有所帮助:)

      【讨论】:

      • 谢谢 lugger1.. 快速提问 - sc.parallelize(full_path_chunk, n_parts) 是否将文件列表拆分为 100 个块并作为 100 个分区发送到集群节点?
      • 是的,它在节点之间分配完整列表的 100 个部分,所以如果您有固定数量的节点,假设 4 个(每个有 8 个核心),而不是 100 个使用 4*8*3 = 96 以获得更好的性能。
      • 函数read_files_from_list(file_list)中,什么是sp? @lugger1
      • sp 是子进程。在“import subprocess as sp”行之前的某处。
      • 嗨@lugger1 将“read_files_from_list”工作以防镶木地板文件?
      【解决方案3】:

      您可以将所有输入文件放在同一个目录中,然后您可以将目录路径传递给 spark。你也可以使用像/data_dir/*.csv这样的通配符。

      【讨论】:

      • 嗨 Hamza.. 是的,我已经看到一些答案,我可以将所有文件放在一个目录中。如果我这样做,如果我想生成 .csv 文件作为每个 i/p 文件的输出,我是否能够使用 Spark 并行处理每个文件?
      • 我认为 spark 处理它。 Spark 会将它们划分为工作人员。每个工人将处理自己的分区。使用 spark 数据结构时无需考虑并行性。
      • 因此,目录中的所有文件在处理过程中都会加载到RDD中,并在集群中的节点之间进行分区。因此,Spark 将目录中的所有文件视为单个文件。对吗?
      • 是的。如果需要,您还可以对每个分区重新分区或应用函数。
      • 最后一个问题..有点困惑..如果我想处理集群中一个节点中的每个文件..怎么办?假设我有 4 个文件,我希望通过将每个文件作为一个分区来由集群中的 4 个差异节点处理它
      猜你喜欢
      • 1970-01-01
      • 2021-11-19
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-07-12
      • 1970-01-01
      • 2021-10-05
      • 1970-01-01
      相关资源
      最近更新 更多