【问题标题】:Spark SQL - does renaming columns affect partitioning?Spark SQL - 重命名列会影响分区吗?
【发布时间】:2020-08-05 16:01:59
【问题描述】:

我编写了一个显式连接 API,它使用 l_ 或 r_ 前缀重命名数据集中的列,以消除歧义并解决火花谱系问题,即 columnName1#77 在 columnName1#123、columnName2#55 中找不到... .

部分代码如下:

 def explicitJoin(other: Dataset[_], joinExpr: Column, joinType: String): ExplicitJoinExt = {
  val left = dataset.toDF(dataset.columns.map("l_" + _): _*)
  val right = other.toDF(other.columns.map("r_" + _): _*)

  new ExplicitJoinExt(left.join(right, joinExpr, joinType))
}

然后,用户可以传递一个连接表达式,例如 $"l_columnName1" === $"r_columnName1" && ... 这样他们就可以 100% 明确地知道他们要连接的列。

我遇到了一个新问题,即分区太大而无法加载到内存中(org.apache.spark.shuffle.FetchFailedException: Too large frame....)但读取输入(分区)数据集没有问题。

重命名列会影响输入数据集/数据帧的底层分区吗?

编辑

示例 1 - 常规连接

    case class A(a: Int, b: String)

    val l = (0 to 1000000).map(i => A(i, i.toString))
    val r = (0 to 1000000).map(i => A(i, i.toString))

    val ds1 = l.toDF.as[A].repartition(100, $"a")
    val ds2 = r.toDF.as[A].repartition(100, $"a")

    val joined = ds1.join(ds2, Seq("a"), "inner")

    joined.explain

    == Physical Plan ==
    *Project [a#2, b#3, b#15]
    +- *SortMergeJoin [a#2], [a#14], Inner
       :- *Sort [a#2 ASC NULLS FIRST], false, 0
       :  +- Exchange hashpartitioning(a#2, 100)
       :     +- LocalTableScan [a#2, b#3]
       +- *Sort [a#14 ASC NULLS FIRST], false, 0
          +- ReusedExchange [a#14, b#15], Exchange hashpartitioning(a#2, 100)

示例 2 - 使用我的(可能被误导的)ExplicitJoinExt 涉及重命名

    val joined = ds1
      .explicitJoin(ds2, $"l_a" === $"r_a", "inner") // Pimped on conversion to ExplicitJoin type, columns prefixed by l_ or r_. DS joined by expr and join type
      .selectLeft                                    // Select just left prefixed columns
      .toDF                                          // Convert back from ExplicitJoinExpr to DF
      .as[A]

    joined.explain


    == Physical Plan ==
    *Project [l_a#24 AS a#53, l_b#25 AS b#54]
    +- *BroadcastHashJoin [l_a#24], [r_a#29], Inner, BuildRight
       :- *Project [a#2 AS l_a#24, b#3 AS l_b#25]
       :  +- Exchange hashpartitioning(a#2, 100)
       :     +- LocalTableScan [a#2, b#3]
       +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
          +- *Project [a#14 AS r_a#29]
             +- Exchange hashpartitioning(a#14, 100)
                +- LocalTableScan [a#14]

那么,对于第二次加入,我们似乎再次重新分区 - 对吗?

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    不,我签入了 SPARK 2.3.1。重命名不会影响分区,至少在这种方法中不会:

     val ds11 = ds1.repartition(4) 
    

    不,我也检查了这个。重命名不会影响分区,至少在这种方法中不会:

     val ds11 = ds1.repartition(2, $"cityid")
    

    解释输出:

    val j = left.join(right, $"l_personid" === $"r_personid", "inner").explain
    

    ​显示,在我的例子中,2 和 4 是分区数:

    == Physical Plan ==
    *(2) BroadcastHashJoin [l_personid#641], [r_personid#647], Inner, 
    BuildRight, false
    :- *(2) Project [personid#612 AS l_personid#641, personname#613 AS 
    l_personname#642, cityid#614 AS l_cityid#643]
    :  +- Exchange hashpartitioning(cityid#614, 2)
    :     +- LocalTableScan [personid#612, personname#613, cityid#614]   
    +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
       +- *(1) Project [personid#612 AS r_personid#647, personname#613 AS r_personname#648, cityid#614 AS r_cityid#649]
          +- Exchange hashpartitioning(personid#612, 4)
             +- LocalTableScan [personid#612, personname#613, cityid#614]
    

    可以看到重命名的列被映射回它们的原始名称。

    在对其他帖子的测试中,我们能够确定依赖 AGGRegations 或 JOIN 的新操作将默认为 200,除非

     sqlContext.setConf("spark.sql.shuffle.partitions", "some val")
    

    在将其设置为所需值的代码中发出。如果是一小部分数据被JOINed等,那么结果可能会有所不同。

    【讨论】:

    • 我担心的是,我在加入之前在两个 DS 上对(比如)joinKey1 进行分区,但在加入 $"l_joinKey1" === $"r_joinKey1" 之前重命名为 l_joinKey1 和 r_joinKey 1 - 是Spark 足够聪明地意识到数据已经分区并位于原始列名上?
    • 我检查了场景(午餐期间),发现分区是根据数字继承的,而不是默认的 200。我推断没有问题。你试过解释吗?在改名之前和之后?
    • 这个问题:重命名列会影响输入数据集/数据帧的底层分区吗?,我相信已经回答了。
    • 从解释中可以清楚地看出,重命名的 cols 映射回原来的 cols。我最初应用的分区仍然有效。
    【解决方案2】:

    对于那些仍然遇到这个问题的人:重命名列确实会影响 Spark

    Seq((1, 2))
      .toDF("a", "b")
      .repartition($"b")
      .withColumnRenamed("b", "c")
      .repartition($"c")
      .explain()
    

    给出以下计划:

    == Physical Plan ==
    Exchange hashpartitioning(c#40, 10)
    +- *(1) Project [a#36, b#37 AS c#40]
       +- Exchange hashpartitioning(b#37, 10)
          +- LocalTableScan [a#36, b#37]
    

    此问题已在 this PR 中修复。

    【讨论】:

      猜你喜欢
      • 2014-09-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-02-07
      • 2011-05-21
      • 1970-01-01
      • 2021-02-18
      • 1970-01-01
      相关资源
      最近更新 更多