【问题标题】:Broadcast join in spark not working for left outer广播加入火花不适用于左外
【发布时间】:2020-07-04 23:40:32
【问题描述】:

我有一个小表 (2k) 记录和大表 (5 百万) 记录。我需要从小表中获取所有数据,并且只从大表中获取匹配数据,为了实现这一点,我在下面的查询中执行了 select /*+ broadcast(small)*/ small.* From small left outer join large 虽然查询返回正确的结果,但是当我检查查询计划时,它显示排序合并广播哈希连接。 如果小表是左表我们不能广播,有什么限制吗?那有什么出路。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql amazon-emr


    【解决方案1】:

    由于您想从小表而不是大表中选择完整的数据集,Spark 不强制广播连接。 当您更改连接顺序或转换为等连接时,spark 会很高兴地强制执行广播连接。

    例如:

    1. Big-Table left outer join Small-Table -- 启用广播
    2. Small-Table 左外连接 Big-Table -- Broadcast Disabled

    原因: *Spark 将向所有存在大表数据的数据节点共享小表(即广播表)。在您的情况下,我们需要小表中的所有数据,但只需要匹配大表中的数据。所以 spark 不知道这条记录是否在另一个数据节点匹配,甚至根本没有匹配。由于这种模糊性,它无法从小表中选择所有记录(如果这是分发的)。所以在这种情况下,spark 没有使用 Broadcast Join。 *

    【讨论】:

      【解决方案2】:

      通过广播左表更改 order of the tables,因为您正在执行 left join,因此要广播的右表(或)将连接类型更改为 right.

      select /*+ broadcast(small)*/ small.* From small right outer join large
      select /*+ broadcast(small)*/ small.* From large left outer join small
      

      Example:

      df=spark.createDataFrame([(1,'a')],['id','name'])
      df1=spark.createDataFrame([(1,'a')],['id','name'])
      
      #broadcasting on right df1 and performing left join
      df.join(broadcast(df1),['id'],'left').explain()
      #== Physical Plan ==
      #*(2) Project [id#0L, name#1, name#5]
      #+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
      #   :- Scan ExistingRDD[id#0L,name#1]
      #   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
      #      +- *(1) Filter isnotnull(id#4L)
      #         +- Scan ExistingRDD[id#4L,name#5]
      
      
      #broadcasting df1 and right join defaults to Sortmerge join
      df.join(broadcast(df1),['id'],'right').explain()
      #== Physical Plan ==
      #*(4) Project [id#4L, name#1, name#5]
      #+- SortMergeJoin [id#0L], [id#4L], RightOuter
      #   :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
      #   :  +- Exchange hashpartitioning(id#0L, 200)
      #   :     +- *(1) Filter isnotnull(id#0L)
      #   :        +- Scan ExistingRDD[id#0L,name#1]
      #   +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
      #      +- Exchange hashpartitioning(id#4L, 200)
      #         +- Scan ExistingRDD[id#4L,name#5]
      

      【讨论】:

      • 感谢@Shu 的回答。但在我的场景中,我需要小表中的所有数据。但是你提到的连接类型给了我大表中的所有数据。我不想要。我想要完整的来自小表的数据和只匹配来自大表的数据我也想广播所以不应该有任何洗牌有什么出路。
      • 您可以将spark.sql.autoBroadcastJoinThreshold 增加到您的大表大小,默认为10MB,然后将执行broadcasthashjoin!
      • 再次感谢,但这也是不可能的,因为我的大尺寸超过 10gb,我认为 spark 不支持广播。如果有其他出路,请告诉我。
      猜你喜欢
      • 2020-02-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-07
      • 2017-02-13
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多