【问题标题】:Spark: persist and repartition orderSpark:持久化和重新分区顺序
【发布时间】:2016-02-14 11:09:12
【问题描述】:

我有以下代码:

val data = input.map{... }.persist(StorageLevel.MEMORY_ONLY_SER).repartition(2000)

我想知道如果我先进行重新分区有什么区别:

val data = input.map{... }.repartition(2000).persist(StorageLevel.MEMORY_ONLY_SER)

调用reparation和persist的顺序有区别吗?谢谢!

【问题讨论】:

    标签: apache-spark rdd partition persist


    【解决方案1】:

    是的,有区别。

    在第一种情况下,您会在 map 阶段后获得持久 RDD。这意味着每次访问data都会触发repartition

    在第二种情况下,您在重新分区后缓存。当data 被访问并且之前已经物化时,没有额外的工作要做。

    为了证明让我们做一个实验:

    import  org.apache.spark.storage.StorageLevel
    
    val data1 = sc.parallelize(1 to 10, 8)
      .map(identity)
      .persist(StorageLevel.MEMORY_ONLY_SER)
      .repartition(2000)
    data1.count()
    
    val data2 = sc.parallelize(1 to 10, 8)
      .map(identity)
      .repartition(2000)
      .persist(StorageLevel.MEMORY_ONLY_SER)
    data2.count()
    

    并查看存储信息:

    sc.getRDDStorageInfo
    
    // Array[org.apache.spark.storage.RDDInfo] = Array(
    //   RDD "MapPartitionsRDD" (17) StorageLevel:
    //       StorageLevel(false, true, false, false, 1);
    //     CachedPartitions: 2000; TotalPartitions: 2000; MemorySize: 8.6 KB; 
    //     ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B,
    //   RDD "MapPartitionsRDD" (7) StorageLevel:
    //      StorageLevel(false, true, false, false, 1);
    //    CachedPartitions: 8; TotalPartitions: 8; MemorySize: 668.0 B; 
    //    ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B)
    

    如您所见,有两个持久化 RDD,一个有 2000 个分区,一个有 8 个。

    【讨论】:

    • 我迟到了两年才发表评论。但 data1.getNumPartitions 和 data2.getNumPartitions 都返回 2000
    • @hadooper 它应该。中间对象不同,而不是最终对象。
    • 你能解释一下中间对象吗?
    • @hadooper 在一个场景中,Spark 将 RDD 保存在 8 个分区中,而在另一种场景中,首先进行洗牌,然后将 RDD 保存在 2000 个分区中。如果在第二种方法之后需要 2000 个分区更好,因为 shuffle 执行一次(在持久化之前)。
    • @hadooper 保留的分区数(8 vs 2000)
    猜你喜欢
    • 2016-07-17
    • 1970-01-01
    • 1970-01-01
    • 2018-06-29
    • 2019-03-02
    • 1970-01-01
    • 2020-03-12
    • 2019-08-26
    • 2020-03-19
    相关资源
    最近更新 更多