Change the order of the tables: since you are performing a left outer join, Spark can only broadcast the right table, so make the small table the right side of the join (or) change the join type to right outer join so the broadcast table sits on the left.
select /*+ broadcast(small) */ small.* from small right outer join large
select /*+ broadcast(small) */ small.* from large left outer join small
Example:
from pyspark.sql.functions import broadcast

df = spark.createDataFrame([(1, 'a')], ['id', 'name'])
df1 = spark.createDataFrame([(1, 'a')], ['id', 'name'])
# broadcasting df1 (the right table) and performing a left join keeps the BroadcastHashJoin
df.join(broadcast(df1),['id'],'left').explain()
#== Physical Plan ==
#*(2) Project [id#0L, name#1, name#5]
#+- *(2) BroadcastHashJoin [id#0L], [id#4L], LeftOuter, BuildRight
# :- Scan ExistingRDD[id#0L,name#1]
# +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
# +- *(1) Filter isnotnull(id#4L)
# +- Scan ExistingRDD[id#4L,name#5]
# broadcasting df1 (the right table) with a right join is not supported, so Spark falls back to a sort-merge join
df.join(broadcast(df1),['id'],'right').explain()
#== Physical Plan ==
#*(4) Project [id#4L, name#1, name#5]
#+- SortMergeJoin [id#0L], [id#4L], RightOuter
# :- *(2) Sort [id#0L ASC NULLS FIRST], false, 0
# : +- Exchange hashpartitioning(id#0L, 200)
# : +- *(1) Filter isnotnull(id#0L)
# : +- Scan ExistingRDD[id#0L,name#1]
# +- *(3) Sort [id#4L ASC NULLS FIRST], false, 0
# +- Exchange hashpartitioning(id#4L, 200)
# +- Scan ExistingRDD[id#4L,name#5]