【问题标题】:Performance impact of RDD API vs UDFs mixed with DataFrame APIRDD API 与 UDF 与 DataFrame API 混合的性能影响
【发布时间】:2016-12-16 02:15:29
【问题描述】:

(Scala 特有的问题。)

虽然 Spark 文档鼓励在可能的情况下使用 DataFrame API,但如果 DataFrame API 不足,通常需要选择回退到 RDD API 或使用 UDF。这两种替代方案之间是否存在固有的性能差异?

RDD 和 UDF 的相似之处在于它们都不能从 Catalyst 和 Tungsten 优化中受益。是否有任何其他开销,如果有,这两种方法之间是否存在差异?

举一个具体的例子,假设我有一个 DataFrame,其中包含一列具有自定义格式的文本数据(不适合正则表达式匹配)。我需要解析该列并添加一个包含结果标记的新向量列。

【问题讨论】:

    标签: scala performance apache-spark apache-spark-sql rdd


    【解决方案1】:

    它们都不能从 Catalyst 和 Tungsten 优化中受益

    这并不完全正确。虽然 UDF 没有从 Tungsten 优化中受益(可以说简单的 SQL 转换在那里也没有得到巨大的提升),但您仍然可以从 Catalyst 提供的执行计划优化中受益。让我们用一个简单的例子来说明这一点(注意:Spark 2.0 和 Scala。不要将此推断到早期版本,尤其是 PySpark):

    val f = udf((x: String) => x == "a")
    val g = udf((x: Int) => x + 1)
    
    val df = Seq(("a", 1), ("b", 2)).toDF
    
    df
      .groupBy($"_1")
      .agg(sum($"_2").as("_2"))
      .where(f($"_1"))
      .withColumn("_2", g($"_2"))
      .select($"_1")
      .explain
    
    // == Physical Plan ==
    // *HashAggregate(keys=[_1#2], functions=[])
    // +- Exchange hashpartitioning(_1#2, 200)
    //    +- *HashAggregate(keys=[_1#2], functions=[])
    //       +- *Project [_1#2]
    //          +- *Filter UDF(_1#2)
    //             +- LocalTableScan [_1#2, _2#3]
    

    执行计划向我们展示了几件事:

    • Selection 已在聚合前下推。
    • Projection 在聚合之前已被下推,并有效地移除了第二个 UDF 调用。

    根据数据和管道,这几乎可以免费提供显着的性能提升。

    话虽如此,RDD 和 UDF 都需要在安全和不安全之间进行迁移,而后者的灵活性要低得多。尽管如此,如果您唯一需要的是一个简单的map 类行为而不初始化昂贵的对象(如数据库连接),那么 UDF 就是要走的路。

    在稍微复杂一些的场景中,您可以轻松地下拉到通用 Dataset 并保留 RDDs 以供您确实需要访问自定义分区等一些低级功能的情况。

    【讨论】:

    • 这里物理计划中显示的优化更多是对DF上执行的整个系列操作的催化剂优化。但是 f($"_1") 和 g($"_2") 不会以任何方式被 spark 优化,spark 所知道的是,如果我调用该函数,它会返回一个值。假设我们对列的函数进行了相当优化,那么 SQL 查询可以是高效的。我的理解正确吗?但是对于 spark 这个函数是一个黑盒
    【解决方案2】:

    (注意:我没有测量为此)

    给我,随机播放和(de)序列化是主要成本。但是,在这些之后,具有清洁的代码是最重要的。考虑到这一点:

    使用RDD操作的主要缺点是需要/进入完整JVM对象的(de)序列化。使用UDF可能仅(de)序列化所需列。请注意,这是在处理诸如镶木地板的列的课题数据时,我不知道的其他数据格式,但希望在许多情况下都有类似的顺序。

    因此,如果您的算法主要过滤和播种操作,并且/或可以用DataFrame Op和本地UDF表达,您应该使用这些。但是,如果您的算法需要在许多列上进行复杂的处理,则可能更好地支付光序列化上面,并在JVM对象上执行清洁和高效的Scala代码。

    所以,在我实现复杂数学算法的个人体验中,我通常用两个步骤拆分代码:

    1. Pure DataFrame OP尽可能多地进行过滤,加入和GroupBy Op。在极少数情况下,当需要使用DataFrame方法时所需的特定本地OP时,我可以使用UDF(并且如果它仅需要很少的列)
    2. 然后转换为RDD和使用(平面)地图OP,用于数学和或复杂查找部分 Li>

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-11-08
      • 1970-01-01
      • 2016-09-26
      • 2019-08-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多