【问题标题】:How do I return Spark RDD partition values without a local iterator?如何在没有本地迭代器的情况下返回 Spark RDD 分区值?
【发布时间】:2018-06-11 09:53:10
【问题描述】:

我正在学习 Spark 及其与 RDD 分区分布相关的并行性。我有一台 4 CPU 机器,因此我有 4 个并行单元。要返回分区索引“0”的成员,我找不到在不强制 RDD 使用 localIterator 的情况下返回此分区的方法。

我习惯于非常简洁。有没有更简洁的方法来按分区过滤 RDD?以下两种方法都可以,但是看起来很笨拙。

scala> val data = 1 to 20
data: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)

scala> val distData = sc.parallelize(data)
distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[75] at parallelize at <console>:26

scala> distData.mapPartitionsWithIndex{
   (index,it) => {
      it.toList.map(x => if (index == 0) (x)).iterator
   }
}.toLocalIterator.toList.filterNot(
   _.isInstanceOf[Unit]
)
res107: List[AnyVal] = List(1, 2, 3, 4, 5)

scala> distData.mapPartitionsWithIndex{
   (index,it) => {
      it.toList.map(x => if (index == 0) (x)).iterator
   }
}.toLocalIterator.toList.filter(
   _ match{
      case x: Unit => false
      case x => true
   }
)
res108: List[AnyVal] = List(1, 2, 3, 4, 5)

【问题讨论】:

    标签: scala apache-spark iterator rdd


    【解决方案1】:
    distData.mapPartitionsWithIndex{ (index, it) => 
          if (index == 0) it else Array[Int]().iterator
    }
    

    你可以返回一个空的迭代器,它会正常工作。

    【讨论】:

    • 所以我注意到它返回的是一个 RDD 而不是一个数组。但是根据您的说法,您停留在 RDD 域(Spark 领域)中,因此您保持并行性,因此不要“采用(5)”并使用单线程 - 对吗? scala&gt; distData.mapPartitionsWithIndex{ (index, it) =&gt; if (index == 0) it else Array[Int]().iterator }.take(5) res24: Array[Int] = Array(1, 2, 3, 4, 5)
    • 您应该尽量使用 RDD 来执行操作,以便您可以轻松扩展您的应用程序。
    猜你喜欢
    • 2019-06-13
    • 2017-05-03
    • 1970-01-01
    • 2017-07-29
    • 1970-01-01
    • 2020-09-24
    • 2021-01-10
    • 2015-09-13
    • 1970-01-01
    相关资源
    最近更新 更多