【问题标题】:Apache Spark RDD filter into two RDDsApache Spark RDD 过滤成两个 RDD
【发布时间】:2015-06-15 07:57:31
【问题描述】:

我需要将一个 RDD 分成两部分:

1 个满足条件的部分;另一部分没有。我可以在原始 RDD 上执行两次filter,但似乎效率低下。有没有办法可以做我所追求的?我在 API 和文献中都找不到任何东西。

【问题讨论】:

    标签: apache-spark rdd


    【解决方案1】:

    Spark 默认不支持这个。如果您事先缓存相同的数据,两次过滤并没有那么糟糕,而且过滤本身很快。

    如果真的只是两种不同的类型,可以使用辅助方法:

    implicit class RDDOps[T](rdd: RDD[T]) {
      def partitionBy(f: T => Boolean): (RDD[T], RDD[T]) = {
        val passes = rdd.filter(f)
        val fails = rdd.filter(e => !f(e)) // Spark doesn't have filterNot
        (passes, fails)
      }
    }
    
    val (matches, matchesNot) = sc.parallelize(1 to 100).cache().partitionBy(_ % 2 == 0)
    

    但只要您有多种类型的数据,只需将过滤后的数据分配给新的 val。

    【讨论】:

    • 不,Java 没有扩展方法。
    • 在运行过滤器之前不应该使用rdd.cache() 吗?这肯定会提高您的第二个过滤器的速度。
    • @MariusSoutier 它不会将整个数据集迭代两次吗?一次用于普通过滤器,第二次用于非过滤器?
    • @user401445 正确,它将评估 RDD 两次,因此不会提高效率。
    【解决方案2】:

    Spark RDD 没有这样的 api。

    这是一个基于pull request for rdd.span 的版本应该可以工作:

    import scala.reflect.ClassTag
    import org.apache.spark.rdd._
    
    def split[T:ClassTag](rdd: RDD[T], p: T => Boolean): (RDD[T], RDD[T]) = {
    
        val splits = rdd.mapPartitions { iter =>
            val (left, right) = iter.partition(p)
            val iterSeq = Seq(left, right)
            iterSeq.iterator
        }
    
        val left = splits.mapPartitions { iter => iter.next().toIterator}
    
        val right = splits.mapPartitions { iter => 
            iter.next()
            iter.next().toIterator
        }
        (left, right)
    }
    
    val rdd = sc.parallelize(0 to 10, 2)
    
    val (first, second) = split[Int](rdd, _ % 2 == 0 )
    
    first.collect
    // Array[Int] = Array(0, 2, 4, 6, 8, 10)
    

    【讨论】:

    • 我敢打赌这比两个过滤器更复杂,效率更低
    • @JustinPihony 是的,过滤器效率更高。
    • 这种方法会导致 rdd 被计算两次(除非它被预先缓存)。由于这对标准的“两个过滤器”没有任何好处,我认为不值得使用它。即使是内置的 randomSplit(...) 也会导致对给定 rdd 的多次评估。似乎没有办法(至少我还没有找到)创建一个返回两个 RDD 的 1-pass split 方法。
    【解决方案3】:

    关键是,你不想做一个过滤器,而是一个地图。

    (T) -> (Boolean, T)
    

    对不起,我在 Scala 语法方面效率低下。但想法是通过将答案集映射到键/值对来拆分答案集。 Key 可以是一个布尔值,指示它是否通过了“过滤器”谓词。

    您可以通过分区处理来控制不同目标的输出。只需确保您不将并行处理限制在下游的两个分区。

    另见How do I split an RDD into two or more RDDs?

    【讨论】:

    • 从简单的角度来看我喜欢这个
    【解决方案4】:

    如果您可以使用T 而不是RDD[T],那么您可以使用do this。否则,您可能会这样做:

    val data = sc.parallelize(1 to 100)
    val splitData = data.mapPartitions{iter => {
        val splitList = (iter.toList).partition(_%2 == 0)
        Tuple1(splitList).productIterator
      }
    }.map(_.asInstanceOf[Tuple2[List[Int],List[Int]]])
    

    然后,当你去执行一个动作时,你可能需要减少它来合并列表

    【讨论】:

    • 我很想知道为什么这被否决了,因为它是唯一真正回答 OP 问题的答案
    • (注意:我没有投反对票)您的方法很有趣,但它没有回答问题。 OP 要求partition(RDD[A], A => Boolean): (RDD[A], RDD[A]),你的将是partition(RDD[A], A => Boolean): RDD[List[A], List[A]]
    【解决方案5】:

    你可以使用subtract function(如果过滤操作太昂贵)。

    PySpark 代码:

    rdd1 = data.filter(filterFunction)
    
    rdd2 = data.subtract(rdd1)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-05-13
      • 2017-08-19
      • 1970-01-01
      • 2019-07-16
      • 2017-03-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多