【问题标题】:Weird behavior of DataFrame operationsDataFrame 操作的奇怪行为
【发布时间】:2017-07-27 13:06:48
【问题描述】:

考虑代码:

val df1 = spark.table("t1").filter(col("c1")=== lit(127))
val df2 = spark.sql("select x,y,z from  ORCtable")
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R",$"z_R").show(500,false)

这会产生警告WARN TaskMemoryManager: Failed to allocate a page (2097152 bytes), try again.代码失败java.lang.OutOfMemoryError: GC overhead limit exceeded

但如果我运行以下代码:

val df1 = spark.table("t1").filter(col("c1")=== lit(127))
val df2 = spark.sql("select x,y,z from  ORCtable limit 2000000")//only difference here
//ORC table has 1651343 rows so doesn't exceed limit 2000000
val df3 = df1.join(df2.toDF(df2.columns.map(_ + "_R"): _*),
  trim(upper(coalesce(col("y_R"), lit("")))) === trim(upper(coalesce(col("a"), lit("")))), "leftouter")
df3.select($"y_R",$"z_R").show(500,false)

这会产生正确的输出。我不知道为什么会发生这种情况以及发生了什么变化。有人可以帮忙解释一下吗?

【问题讨论】:

    标签: scala apache-spark dataframe orc


    【解决方案1】:

    回答我自己的问题:Spark physical execution plan 对于生成相同dataframe 的两种方式是不同的,可以通过调用.explain() 方法来检查。

    第一种方式使用broadcast-hash join,这会导致java.lang.OutOfMemoryError: GC overhead limit exceeded,而后一种方式运行sort-merge join,这通常速度较慢,但​​不会对垃圾收集造成太大压力。

    物理执行计划的这种差异是由df2 dataframe 上的附加filter 操作引入的。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-06-25
      • 1970-01-01
      • 2018-04-22
      • 1970-01-01
      • 1970-01-01
      • 2018-10-26
      • 1970-01-01
      相关资源
      最近更新 更多