【问题标题】:Why is the fold action necessary in Spark?为什么在 Spark 中需要折叠操作?
【发布时间】:2016-04-04 10:52:48
【问题描述】:

我有一个愚蠢的问题涉及fold 并减少PySpark。我理解这两种方法之间的区别,但是,如果两者都需要应用的函数是可交换的幺半群,我无法找出fold cannot be substituted byreduce` 的示例。

另外,在fold的PySpark实现中使用acc = op(obj, acc),为什么使用这个操作顺序而不是acc = op(acc, obj)? (对我来说,第二个订单听起来更接近leftFold

干杯

托马斯

【问题讨论】:

    标签: apache-spark pyspark rdd reduce fold


    【解决方案1】:

    空 RDD

    RDD为空时不能替换:

    val rdd = sc.emptyRDD[Int]
    rdd.reduce(_ + _)
    // java.lang.UnsupportedOperationException: empty collection at   
    // org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$ ...
    
    rdd.fold(0)(_ + _)
    // Int = 0
    

    您当然可以将reduceisEmpty 上的条件结合起来,但这很丑。

    可变缓冲区

    折叠的另一个用例是使用可变缓冲区进行聚合。考虑遵循 RDD:

    import breeze.linalg.DenseVector
    
    val rdd = sc.parallelize(Array.fill(100)(DenseVector(1)), 8)
    

    假设我们想要所有元素的总和。一个天真的解决方案是使用+ 简单地减少:

    rdd.reduce(_ + _)
    

    不幸的是,它为每个元素创建了一个新向量。由于对象创建和随后的垃圾回收成本很高,因此使用可变对象可能会更好。 reduce 是不可能的(RDD 的不变性并不意味着元素的不变性),但可以通过fold 实现,如下所示:

    rdd.fold(DenseVector(0))((acc, x) => acc += x)
    

    这里使用零元素作为可变缓冲区,每个分区初始化一次,保持实际数据不变。

    acc = op(obj, acc),为什么用这个运算顺序而不是acc = op(acc, obj)

    SPARK-6416SPARK-7683

    【讨论】:

    • 感谢您的回答,但您能否详细说明一下可变缓冲区示例? PySpark中有类似的例子吗?
    • 由于每次调用fold 时都会创建zeroElement,并且它不是数据的一部分,因此可以安全地对其进行变异。 PySpark 对 RDD 中的数据变异可能产生的影响部分免疫,因此很难找到一个好的 Python 示例。虽然这是一个实施细节,但不是合同的一部分。
    猜你喜欢
    • 1970-01-01
    • 2016-10-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-01-19
    • 2014-04-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多