【问题标题】:Comparing two data frames in Spark (performance)比较 Spark 中的两个数据帧(性能)
【发布时间】:2019-06-02 07:00:21
【问题描述】:

我需要在我的 spark 应用程序中比较两个数据帧。我浏览了以下帖子。 How to obtain the difference between two DataFrames?

但是,我不明白为什么最佳答案中的方法

df1.unionAll(df2).except(df1.intersect(df2))

比问题中的那个好

df1.except(df2).union(df2.except(df1))

谁能解释一下? 据我了解,后者适用于两个较小的数据集,而前者适用于一个大型数据集。是因为后者做了一个 distinct 作为联合的一部分吗?即使那样,如果两个数据帧更有可能具有相同的记录,我们在后一种情况下处理的是一个小数据集。

【问题讨论】:

  • 作为最初问题的提问者,这里有一些澄清:两者都不是“更好”,问题不是关于正确性、效率或性能,而是关于最合适的惯用方法。 (由我)选择了“正确”的答案,因为它解决了不存在精确操作的事实,并且需要结合其他逻辑操作。正如下面提交的答案所示,在效率等方面“最佳”将始终取决于上下文。
  • @Ajay Vepakomma,我正在使用代码,但不接受 except,引发 invalid 语法错误。

标签: java scala performance apache-spark apache-spark-sql


【解决方案1】:

假设df1df2(大小分别为N 和M)都太大而无法广播,但df1df2 之间没有重叠。

我们称它为结果di。在这种情况下,df1.intersect(df2) 将需要 N + M 行的完全洗牌,但是输出的大小将等于 0。在这种情况下,df1.unionAll(df2).except(di) 可以作为广播连接执行(这种优化可能需要adaptive execution除非用户强制执行特定计划)。还需要注意的是,这样的计划不需要缓存。

相比之下,df1.except(df2).union(df2.except(df1)) 的成本在交叉口的基数方面是恒定的。

同时,如果d1 的广播太大,它已经有一个与except 兼容的分区,所以剩下的查询应该不需要额外的shuffle。

【讨论】:

    【解决方案2】:

    第一件事 - unionAll 在 Spark 的第 2 版中已被弃用。请改用union,就像你在第二个sn-p中所做的那样。

    其次,在问题的答案中,您参考的是,没有信息表明第一段代码更好。我准备了这样一个场景。对我来说,第一个用了 31 秒,第二个用了 18 秒。在我的例子中,df1 有大约 300 万行,df2 有大约 100 万行,每个 5 列。

    如果我们现在分析第一个查询的优化逻辑执行计划:

    == Optimized Logical Plan ==
    GlobalLimit 21
    +- LocalLimit 21
       +- Aggregate [_c0#10, _c1#11, _c2#12, _c3#13, _c4#14], [cast(_c0#10 as string) AS _c0#67, cast(_c1#11 as string) AS _c1#68, cast(_c2#12 as string) AS _c2#69, cast(_c3#13 as string) AS _c3#70, cast(_c4#14 as string) AS _c4#71]
          +- Join LeftAnti, (((((_c0#10 <=> _c0#52) && (_c1#11 <=> _c1#53)) && (_c2#12 <=> _c2#54)) && (_c3#13 <=> _c3#55)) && (_c4#14 <=> _c4#56))
             :- Union
             :  :- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv
             :  +- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#40]
             :     +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
             +- Aggregate [_c0#52, _c1#53, _c2#54, _c3#55, _c4#56], [_c0#52, _c1#53, _c2#54, _c3#55, _c4#56]
                +- Join LeftSemi, (((((_c0#52 <=> _c0#30) && (_c1#53 <=> _c1#31)) && (_c2#54 <=> _c2#32)) && (_c3#55 <=> _c3#33)) && (_c4#56 <=> _c4#46))
                   :- Relation[_c0#52,_c1#53,_c2#54,_c3#55,_c4#56] csv
                   +- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#46]
                      +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
    

    我们可以看到,有UnionJoin(intersection)同时运行,代价非常高,尤其是Union,而对于第二个查询:

    == Optimized Logical Plan ==
    GlobalLimit 21
    +- LocalLimit 21
       +- Union
          :- LocalLimit 21
          :  +- Aggregate [_c0#10, _c1#11, _c2#12, _c3#13, _c4#14], [cast(_c0#10 as string) AS _c0#120, cast(_c1#11 as string) AS _c1#121, cast(_c2#12 as string) AS _c2#122, cast(_c3#13 as string) AS _c3#123, cast(_c4#14 as string) AS _c4#124]
          :     +- Join LeftAnti, (((((_c0#10 <=> _c0#30) && (_c1#11 <=> _c1#31)) && (_c2#12 <=> _c2#32)) && (_c3#13 <=> _c3#33)) && (_c4#14 <=> _c4#98))
          :        :- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv
          :        +- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#98]
          :           +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
          +- LocalLimit 21
             +- Aggregate [_c0#30, _c1#31, _c2#32, _c3#33, _c4#104], [cast(_c0#30 as string) AS _c0#130, cast(_c1#31 as string) AS _c1#131, cast(_c2#32 as string) AS _c2#132, cast(_c3#33 as string) AS _c3#133, cast(_c4#104 as string) AS _c4#134]
                +- Join LeftAnti, (((((_c0#30 <=> _c0#10) && (_c1#31 <=> _c1#11)) && (_c2#32 <=> _c2#12)) && (_c3#33 <=> _c3#13)) && (_c4#104 <=> _c4#14))
                   :- Project [_c0#30, _c1#31, _c2#32, _c3#33, cast(_c4#34 as double) AS _c4#104]
                   :  +- Relation[_c0#30,_c1#31,_c2#32,_c3#33,_c4#34] csv
                   +- Relation[_c0#10,_c1#11,_c2#12,_c3#13,_c4#14] csv
    

    有两个LeftAnti 同时运行(相对恭维)。这样占用的空间更少,效率更高。这可以在 SparkUI 中看到:

    第一个查询: 第二个查询:

    在第一种情况下,阶段 7 - Union 成本最高,而在第二种情况下,阶段 42 和 41(上图)相对较快。

    【讨论】:

      猜你喜欢
      • 2018-01-15
      • 1970-01-01
      • 2020-06-07
      • 1970-01-01
      • 2019-07-21
      • 1970-01-01
      • 1970-01-01
      • 2017-04-26
      相关资源
      最近更新 更多