【问题标题】:Add random elements to keyed RDD from the same RDD从同一个 RDD 向 keyed RDD 添加随机元素
【发布时间】:2016-11-04 19:38:35
【问题描述】:

假设我们有一个键控 RDD RDD[(Int, List[String])],其中包含数千个键和数千到数百万个值:

val rdd = sc.parallelize(Seq(
  (1, List("a")),
  (2, List("a", "b")),
  (3, List("b", "c", "d")),
  (4, List("f"))))

对于每个键,我需要从其他键添加随机值。要添加的元素数量会有所不同,具体取决于键中的元素数量。所以输出看起来像:

val rdd2: RDD[(Int, List[String])] = sc.parallelize(Seq(
  (1, List("a", "c")),
  (2, List("a", "b", "b", "c")),
  (3, List("b", "c", "d", "a", "a", "f")),
  (4, List("f", "d"))))

我想出了以下显然不是很有效的解决方案(注意:展平和聚合是可选的,我擅长展平数据):

// flatten the input RDD
val rddFlat: RDD[(Int, String)] = rdd.flatMap(x => x._2.map(s => (x._1, s)))
// calculate number of elements for each key
val count = rddFlat.countByKey().toSeq
// foreach key take samples from the input RDD, change the original key and union all RDDs
val rddRandom: RDD[(Int, String)] = count.map { x =>
  (x._1, rddFlat.sample(withReplacement = true, x._2.toDouble / count.map(_._2).sum, scala.util.Random.nextLong()))
}.map(x => x._2.map(t => (x._1, t._2))).reduce(_.union(_))

// union the input RDD with the random RDD and aggregate
val rddWithRandomData: RDD[(Int, List[String])] = rddFlat
    .union(rddRandom)
    .aggregateByKey(List[String]())(_ :+ _, _ ++ _)

实现这一目标的最有效和最优雅的方法是什么?
我使用的是 Spark 1.4.1。

【问题讨论】:

  • 根据元素的数量,您可以先获取一组所有可能的元素,然后将其广播并使用映射函数将它们添加到每个条目中。
  • @LiMuBei 数量不等,可能是一千或几千万。澄清了这一点。
  • 您能否更详细地解释在您的示例中如何从 rdd1 到 rdd2?随机选择过程在那里如何运作?您还提到了“键中元素的数量”,但只有一个元素,即数字。
  • @maasg rdd2 只是一个例子,基本上它意味着从任何其他键中获取这个数量的任何随机值并将它们放入正在处理的当前键中。 “键中的元素数”是List中的元素数。
  • @AlexandrNikitin 确实是 rdd2 这是一个例子,所以我的问题仍然存在:rdd2 是如何从 rdd1 创建的?例如rdd2 (1, List("a", "c")) = rdd1(1, List("a")) + (random choice from RDD1 = 3) => List("b","c","d") => random pick # elements = 1 => random pick(1) element => List("c") ==> List("a","c") ???我们首先需要放下算法,然后才能尝试用一些技术解决它。

标签: scala apache-spark rdd


【解决方案1】:

通过查看当前方法,并且为了确保解决方案的可扩展性,重点领域可能应该是提出一种可以以分布式方式完成的采样机制,从而无需收集钥匙还给司机。

简而言之,我们需要一种分布式方法来对所有值的加权样本。

我的建议是创建一个矩阵keys x values,其中每个单元格是为该键选择值的概率。然后,我们可以随机对该矩阵进行评分,并选择那些落在概率范围内的值。

让我们为此编写一个基于 spark 的算法:

// sample data to guide us. 
//Note that I'm using distinguishable data across keys to see how the sample data distributes over the keys 
val data = sc.parallelize(Seq(
  (1, List("A", "B")),
  (2, List("x", "y", "z")),
  (3, List("1", "2", "3", "4")),
  (4, List("foo", "bar")),
  (5, List("+")),
  (6, List())))

val flattenedData = data.flatMap{case (k,vlist) => vlist.map(v=> (k,v))}
val values = data.flatMap{case (k,list) => list}
val keysBySize = data.map{case (k, list) => (k,list.size)}
val totalElements = keysBySize.map{case (k,size) => size}.sum
val keysByProb = keysBySize.mapValues{size => size.toDouble/totalElements}
val probMatrix = keysByProb.cartesian(values)
val scoredSamples = probMatrix.map{case ((key, prob),value) => 
    ((key,value),(prob, Random.nextDouble))}

ScoredSamples 看起来像这样:

((1,A),(0.16666666666666666,0.911900315814998))
((1,B),(0.16666666666666666,0.13615047422122906))
((1,x),(0.16666666666666666,0.6292430257377151))
((1,y),(0.16666666666666666,0.23839887096373114))
((1,z),(0.16666666666666666,0.9174808344986465))

...

val samples = scoredSamples.collect{case (entry, (prob,score)) if (score<prob) => entry}

samples 看起来像这样:

(1,foo)
(1,bar)
(2,1)
(2,3)
(3,y)
...

现在,我们将采样数据与原始数据合并,得到最终结果。

val result = (flattenedData union samples).groupByKey.mapValues(_.toList)

result.collect()
(1,List(A, B, B))
(2,List(x, y, z, B))
(3,List(1, 2, 3, 4, z, 1))
(4,List(foo, bar, B, 2))
(5,List(+, z))

鉴于所有算法都编写为原始数据的一系列转换(请参阅下面的 DAG),并具有最少的改组(仅最后一个 groupByKey,这是在最小结果集上完成的),它应该是可扩展的.唯一的限制是groupByKey 阶段中​​每个键的值列表,这只是为了符合问题使用的表示。

【讨论】:

  • 谢谢!真棒答案!我还不能说任何关于性能的事情。主观上它看起来和工作得更快。
猜你喜欢
  • 2017-07-28
  • 1970-01-01
  • 2018-12-05
  • 1970-01-01
  • 2014-09-11
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多