【发布时间】:2016-09-07 21:13:00
【问题描述】:
我正在处理一个大型数据集,该数据集由两列分区 - plant_name 和 tag_id。第二个分区 - tag_id 有 200000 个唯一值,我主要通过特定的 tag_id 值访问数据。如果我使用以下 Spark 命令:
sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
val df = sqlContext.sql("select * from tag_data where plant_name='PLANT01' and tag_id='1000'")
我希望得到快速响应,因为这会解析为单个分区。在 Hive 和 Presto 中,这需要几秒钟,但在 Spark 中,它会运行几个小时。
实际数据保存在 S3 存储桶中,当我提交 sql 查询时,Spark 关闭并首先从 Hive 元存储中获取所有分区(其中 200000 个),然后调用refresh() 强制执行完整S3 对象存储中所有这些文件的状态列表(实际上是调用listLeafFilesInParallel)。
正是这两个操作如此昂贵,是否有任何设置可以让 Spark 在调用元数据存储期间或之后立即修剪分区?
【问题讨论】:
-
我也试过上面的代码,加上一个额外的配置参数:
sqlContext.setConf("spark.sql.hive.verifyPartitionPath", "false")对性能没有影响 -
这是一个有趣的问题,但很难回答,因为您没有描述
tag_data的 DataFrame 是如何创建的。我认为扩展这个问题是一个好主意,这样它就可以自己重现。 -
如果我对 Hive 和 Parquet 有更多了解,我可能会。事实上,我不知道如何创建一个(双重)分区的 Parquet 文件。我不清楚您是直接使用 Parquet 文件,还是以某种方式涉及 Hive。 (Hive 被提到过好几次了,但如果这只是一个 Parquet 文件,我不知道它的作用是什么。)
-
添加您的 spark 版本。我不确定,但可能会创建外部表(搜索它)会有所帮助(为此启用配置单元支持)。据我了解,它只会进行一次扫描,然后将这些数据保存在配置单元元数据存储中。下次你不会花这个开销。再次验证以上所有内容。
标签: apache-spark amazon-s3 hive parquet