【问题标题】:Spark caching effect on Optimized Logical PlanSpark缓存对优化逻辑计划的影响
【发布时间】:2020-07-22 23:24:08
【问题描述】:

我正在看这个问题和出色的答案Spark: Explicit caching can interfere with Catalyst optimizer's ability to optimize some queries?

要点是:

val df = spark.range(100)
df.join(df, Seq("id")).filter('id <20).explain(true)

通过首先应用过滤为不使用索引的系统生成足够稳健的计划:

== Optimized Logical Plan ==
Project [id#16L]
+- Join Inner, (id#16L = id#18L)
   :- Filter (id#16L < 20)
   :  +- Range (0, 100, step=1, splits=Some(8))
   +- Filter (id#18L < 20)
      +- Range (0, 100, step=1, splits=Some(8))

接下来的例子表明:

df.join(df, Seq("id")).cache.filter('id <20).explain(true)

生成此计划:

== Optimized Logical Plan ==
Filter (id#16L < 20)
+- InMemoryRelation [id#16L], StorageLevel(disk, memory, deserialized, 1 replicas)
      +- *(2) Project [id#16L]
         +- *(2) BroadcastHashJoin [id#16L], [id#21L], Inner, BuildRight
            :- *(2) Range (0, 100, step=1, splits=8)
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#112]
               +- *(1) Range (0, 100, step=1, splits=8)

那么这个呢?

df.join(df, Seq("id")).filter('id <20).cache.explain(true)

生成:

== Optimized Logical Plan ==
InMemoryRelation [id#16L], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Filter (id#16L < 20)
      +- *(1) InMemoryTableScan [id#16L], [(id#16L < 20)]
            +- InMemoryRelation [id#16L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(2) Project [id#16L]
                     +- *(2) BroadcastHashJoin [id#16L], [id#21L], Inner, BuildRight
                        :- *(2) Range (0, 100, step=1, splits=8)
                        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#112]
                           +- *(1) Range (0, 100, step=1, splits=8)

寻求澄清。

  • 我原以为第一个 Opt Log Pl 将通过缓存作为最后一个方面获得。我怀疑一定很简单。是一样的吗?我认为不会。

【问题讨论】:

  • 您好!您使用的是哪个版本的 Spark?我的本地 spark-shell 上有一个不同的 OLP,使用 Spark 2.4.4
  • 245 在数据块上
  • 将在 3 后尝试@BlueSheepToken
  • 我回答了,希望这一切都有意义!在 Spark3 上,这是一个有趣的实验(完成了),但它会增加噪音以理解下推谓词

标签: apache-spark caching


【解决方案1】:

我认为你在实验中遇到了一个错误。

如果您在新的 spark-shell 中运行以下命令:

val df = spark.range(100)
df.join(df, Seq("id")).filter('id <20).cache.explain(true)

您将拥有以下优化的逻辑计划:

== Optimized Logical Plan ==
InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(2) Project [id#0L]
      +- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
         :- *(2) Filter (id#0L < 20)
         :  +- *(2) Range (0, 100, step=1, splits=12)
         +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
            +- *(1) Filter (id#2L < 20)
               +- *(1) Range (0, 100, step=1, splits=12)

使用下推谓词正确推送过滤器。

但是,在一个新的 spark-shell 中,如果你运行:

val df = spark.range(100)
df.join(df, Seq("id")).cache.filter('id <20).explain(true)
df.join(df, Seq("id")).filter('id <20).cache.explain(true)

您将拥有以下优化的逻辑计划:

== Optimized Logical Plan ==
InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- *(1) Filter (id#0L < 20)
      +- *(1) InMemoryTableScan [id#0L], [(id#0L < 20)]
            +- InMemoryRelation [id#0L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- *(2) Project [id#0L]
                     +- *(2) BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight
                        :- *(2) Range (0, 100, step=1, splits=12)
                        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
                           +- *(1) Range (0, 100, step=1, splits=12)

没有优化的计划。

为什么?

这是因为我们已经缓存了 DAG:df.join(df, Seq("id"))

因此,即使我们在过滤器之后使用过滤器和缓存再次编写它,Spark-Engine 也会看到join DAG 并从这里运行它,因此随后添加一个过滤器。对于 Spark 引擎,使用缓存的 Dataframe 比重新计算整个 DAG 更快。

如何解决?

您可以简单地 unpersist dag:df.join(df, Seq("id")).unpersist() 然后 df.join(df, Seq("id")).filter('id &lt;20).cache.explain(true) 给出正确的 OLP

【讨论】:

  • 对我来说它看起来像一个错误。这是你的结论吗?
  • 一定是这样。干杯。我重新启动了集群,但一定是出错了。
  • 你应该尝试简单地取消持久化:df.join(df, Seq("id")).unpersist() 然后df.join(df, Seq("id")).filter('id &lt;20).cache.explain(true)
  • 加一可以取消持久化以前缓存的内容。答案本身值得更新 IMO。
  • 我想我还有一点要说,但我正在运输中,但是好东西
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多