【问题标题】:Spark filter pushdown with multiple values in subquery子查询中具有多个值的 Spark 过滤器下推
【发布时间】:2020-04-06 21:55:32
【问题描述】:

我有一个小表 adm,其中一列 x 仅包含 10 行。现在我想使用分区修剪过滤另一个由y 分区的表big,其值来自adm

在这里

select * from big b 
where b.y = ( select max(a.x) from adm a)

分区过滤器下推有效,但不幸的是:

select * from big b
where b.y IN (select a.x from adm a )

导致ab 之间的广播连接

即使我使用IN,如何将子查询作为分区过滤器下推

【问题讨论】:

  • 您的问题出在 Spark 或 Hive 上?
  • 我正在使用 Spark 处理数据。这些表是在 Hive 中创建的。我的问题在于 Spark
  • 我认为,这是合乎逻辑的。如果您使用 Max,那么它将返回单值数据帧,该数据帧可以作为变量调用,因此不会有连接,但在第二种情况下,您使用的是 IN,它将在内部将您的数据帧与另一个数据帧的所有可能值连接起来,因为您的第二个数据帧是默认情况下,非常小的 spark 将执行广播连接以使更优化。如果我错了,请纠正我。
  • 是的,但是一旦它进行广播连接,它就必须对big 进行全表扫描,而如果它要进行过滤操作,它只需要读取一些来自big 的分区。所以我的问题是我怎样才能让 Spark 做一个过滤操作而不是广播加入这里
  • 动态分区修剪是我正在寻找的。它是在 Spark 3.0.0 中添加的

标签: apache-spark hive apache-spark-sql hiveql


【解决方案1】:

发生这种情况是因为子查询的结果本身就是一个 RDD,因此 Spark 以真正分布式的方式处理它——通过广播和连接——就像它是任何其他列一样,不一定是分区。
要解决此问题,您需要单独执行子查询,收集结果并将其格式化为可用于IN 子句的值。

scala> val ax = spark.sql("select a.x from adm a")
scala> val inclause = ax.as(Encoders.STRING).map(x => "'"+x+"'").collectAsList().asScala.mkString(",")
scala> spark.sql("select * from big b where b.y IN (" + inclause + ")")

(假设xy 是字符串。)

【讨论】:

  • 您的方法有效,但这不是我想要的。我希望能够在 Hive 的视图中定义查询。然后,spark 应用程序应该只在过滤器被按下的情况下从该视图中读取。我认为可能有一个配置或提示,以避免将 IN 转换为广播连接
猜你喜欢
  • 1970-01-01
  • 2016-01-25
  • 2017-06-17
  • 2021-09-27
  • 2017-08-20
  • 2019-03-26
  • 1970-01-01
  • 1970-01-01
  • 2019-02-18
相关资源
最近更新 更多