您可能已经阅读过,Spark 有一个功能丰富的 API。 RDD 操作分为transformations 和actions,其中transformations 可以看作是接受RDD 并产生RDD 的函数:f(RDD) => RDD 和动作是接受RDD 并产生一些结果的函数(@987654325 @ 在collect 的情况下或Unit 在foreach 的情况下)。
关于如何将某个算法移植到 Spark 的总体思路是,通过结合转换和动作来找到一种使用 Spark 支持的函数范式来表达所述算法的方法,以达到预期的结果。
依赖于序列的算法,例如上面的 w-shingling,对并行化提出了挑战,因为元素的顺序存在隐含的依赖关系,有时难以表达,这种方式可以在不同的分区中操作.
在这种情况下,我使用索引作为保留序列的一种方式,同时可以根据转换来表达算法:
def kShingle(rdd:RDD[Char], n:Int): RDD[Seq[Char]] = {
def loop(base: RDD[(Long, Seq[Char])], cumm: RDD[(Long, Seq[Char])], i: Int): RDD[Seq[Char]] = {
if (i<=1) cumm.map(_._2) else {
val segment = base.map{case (idx, seq) => (idx-1, seq)}
loop(segment, cumm.join(segment).map{case (k,(v1,v2)) => (k,v1 ++ v2)}, i-1)
}
}
val seqRdd = rdd.map(char => Seq(char))
val indexed = seqRdd.zipWithIndex.map(_.swap)
loop(indexed, indexed, n)
}
Spark-shell 示例:
val rdd = sc.parallelize("Floppy Disk")
scala> kShingle(rdd,3).collect
res23: Array[Seq[Char]] = Array(List(F, l, o), List(i, s, k), List(l, o, p), List(o, p, p), List(p, p, y), List(p, y, ), List(y, , D), List( , D, i), List(D, i, s))