【问题标题】:Reverse iteraror RDD反向迭代 RDD
【发布时间】:2017-12-17 02:42:47
【问题描述】:

我有以下类型的纯 Scala 代码:

import breeze.numerics.log
import spire.random.Dist
import org.apache.commons.math3.distribution.NormalDistribution
import scala.collection.mutable.Buffer


def foo1(zs: Buffer[Double])={
  val S = zs.zip(zs.reverse)
    .map { case (x, y) =>log(x) * log(1 - y) }.sum
  S
}

val x = Dist.uniform(0.0, 1.0).sample[Buffer](10)
val y = x.sortWith(_<_)
val cdf=new NormalDistribution(0, 1)
val z = y.map(x_ => cdf.cumulativeProbability(x_))

foo1(z)

z 已排序,因为cdf 正在增加

我想为 Spark 重写它,但是对于 RDD 数据类型,没有反向方法。如何为 Spark 编写此代码?

def foo2(z_rdd: RDD[Double])={
    var S = z_rdd.zip(z_rdd.???)
    .map { case (x, y) =>log(x) * log(1 - y) }.sum
    S
}

其中???函数是反转的z_rdd

【问题讨论】:

    标签: scala apache-spark reverse rdd


    【解决方案1】:

    如果您尝试使用反向副本压缩 RDD,请记住,Spark zip 要求两个 RDD 均等分区:

    http://spark.apache.org/docs/latest/api/java/org/apache/spark/rdd/RDD.html#zip-org.apache.spark.rdd.RDD-scala.reflect.ClassTag-

    假设两个 RDD 有相同数量的分区每个分区中相同数量的元素(例如,一个是通过另一个映射生成的)。

    因此,完成rdd zip rdd.reversed的方法是:

    1. 如前所述,将 zipWithIndex 应用于 RDD
    2. 以相反的顺序对其进行排序,并将生成的 RDD 与索引一起压缩
    3. reduceByKeygroupByKey 来自第 1 步和第 2 步的 RDD 的并集,以索引为键

    我不确定这个食谱是否可以改进。

    【讨论】:

      【解决方案2】:

      您可以使用zipWithIndex为RDD的值添加索引,然后按索引反向排序:

      z_rdd.zip(
        z_rdd.zipWithIndex()
          .sortBy(_._2, ascending = false)
      ).map({ case (doubleA, (doubleB, _)) =>
        …
      })
      

      【讨论】:

        猜你喜欢
        • 2014-09-15
        • 1970-01-01
        • 1970-01-01
        • 2010-10-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2014-11-20
        • 1970-01-01
        相关资源
        最近更新 更多