【问题标题】:Reading parquet files from multiple directories in Pyspark从 Pyspark 中的多个目录读取镶木地板文件
【发布时间】:2016-09-12 10:21:14
【问题描述】:

我需要从多个不是父目录或子目录的路径中读取 parquet 文件。

例如,

dir1 ---
       |
       ------- dir1_1
       |
       ------- dir1_2
dir2 ---
       |
       ------- dir2_1
       |
       ------- dir2_2

sqlContext.read.parquet(dir1) 从 dir1_1 和 dir1_2 读取 parquet 文件

现在我正在读取每个目录并使用“unionAll”合并数据帧。 有没有办法从 dir1_2 和 dir2_1 读取镶木地板文件而不使用 unionAll 或者有什么奇特的方式使用 unionAll

谢谢

【问题讨论】:

  • 您好,我是读取多个Json 文件的类似任务,但这里提供的代码不起作用:(您找到解决方案了吗?

标签: pyspark parquet


【解决方案1】:

有点晚了,但我在搜索时发现了这个,它可能对其他人有帮助......

您也可以尝试将参数列表解压缩为spark.read.parquet()

paths=['foo','bar']
df=spark.read.parquet(*paths)

如果您想将一些 blob 传递到路径参数中,这很方便:

basePath='s3://bucket/'
paths=['s3://bucket/partition_value1=*/partition_value2=2017-04-*',
       's3://bucket/partition_value1=*/partition_value2=2017-05-*'
      ]
df=spark.read.option("basePath",basePath).parquet(*paths)

这很酷,因为您不需要列出 basePath 中的所有文件,并且仍然可以进行分区推断。

【讨论】:

  • 当我刚刚使用这段代码时,它正在搜索/home/目录下的目录,你能把整个语法贴出来吗?
  • @N00b 当我尝试此代码时,它给了我一个错误,即 load 只需要 4 个参数,但我有 24 个文件的路径.. 是否可以选择覆盖它。我试图不进行多次加载和联合,这就是为什么我想使用 load 将多个文件放入 df
  • 非常适合我! @EB,您是否将其保存为列表,然后将其作为表达式运行 (*paths)
【解决方案2】:

SQLContextparquetFile 方法和DataFrameReaderparquet 方法都采用多条路径。所以这两种方法中的任何一种都有效:

df = sqlContext.parquetFile('/dir1/dir1_2', '/dir2/dir2_1')

df = sqlContext.read.parquet('/dir1/dir1_2', '/dir2/dir2_1')

【讨论】:

  • 这些都不适合我。它会找到“可疑路径”,然后给我一长串 java 东西。
【解决方案3】:

如果您有 list 的文件,您可以这样做:

files = ['file1', 'file2',...]
df = spark.read.parquet(*files)

【讨论】:

    【解决方案4】:

    对于兽人

    spark.read.orc("/dir1/*","/dir2/*")
    

    spark 进入 dir1/ 和 dir2/ 文件夹并加载所有 ORC 文件。

    对于镶木地板,

    spark.read.parquet("/dir1/*","/dir2/*")
    

    【讨论】:

      【解决方案5】:

      只是采纳 John Conley 的答案,稍微修饰一下并提供完整的代码(在 Jupyter PySpark 中使用),因为我发现他的答案非常有用。

      from hdfs import InsecureClient
      client = InsecureClient('http://localhost:50070')
      
      import posixpath as psp
      fpaths = [
        psp.join("hdfs://localhost:9000" + dpath, fname)
        for dpath, _, fnames in client.walk('/eta/myHdfsPath')
        for fname in fnames
      ]
      # At this point fpaths contains all hdfs files 
      
      parquetFile = sqlContext.read.parquet(*fpaths)
      
      
      import pandas
      pdf = parquetFile.toPandas()
      # display the contents nicely formatted.
      pdf
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2022-06-16
        • 1970-01-01
        • 2018-08-13
        • 2021-11-10
        • 2021-03-19
        • 2020-09-17
        • 2020-03-05
        相关资源
        最近更新 更多