【问题标题】:Spark not leveraging hdfs partitioning with parquetSpark 不利用镶木地板的 hdfs 分区
【发布时间】:2016-07-07 16:50:16
【问题描述】:

我正在使用以下命令将 parquet 文件写入 hdfs: df.write.mode(SaveMode.Append).partitionBy(id).parquet(path)

之后我会像这样读取和过滤文件:

val file = sqlContext.read.parquet(folder)
val data = file.map(r => Row(r.getInt(4).toString, r.getString(0), r.getInt(1),
    r.getLong(2), r.getString(3)))

val filteredData = data.filter(x => x.thingId.equals("1"))
filteredData.collect()

我希望,Spark 将利用文件的分区并且只读取“thingId = 1”的分区。 事实上,Spark 确实读取了文件的所有分区,而不仅仅是过滤后的分区(thingId=1 的分区)。 如果我查看日志,我可以看到它确实读取了所有内容:

16/03/21 01:32:33 INFO ParquetRelation:从以下位置读取 Parquet 文件 hdfs://sandbox.hortonworks.com/path/id=1/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 21 年 16 月 3 日 01:32:33 信息 ParquetRelation:从 hdfs://sandbox.hortonworks.com/path/id=42/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 21 年 16 月 3 日 01:32:33 信息 ParquetRelation:从 hdfs://sandbox.hortonworks.com/path/id=17/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 21 年 16 月 3 日 01:32:33 信息 ParquetRelation:从 hdfs://sandbox.hortonworks.com/path/0833/id=33/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 21 年 16 月 3 日 01:32:33 信息 ParquetRelation:从 hdfs://sandbox.hortonworks.com/path/id=26/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet 21 年 16 月 3 日 01:32:33 信息 ParquetRelation:从 hdfs://sandbox.hortonworks.com/path/id=12/part-r-00000-b4e27b02-9a21-4915-89a7-189c30ca3fe3.gz.parquet

我有什么遗漏吗?当我查看文档时,Spark 应该知道基于过滤器,它应该只读取带有 thingID=1 的分区。 有没有人知道问题出在哪里?

【问题讨论】:

    标签: hadoop apache-spark hdfs parquet bigdata


    【解决方案1】:

    一些问题可能会阻止 Spark 成功“下推”谓词(即在输入格式级别使用过滤器):

    1. filter-pushdown 关闭:根据您使用的 Spark 版本,谓词下推选项 (spark.sql.parquet.filterPushdown) 可能已关闭。从 Spark 1.5.0 开始,它默认开启 - 所以请检查您的版本和配置

    2. filter is "opaque":这似乎是这种情况:您正在加载 parquet 文件,将每一行映射到另一行(重新排序列?),然后使用 filter接受函数的方法。 Spark 无法“读取”函数代码并意识到它在分区列上使用了比较 - 对于 Spark,这只是一个可以进行各种检查的 Row => Boolean 函数......

      要使过滤器下推起作用,您需要在将记录映射到与原始结构“分离”的东西之前使用它,并使用使用可由 Spark 解析的过滤器的 filter 重载之一,例如:

      // assuming the relevant column name is "id" in the parquet structure
      val filtered = file.filter("id = 1") 
      
      // or:
      val filtered = file.filter(col("id") === 1) 
      
      // and only then:
      val data = filtered.map(r => Row(...))
      

    【讨论】:

    • 非常感谢 Tzach Zohar :) 我使用的是 Spark 1.5.0,所以问题与您在选项 2. opaque filter 中描述的完全一样。当我在映射行之前进行过滤时,它可以工作!非常感谢!
    猜你喜欢
    • 2017-03-17
    • 2017-11-11
    • 2018-12-23
    • 2017-11-10
    • 2018-10-27
    • 2016-07-04
    • 1970-01-01
    • 1970-01-01
    • 2018-11-22
    相关资源
    最近更新 更多