【发布时间】:2016-11-04 19:38:35
【问题描述】:
假设我们有一个键控 RDD RDD[(Int, List[String])],其中包含数千个键和数千到数百万个值:
val rdd = sc.parallelize(Seq(
(1, List("a")),
(2, List("a", "b")),
(3, List("b", "c", "d")),
(4, List("f"))))
对于每个键,我需要从其他键添加随机值。要添加的元素数量会有所不同,具体取决于键中的元素数量。所以输出看起来像:
val rdd2: RDD[(Int, List[String])] = sc.parallelize(Seq(
(1, List("a", "c")),
(2, List("a", "b", "b", "c")),
(3, List("b", "c", "d", "a", "a", "f")),
(4, List("f", "d"))))
我想出了以下显然不是很有效的解决方案(注意:展平和聚合是可选的,我擅长展平数据):
// flatten the input RDD
val rddFlat: RDD[(Int, String)] = rdd.flatMap(x => x._2.map(s => (x._1, s)))
// calculate number of elements for each key
val count = rddFlat.countByKey().toSeq
// foreach key take samples from the input RDD, change the original key and union all RDDs
val rddRandom: RDD[(Int, String)] = count.map { x =>
(x._1, rddFlat.sample(withReplacement = true, x._2.toDouble / count.map(_._2).sum, scala.util.Random.nextLong()))
}.map(x => x._2.map(t => (x._1, t._2))).reduce(_.union(_))
// union the input RDD with the random RDD and aggregate
val rddWithRandomData: RDD[(Int, List[String])] = rddFlat
.union(rddRandom)
.aggregateByKey(List[String]())(_ :+ _, _ ++ _)
实现这一目标的最有效和最优雅的方法是什么?
我使用的是 Spark 1.4.1。
【问题讨论】:
-
根据元素的数量,您可以先获取一组所有可能的元素,然后将其广播并使用映射函数将它们添加到每个条目中。
-
@LiMuBei 数量不等,可能是一千或几千万。澄清了这一点。
-
您能否更详细地解释在您的示例中如何从 rdd1 到 rdd2?随机选择过程在那里如何运作?您还提到了“键中元素的数量”,但只有一个元素,即数字。
-
@maasg rdd2 只是一个例子,基本上它意味着从任何其他键中获取这个数量的任何随机值并将它们放入正在处理的当前键中。 “键中的元素数”是List中的元素数。
-
@AlexandrNikitin 确实是 rdd2 这是一个例子,所以我的问题仍然存在:rdd2 是如何从 rdd1 创建的?例如
rdd2 (1, List("a", "c")) = rdd1(1, List("a")) + (random choice from RDD1 = 3) => List("b","c","d") => random pick # elements = 1 => random pick(1) element => List("c") ==> List("a","c")???我们首先需要放下算法,然后才能尝试用一些技术解决它。
标签: scala apache-spark rdd