[Posted]: 2020-08-05 16:01:59
[Problem description]:
I wrote an explicit-join API that renames the columns of each dataset with an l_ or r_ prefix, both to remove ambiguity and to work around Spark lineage problems of the form columnName1#77 not found in columnName1#123, columnName2#55 ...
Part of the code looks like this:
def explicitJoin(other: Dataset[_], joinExpr: Column, joinType: String): ExplicitJoinExt = {
  // Prefix every column on each side so the join expression can never be ambiguous.
  val left  = dataset.toDF(dataset.columns.map("l_" + _): _*)
  val right = other.toDF(other.columns.map("r_" + _): _*)
  new ExplicitJoinExt(left.join(right, joinExpr, joinType))
}
Users can then pass a join expression such as $"l_columnName1" === $"r_columnName1" && ..., so they are 100% explicit about which columns they are joining on.
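For context, here is a minimal self-contained sketch of how such a wrapper could be wired up (my reconstruction, not the original code: the enclosing object, the implicit conversion and the body of selectLeft are assumptions based on the usage shown in Example 2 below):

import org.apache.spark.sql.{Column, DataFrame, Dataset}

object ExplicitJoinSyntax {
  // Hypothetical wrapper reconstructed from the question.
  implicit class ExplicitJoinExt(ds: Dataset[_]) {
    private val dataset: DataFrame = ds.toDF

    def explicitJoin(other: Dataset[_], joinExpr: Column, joinType: String): ExplicitJoinExt = {
      val left  = dataset.toDF(dataset.columns.map("l_" + _): _*)
      val right = other.toDF(other.columns.map("r_" + _): _*)
      new ExplicitJoinExt(left.join(right, joinExpr, joinType))
    }

    // Keep only the left-hand columns and strip the l_ prefix again.
    def selectLeft: ExplicitJoinExt = {
      val kept = dataset.columns.filter(_.startsWith("l_"))
      new ExplicitJoinExt(
        dataset.select(kept.map(c => dataset(c).as(c.stripPrefix("l_"))): _*))
    }

    def toDF: DataFrame = dataset
  }
}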
I have now hit a new problem where a partition is too large to load into memory (org.apache.spark.shuffle.FetchFailedException: Too large frame...), although the input (partitioned) datasets can be read without any problem.
Does renaming the columns affect the underlying partitioning of the input datasets/dataframes?
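One way to probe that question directly (my own sketch, not from the original post; it uses the public but semi-internal queryExecution API, and describes Spark 2.x behaviour without adaptive execution, which is what the plans below come from):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

val base    = spark.range(1000).toDF("a").repartition(100, $"a")
val renamed = base.toDF("l_a") // the same kind of rename explicitJoin performs

// On Spark 2.x both plans report hash partitioning, but the renamed plan's
// partitioning still references the old attribute (a#...), which no longer
// appears in its output, so a join keyed on l_a is not recognised as
// co-partitioned and Spark inserts a fresh Exchange.
println(base.queryExecution.executedPlan.outputPartitioning)
println(renamed.queryExecution.executedPlan.outputPartitioning)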
EDIT
Example 1 - a regular join
case class A(a: Int, b: String)
import spark.implicits._ // already in scope in spark-shell; needed explicitly in a compiled app
val l = (0 to 1000000).map(i => A(i, i.toString))
val r = (0 to 1000000).map(i => A(i, i.toString))
val ds1 = l.toDF.as[A].repartition(100, $"a")
val ds2 = r.toDF.as[A].repartition(100, $"a")
val joined = ds1.join(ds2, Seq("a"), "inner")
joined.explain
== Physical Plan ==
*Project [a#2, b#3, b#15]
+- *SortMergeJoin [a#2], [a#14], Inner
:- *Sort [a#2 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(a#2, 100)
: +- LocalTableScan [a#2, b#3]
+- *Sort [a#14 ASC NULLS FIRST], false, 0
+- ReusedExchange [a#14, b#15], Exchange hashpartitioning(a#2, 100)
Example 2 - using my (possibly misguided) ExplicitJoinExt, which involves the renaming
val joined = ds1
  .explicitJoin(ds2, $"l_a" === $"r_a", "inner") // pimped onto Dataset via conversion to the ExplicitJoinExt type; columns prefixed with l_ or r_, datasets joined by expr and join type
  .selectLeft // select just the left-prefixed columns
  .toDF       // convert back from ExplicitJoinExt to a DataFrame
  .as[A]
joined.explain
== Physical Plan ==
*Project [l_a#24 AS a#53, l_b#25 AS b#54]
+- *BroadcastHashJoin [l_a#24], [r_a#29], Inner, BuildRight
:- *Project [a#2 AS l_a#24, b#3 AS l_b#25]
: +- Exchange hashpartitioning(a#2, 100)
: +- LocalTableScan [a#2, b#3]
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
+- *Project [a#14 AS r_a#29]
+- Exchange hashpartitioning(a#14, 100)
+- LocalTableScan [a#14]
So for the second join it looks like we are repartitioning again - is that right?
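One experiment that would test this (my own sketch, not part of the original question): repartition after the rename, so that the partitioning expression is defined on the new attribute names.

// Rename first, then repartition on the prefixed key (hypothetical variant).
val left  = ds1.toDF(ds1.columns.map("l_" + _): _*).repartition(100, $"l_a")
val right = ds2.toDF(ds2.columns.map("r_" + _): _*).repartition(100, $"r_a")

// Disable broadcast joins so the plan is comparable with Example 1's SortMergeJoin.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

left.join(right, $"l_a" === $"r_a", "inner").explain
// If the rename is what invalidates the co-partitioning, this plan should show
// the join consuming the two Exchanges directly, with no extra shuffle.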
[Question discussion]:
Tags: apache-spark apache-spark-sql