【问题标题】:Spark RDD Partition effectsSpark RDD 分区效果
【发布时间】:2021-02-24 16:16:42
【问题描述】:

我对重新分区操作感到困惑。请看下面的代码

import org.apache.spark._
import org.apache.log4j._


object FriendsByAge {

  def parseLine(line: String)={
    val fields = line.split(",")
    val age = fields(2).toInt
    val numFriends = fields(3).toInt

    (age, numFriends)
  }

  def main(args: Array[String]) = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    val sc = new SparkContext("local[*]", "FriendsByAge")

    val lines = sc.textFile("./data/fakefriends-noheader.csv").repartition(1000)
    val rdd = lines.map(parseLine)

    println(rdd.getNumPartitions)

    val totalsByAge = rdd.mapValues(x=> (x,1)).reduceByKey((x, y) => (x._1+y._1, x._2 + y._2))

    println(totalsByAge.getNumPartitions)

    val averagesByAges = totalsByAge.mapValues(x => x._1/x._2)

    println(averagesByAges.getNumPartitions)
    val results = averagesByAges.collect()

    results.sortWith(_._2> _._2).foreach(println)
  }


}

这里我在将文件读入 1000 个分区后对 rdd 进行重新分区。由于 map 操作会创建新的 RDD,并且不会保留分区。我仍然看到相同数量的分区。

问题是我如何知道子 RDD 是否会保留父 RDD 分区?子RDD使repartition失效的条件是什么。

【问题讨论】:

  • 你实际上有多少条记录?
  • @thebluephantom 我有 1856 条记录。我正在尝试了解 spark 分区,所以我使用了小数据。
  • 请添加每次迭代的分区数。

标签: apache-spark rdd partitioning


【解决方案1】:

mapValues 不会改变已经生效的分区,它是一个narrow 转换。你有两个。

reduceByKey 是关联的。 Spark 在本地聚合并将这些结果发送到驱动程序或相关分区 - 在您的情况下。如果您不使用reduceByKey 上的参数来代替number of partitions,则为新RDD 保留相同数量的分区,尽管分布不同。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-06-16
    • 2017-05-03
    • 2017-02-17
    • 2020-09-18
    • 2016-01-04
    • 2016-07-17
    • 2016-08-01
    相关资源
    最近更新 更多