【问题标题】:How do I read the value of a continuous index from spark RDD如何从 spark RDD 读取连续索引的值
【发布时间】:2017-05-12 23:32:32
【问题描述】:

我对 Spark Scala 从系列键获取第一个值有疑问,我创建了一个新的 RDD,如下所示:

[(a,1),(a,2),(a,3),(a,4),(b,1),(b,2),(a,3),(a,4),(a,5),(b,8),(b,9)]

我想获取这样的结果:

[(a,1),(b,1),(a,3),(b,8)]

如何使用 RDD 中的 scala 来做到这一点

【问题讨论】:

  • Spark 不维护你的集合的顺序,所以这是你现有数据结构无法做到的。您需要引入一个用于对数据进行排序并使用窗口函数的键:databricks.com/blog/2015/07/15/…
  • @lee 不太明白你想要做什么?您能解释一下您要获取的系列吗?
  • @SumeetSharma 我认为他只想保留每组具有相同键的连续杯中的第一个杯,但 RDD 不保持顺序,因此组会与初始序列不同跨度>
  • 你能分享你已经尝试过的吗?

标签: scala apache-spark rdd


【解决方案1】:

如 cmets 中所述,为了能够使用 RDD 中元素的顺序,您必须以某种方式在数据本身中表示此顺序。正是为了这个目的,创建了zipWithIndex——索引被添加到数据中;然后,通过一些操作(join 在带有修改索引的 RDD 上)我们可以得到你需要的:

// add index to RDD:
val withIndex = rdd.zipWithIndex().map(_.swap)

// create another RDD with indices increased by one, to later join each element with the previous one
val previous = withIndex.map { case (index, v) => (index + 1, v) }

// join RDDs, filter out those where previous "key" is identical
val result = withIndex.leftOuterJoin(previous).collect {
  case (i, (left, None)) => (i, left) // keep first element in RDD
  case (i, (left, Some((key, _)))) if left._1 != key => (i, left) // keep only elements where the previous key is different
}.sortByKey().values // if you want to preserve the original order...

result.collect().foreach(println)
// (a,1)
// (b,1)
// (a,3)
// (b,8)

【讨论】:

  • 感谢 Tzach Zohar。这是我需要的正确答案。谢谢!
  • 很高兴为您提供帮助;表示有用答案的最佳方式是接受(通过旁边的绿色 V 标记)或投票;)
猜你喜欢
  • 1970-01-01
  • 2017-07-06
  • 2017-10-22
  • 2015-01-05
  • 2021-05-26
  • 1970-01-01
  • 2016-03-13
  • 2018-04-08
相关资源
最近更新 更多