【问题标题】:Converting sequential code into parallel将顺序代码转换为并行代码
【发布时间】:2014-11-16 15:53:55
【问题描述】:

我正在尝试使用 Spark 来理解 MapReduce。
在做一些简单的练习时,我没有问题,按顺序进行,但在并行化我的代码时,我遇到了困难。

考虑以下示例:

var = "Far far away, behind the word mountains, far from the countries Vokalia and Consonantia, there live the blind texts."
k = 10;

for x in range(0, len(var)):
    print(var[0+x:k+x])

它将文本拆分为 10 个字符 (w-shingling)。
使用 Spark 将其“转换”为并行代码的正确方法是什么?

如何编码for loop? Spark 是否提供循环?

我只需要了解整个概念。

PS:我已经阅读了文档,我知道 RDD 是什么等等。我只是不知道如何将顺序代码“转换”为并行代码。

【问题讨论】:

    标签: java python mapreduce apache-spark


    【解决方案1】:

    您可能已经阅读过,Spark 有一个功能丰富的 API。 RDD 操作分为transformationsactions,其中transformations 可以看作是接受RDD 并产生RDD 的函数:f(RDD) => RDD 和动作是接受RDD 并产生一些结果的函数(@987654325 @ 在collect 的情况下或Unitforeach 的情况下)。

    关于如何将某个算法移植到 Spark 的总体思路是,通过结合转换和动作来找到一种使用 Spark 支持的函数范式来表达所述算法的方法,以达到预期的结果。

    依赖于序列的算法,例如上面的 w-shingling,对并行化提出了挑战,因为元素的顺序存在隐含的依赖关系,有时难以表达,这种方式可以在不同的分区中操作.

    在这种情况下,我使用索引作为保留序列的一种方式,同时可以根据转换来表达算法:

    def kShingle(rdd:RDD[Char], n:Int): RDD[Seq[Char]] = {
        def loop(base: RDD[(Long, Seq[Char])], cumm: RDD[(Long, Seq[Char])], i: Int): RDD[Seq[Char]] = {
           if (i<=1) cumm.map(_._2) else {
            val segment =  base.map{case (idx, seq) => (idx-1, seq)}
            loop(segment, cumm.join(segment).map{case (k,(v1,v2)) => (k,v1 ++ v2)}, i-1)
            }
        }
        val seqRdd = rdd.map(char => Seq(char))
        val indexed = seqRdd.zipWithIndex.map(_.swap)
    
        loop(indexed, indexed, n)
    }
    

    Spark-shell 示例:

    val rdd = sc.parallelize("Floppy Disk")
    scala> kShingle(rdd,3).collect
    res23: Array[Seq[Char]] = Array(List(F, l, o), List(i, s, k), List(l, o, p), List(o, p, p), List(p, p, y), List(p, y,  ), List(y,  , D), List( , D, i), List(D, i, s))
    

    【讨论】:

      猜你喜欢
      • 2017-10-15
      • 2013-09-19
      • 2013-07-12
      • 1970-01-01
      • 2018-08-16
      • 1970-01-01
      • 2016-03-19
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多