【问题标题】:Example of usage of a monoid for distributed computation with spark使用 monoid 进行分布式计算的示例
【发布时间】:2015-09-03 17:43:25
【问题描述】:

我有用户爱好数据(RDD[Map[String, Int]]),例如:

("food" -> 3, "music" -> 1),
("food" -> 2),
("game" -> 5, "twitch" -> 3, "food" -> 3)

我想计算它们的统计数据,并将统计数据表示为 Map[String, Array[Int]] 而数组大小为 5,例如:

("food" -> Array(0, 1, 2, 0, 0),
 "music" -> Array(1, 0, 0, 0, 0),
 "game" -> Array(0, 0, 0, 0, 1),
 "twitch" -> Array(0, 0, 1, 0 ,0))

foldLeft 似乎是正确的解决方案,但是RDD不能使用它,并且数据太大而无法转换为List/Array无法使用foldLeft,我怎么能做这份工作?

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    诀窍是用一个类替换示例中的 Array,该类包含您想要的数据的某些部分的统计信息,并且可以与相同统计信息的另一个实例(覆盖数据的其他部分)组合到提供整个数据的统计数据。

    例如,如果您有一个涵盖数据 3、3、2 和 5 的统计数据,我认为它看起来类似于 (0, 1, 2, 0, 1),如果您有另一个涵盖数据 3、4、4 的实例,它看起来喜欢(0, 0, 1, 2,0)。现在您所要做的就是定义一个+ 操作,让您组合(0, 1, 2, 0, 1) + (0, 0, 1, 2, 0) = (0,1,3,2,1),涵盖数据3、3、2、5 和3、4、4。

    让我们这样做,然后调用类StatMonoid

    case class StatMonoid(flags: Seq[Int] = Seq(0,0,0,0,0)) {
        def + (other: StatMonoid) = 
            new StatMonoid( (0 to 4).map{idx => flags(idx) + other.flags(idx)})
    }
    

    该类包含5个计数器的序列,并定义一个+操作,让它与其他计数器组合。

    我们还需要一个方便的方法来构建它,这可以是StatMonoid 中的构造函数,也可以是伴生对象中的构造函数,或者只是一个简单的方法,如您所愿:

    def stat(value: Int): StatMonoid = value match {
        case 1 => new StatMonoid(Seq(1,0,0,0,0))
        case 2 => new StatMonoid(Seq(0,1,0,0,0))
        case 3 => new StatMonoid(Seq(0,0,1,0,0))
        case 4 => new StatMonoid(Seq(0,0,0,1,0))
        case 5 => new StatMonoid(Seq(0,0,0,0,1))
        case _ => throw new RuntimeException("illegal init value: $value")
    }
    

    这使我们可以轻松计算涵盖单个数据的统计实例,例如:

    scala> stat(4)
    res25: StatMonoid = StatMonoid(List(0, 0, 0, 1, 0))
    

    它还允许我们通过简单地添加它们来将它们组合在一起:

    scala> stat(1) + stat(2) + stat(2) + stat(5) + stat(5) + stat(5)
    res18: StatMonoid = StatMonoid(Vector(1, 2, 0, 0, 3))
    

    现在将此应用于您的示例,假设我们将您提到的数据作为 Map 的 RDD:

    val rdd =  sc.parallelize(List(Map("food" -> 3, "music" -> 1), Map("food" -> 2), Map("game" -> 5, "twitch" -> 3, "food" -> 3)))
    

    要找到每种食物的stat,我们需要做的就是将数据展平得到(“foodId” -> id)元组,将每个id转换为上面StatMonoid的实例,最后组合他们一起吃每种食物:

    import org.apache.spark.rdd.PairRDDFunctions
    rdd.flatMap(_.toList).mapValue(stat).reduceByKey(_ + _).collect
    

    产生:

    res24: Array[(String, StatMonoid)] = Array((game,StatMonoid(List(0, 0, 0, 0, 1))), (twitch,StatMonoid(List(0, 0, 1, 0, 0))), (music,StatMonoid(List(1, 0, 0, 0, 0))), (food,StatMonoid(Vector(0, 1, 2, 0, 0))))
    

    现在,顺便说一下,如果你想知道我为什么将这个类称为 StateMonoid,这仅仅是因为......它一个幺半群 :D,并且是一个非常常见和方便的类,称为产品。简而言之,幺半群只是可以以关联方式相互组合的事物,它们在 Spark 中开发时非常常见,因为它们自然地定义了可以在分布式从站上并行执行的操作,并聚集在一起形成最终结果。

    【讨论】:

    • 在这种情况下,因为您依赖于 int 加法的幺半群行为,您可以进一步定义一个组,这意味着您具有可交换性 - 这可能是一个很大的好处,因为这为您提供了广泛的灵活性为您的计算排序
    • 感谢您的评论!实际上,在reduceByKey 期间没有强制执行操作顺序,因此在这种情况下甚至需要交换性。对 Algebird 的快速检查也证实了他们将 IndexedSeq 作为 Monoid、Group 和 Ring 提供。
    • @Daenyth - 我正在阅读此评论,但不明白您定义组的意思,您能指出一些文档吗?
    • @jjayadeep 这是mathematical conceptHere's 更简短的解释。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-07-17
    • 1970-01-01
    • 1970-01-01
    • 2018-10-20
    • 2011-04-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多