【问题标题】:Spark SQL partition pruning for a cached table缓存表的 Spark SQL 分区修剪
【发布时间】:2017-02-10 15:01:00
【问题描述】:

是否为 apache spark 中的缓存 TempTables 启用了分区修剪?如果是这样,我该如何配置?

我的数据是一堆不同安装的传感器读数,一行包含安装名称、标签、时间戳和值。

我已使用以下命令以 parquet 格式写入数据:

rdd.toDF("installationName", "tag", "timestamp", "value")
  .repartition($"installationName", $"tag")
  .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output)

我使用以下命令将这些数据读取到使用 Spark HiveContext 的 SQL 表中:

val parquet = hc.read.parquet("/path_to_table/tablename")
parquet.registerTempTable("tablename")

现在,如果我对该表运行 SQL 查询,它会按预期进行分区修剪:

hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'")

查询大约需要 8 秒。但是如果我将表缓存在内存中,然后执行相同的查询,总是需要大约 50 秒:

hc.sql("CACHE TABLE tablename")
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'")

我目前使用的是 Spark 1.6.1。

【问题讨论】:

  • 您好,感谢您的评论。实际上,在将数据写入镶木地板之前,我会进行重新分区操作。我还使用重新分区测试了上述查询,它的查询时间为 20 秒,效率更高,但它仍然比从 parquet 文件中读取而没有缓存要慢。我的目的是完全避免写入镶木地板文件。您能否提供一些来源-您怎么知道缓存后不支持分区修剪?如果你在这里写一个答案,我可以接受。
  • 更正,缓存在内存中将查询时间减少到1秒以内,这当然已经可以接受了。我想知道它是否可以扩展:这只是我数据的一部分,我实际上有超过 200 倍并且还在不断增长,所以我拥有的数据越多,扫描所有分区的时间就越多,所以分区修剪在这里似乎是有益的.

标签: caching apache-spark partition pruning hivecontext


【解决方案1】:

发生这种情况的原因是由于 Cache 在 spark 中的工作方式。

当您向 DataFrame、RDD 或 DataSet 调用某种进程时,执行计划如下:

val df = sc.parallelize(1 to 10000).toDF("line")
df.withColumn("new_line", col("line") * 10).queryExecution

命令queryExecution 将计划返回给您。看下面代码的逻辑规划:

== Parsed Logical Plan ==
Project [*,('line * 10) AS new_line#7]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan ==
line: int, new_line: int
Project [line#5,(line#5 * 10) AS new_line#7]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan ==
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7]
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan ==
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7]
+- Scan ExistingRDD[_1#4]

在这种情况下,您可以看到您的代码将执行的所有过程。当你像这样调用cache 函数时:

 df.withColumn("new_line", col("line") * 10).cache().queryExecution

结果会是这样的:

== Parsed Logical Plan ==
'Project [*,('line * 10) AS new_line#8]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34

== Analyzed Logical Plan ==
line: int, new_line: int
Project [line#5,(line#5 * 10) AS new_line#8]
+- Project [_1#4 AS line#5]
   +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34

== Optimized Logical Plan ==
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None

== Physical Plan ==
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro...

此执行将返回给您执行优化逻辑计划中的InMemoryRelation,这将在您的内存中保存数据结构,或者如果您的数据非常大,它将溢出到磁盘。

将其保存在集群中需要时间,第一次执行时会有点慢,但是当您需要再次访问其他地方的相同数据时,将保存 DF 或 RDD 并且 Spark不会再次请求执行。

【讨论】:

  • 感谢您的回答!在 Spark 中,表缓存是一种急切的操作,这意味着当我第一次运行查询时,数据已经被缓存了。这里缓存数据实际上需要 500 秒,而且缓存后确实提高了查询的性能,现在扫描所有分区只需 50 秒。无论我运行多少次查询,性能总是大致相同的。您的回答没有解决我的问题,即cahcing 后的分区修剪。
猜你喜欢
  • 2020-03-07
  • 1970-01-01
  • 1970-01-01
  • 2020-03-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-15
  • 1970-01-01
相关资源
最近更新 更多