诀窍是用一个类替换示例中的 Array,该类包含您想要的数据的某些部分的统计信息,并且可以与相同统计信息的另一个实例(覆盖数据的其他部分)组合到提供整个数据的统计数据。
例如,如果您有一个涵盖数据 3、3、2 和 5 的统计数据,我认为它看起来类似于 (0, 1, 2, 0, 1),如果您有另一个涵盖数据 3、4、4 的实例,它看起来喜欢(0, 0, 1, 2,0)。现在您所要做的就是定义一个+ 操作,让您组合(0, 1, 2, 0, 1) + (0, 0, 1, 2, 0) = (0,1,3,2,1),涵盖数据3、3、2、5 和3、4、4。
让我们这样做,然后调用类StatMonoid:
case class StatMonoid(flags: Seq[Int] = Seq(0,0,0,0,0)) {
def + (other: StatMonoid) =
new StatMonoid( (0 to 4).map{idx => flags(idx) + other.flags(idx)})
}
该类包含5个计数器的序列,并定义一个+操作,让它与其他计数器组合。
我们还需要一个方便的方法来构建它,这可以是StatMonoid 中的构造函数,也可以是伴生对象中的构造函数,或者只是一个简单的方法,如您所愿:
def stat(value: Int): StatMonoid = value match {
case 1 => new StatMonoid(Seq(1,0,0,0,0))
case 2 => new StatMonoid(Seq(0,1,0,0,0))
case 3 => new StatMonoid(Seq(0,0,1,0,0))
case 4 => new StatMonoid(Seq(0,0,0,1,0))
case 5 => new StatMonoid(Seq(0,0,0,0,1))
case _ => throw new RuntimeException("illegal init value: $value")
}
这使我们可以轻松计算涵盖单个数据的统计实例,例如:
scala> stat(4)
res25: StatMonoid = StatMonoid(List(0, 0, 0, 1, 0))
它还允许我们通过简单地添加它们来将它们组合在一起:
scala> stat(1) + stat(2) + stat(2) + stat(5) + stat(5) + stat(5)
res18: StatMonoid = StatMonoid(Vector(1, 2, 0, 0, 3))
现在将此应用于您的示例,假设我们将您提到的数据作为 Map 的 RDD:
val rdd = sc.parallelize(List(Map("food" -> 3, "music" -> 1), Map("food" -> 2), Map("game" -> 5, "twitch" -> 3, "food" -> 3)))
要找到每种食物的stat,我们需要做的就是将数据展平得到(“foodId” -> id)元组,将每个id转换为上面StatMonoid的实例,最后组合他们一起吃每种食物:
import org.apache.spark.rdd.PairRDDFunctions
rdd.flatMap(_.toList).mapValue(stat).reduceByKey(_ + _).collect
产生:
res24: Array[(String, StatMonoid)] = Array((game,StatMonoid(List(0, 0, 0, 0, 1))), (twitch,StatMonoid(List(0, 0, 1, 0, 0))), (music,StatMonoid(List(1, 0, 0, 0, 0))), (food,StatMonoid(Vector(0, 1, 2, 0, 0))))
现在,顺便说一下,如果你想知道我为什么将这个类称为 StateMonoid,这仅仅是因为......它是一个幺半群 :D,并且是一个非常常见和方便的类,称为产品。简而言之,幺半群只是可以以关联方式相互组合的事物,它们在 Spark 中开发时非常常见,因为它们自然地定义了可以在分布式从站上并行执行的操作,并聚集在一起形成最终结果。