【问题标题】:Scala - How to select the last element from an RDD?Scala - 如何从 RDD 中选择最后一个元素?
【发布时间】:2019-02-05 05:46:35
【问题描述】:

首先我有一个salesList: List[Sale],为了获得列表中最后一个销售的ID,我使用了lastOption

val lastSaleId: Option[Any] = salesList.lastOption.map(_.saleId)

但是现在我已经修改了一个使用List[Sale] 的方法来使用salesListRdd: List[RDD[Sale]]。所以我改变了获取上次销售 ID 的方式:

  val lastSaleId: Option[Any] = SparkContext
    .union(salesListRdd)
    .collect().toList
    .lastOption.map(_.saleId)

我不确定这是不是最好的方法。因为这里我仍然在将 RDD 收集到一个 List 中,然后将它带到驱动程序节点,这可能会导致驱动程序内存不足。

有没有办法从保留记录的初始顺序的 RDD 中获取最后一次销售的 ID?不是任何排序,而是 Sale 对象最初存储在 List 中的方式?

【问题讨论】:

  • 我们有takeRight(1)?在 RDD 上对吗?
  • @RamanMishra 如提到的here rdd is distributed and there is no way to tell which one is the last string without collecting it to one node

标签: scala apache-spark rdd


【解决方案1】:

至少有两种有效的解决方案。您可以将topzipWithIndex 一起使用:

def lastValue[T](rdd: RDD[T]): Option[T] = {
  rdd.zipWithUniqueId.map(_.swap).top(1)(Ordering[Long].on(_._1)).headOption.map(_._2)
}

或带有自定义键的top

 def lastValue[T](rdd: RDD[T]): Option[T] = {
   rdd.mapPartitionsWithIndex(
     (i, iter) => iter.zipWithIndex.map {  case (x, j) => ((i, j), x) }
   ).top(1)(Ordering[(Int, Long)].on(_._1)).headOption.map(_._2)
 }

前一个需要对zipWithIndex 执行额外操作,而后一个不需要。

使用前请务必了解限制。 Quoting the docs:

请注意,某些 RDD,例如那些由 groupBy() 返回的 RDD,不保证分区中元素的顺序。因此,不能保证分配给每个元素的唯一 ID,如果重新评估 RDD,甚至可能会更改。如果需要固定的排序来保证相同的索引分配,则应使用 sortByKey() 对 RDD 进行排序或将其保存到文件中。

特别是,根据确切的输入,Union 可能根本不会保留输入顺序。

【讨论】:

    【解决方案2】:

    您可以使用zipWithIndex 并按它对descending 进行排序,以便最后一条记录在顶部,然后取(1):

    salesListRdd
        .zipWithIndex()
        .map({ case (x, y) => (y, x) })
        .sortByKey(ascending = false)
        .map({ case (x, y) => y })
        .take(1)
    

    解决方案取自这里:http://www.swi.com/spark-rdd-getting-bottom-records/ 但是,它的效率非常低,因为它会进行大量的分区洗牌。

    【讨论】:

      猜你喜欢
      • 2017-11-28
      • 2016-01-30
      • 1970-01-01
      • 2017-01-20
      • 2013-08-18
      • 2021-08-10
      • 2020-05-16
      • 2020-02-12
      • 2011-06-04
      相关资源
      最近更新 更多