【问题标题】:What are the differences between slices and partitions of RDDs?RDD的切片和分区有什么区别?
【发布时间】:2014-08-07 19:13:52
【问题描述】:

我正在使用 Spark 的 Python API 并运行 Spark 0.8。

我正在存储大量的浮点向量 RDD,我需要针对整个集合执行一个向量的计算。

RDD 中的切片和分区之间有什么区别吗?

当我创建 RDD 时,我将 100 作为参数传递给它,这使它将 RDD 存储为 100 个切片并在执行计算时创建 100 个任务。我想知道对数据进行分区是否会通过使系统更有效地处理数据来提高切片之外的性能(即对分区执行操作与仅对切片 RDD 中的每个元素进行操作之间是否存在差异)。

例如,这两段代码有什么明显的区别吗?

rdd = sc.textFile(demo.txt, 100)

rdd = sc.textFile(demo.txt)
rdd.partitionBy(100)

【问题讨论】:

标签: python apache-spark


【解决方案1】:

我相信 slicespartitions 在 Apache Spark 中是一回事。

但是,您发布的两段代码之间存在细微但可能显着的差异。

此代码将尝试使用 100 个并发任务将 demo.txt 直接加载到 100 个分区中:

rdd = sc.textFile('demo.txt', 100)

对于未压缩的文本,它将按预期工作。但是如果你有一个demo.gz 而不是demo.txt,你最终会得到一个只有1个分区的RDD。无法并行读取 gzip 文件。

另一方面,下面的代码将首先将demo.txt打开到一个具有默认分区数的RDD中,然后它将显式地将数据重新分区为100个大小大致相等的分区.

rdd = sc.textFile('demo.txt')
rdd = rdd.repartition(100)

所以在这种情况下,即使使用demo.gz,您最终也会得到一个包含 100 个分区的 RDD。

作为旁注,我将您的 partitionBy() 替换为 repartition(),因为我相信这就是您正在寻找的。 partitionBy() 要求 RDD 是元组的 RDD。由于 repartition() 在 Spark 0.8.0 中不可用,您应该可以使用 coalesce(100, shuffle=True)

Spark 可以为 RDD 的每个分区运行 1 个并发任务,最多为集群中的核心数。所以如果你有一个有 50 个核心的集群,你希望你的 RDD 至少有 50 个分区(可能2-3x times that)。

从 Spark 1.1.0 开始,您可以检查 RDD 有多少个分区,如下所示:

rdd.getNumPartitions()  # Python API
rdd.partitions.size     // Scala API

在 1.1.0 之前,使用 Python API 执行此操作的方法是 rdd._jrdd.splits().size()

【讨论】:

    【解决方案2】:

    您可以按如下方式进行分区:

    import org.apache.spark.Partitioner
    
    val p = new Partitioner() {
      def numPartitions = 2
      def getPartition(key: Any) = key.asInstanceOf[Int]
    }
    recordRDD.partitionBy(p)
    

    【讨论】:

      猜你喜欢
      • 2014-06-19
      • 2015-08-27
      • 2019-07-24
      • 2016-05-27
      • 1970-01-01
      • 1970-01-01
      • 2011-08-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多