【问题标题】:Spark - repartition() vs coalesce()Spark - 重新分区()与合并()
【发布时间】:2015-10-15 03:39:20
【问题描述】:

根据学习火花

请记住,对数据重新分区是一项相当昂贵的操作。 Spark 还有一个优化版本的repartition(),称为coalesce(),它允许避免数据移动,但前提是您要减少 RDD 分区的数量。

我得到的一个区别是repartition() 可以增加/减少分区数,但coalesce() 只能减少分区数。

如果分区分布在多台机器上并运行coalesce(),如何避免数据移动?

【问题讨论】:

    标签: apache-spark distributed-computing rdd


    【解决方案1】:

    它避免了 full 洗牌。如果知道数量正在减少,那么执行程序可以安全地将数据保留在最小数量的分区上,只需将数据从额外的节点移到我们保留的节点上。

    所以,它会变成这样:

    Node 1 = 1,2,3
    Node 2 = 4,5,6
    Node 3 = 7,8,9
    Node 4 = 10,11,12
    

    然后coalesce 降到 2 个分区:

    Node 1 = 1,2,3 + (10,11,12)
    Node 3 = 7,8,9 + (4,5,6)
    

    请注意,节点 1 和节点 3 不需要移动其原始数据。

    【讨论】:

    • 感谢您的回复。文档最好说minimize data movement 而不是avoiding data movement
    • 是否应该使用repartition 而不是coalesce
    • @Niemand 我认为当前的文档很好地涵盖了这一点:github.com/apache/spark/blob/… 请记住,所有repartition 所做的只是调用coalesce 并将shuffle 参数设置为true。让我知道这是否有帮助。
    • 是否可以减少现有分区文件的数量?我没有 hdfs,但有很多文件有问题。
    • 重新分区在统计上会变慢,因为它不知道它正在缩小......尽管也许他们可以优化它。在内部它只是用shuffle = true 标志调用coalesce
    【解决方案2】:

    这里需要注意的一点是,Spark RDD 的基本原则是不变性。重新分区或合并将创建新的 RDD。基本 RDD 将继续存在其原始分区数。如果用例需要将 RDD 持久化到缓存中,那么新创建的 RDD 也必须这样做。

    scala> pairMrkt.repartition(10)
    res16: org.apache.spark.rdd.RDD[(String, Array[String])] =MapPartitionsRDD[11] at repartition at <console>:26
    
    scala> res16.partitions.length
    res17: Int = 10
    
    scala>  pairMrkt.partitions.length
    res20: Int = 2
    

    【讨论】:

    • 不错的一个!这是至关重要的,至少对于这个有经验的 scala 开发人员来说,这并不明显——即,repartitioncoalesce 都没有尝试修改数据,只是它在节点之间的分布方式
    • @Harikrishnan 因此,如果我正确理解了其他答案,那么在合并 Spark 的情况下使用现有分区,但是由于 RDD 是不可变的,您能否描述 Coalesce 如何使用现有分区?根据我的理解,我认为 Spark 会将新分区附加到合并中的现有分区。
    • 但是如果执行图所知道的不再使用“旧”RDD,如果不持久化,它将从内存中清除,不是吗?
    【解决方案3】:

    贾斯汀的回答很棒,而且这个回答更深入。

    repartition 算法会进行完全洗牌,并使用均匀分布的数据创建新分区。让我们用 1 到 12 的数字创建一个 DataFrame。

    val x = (1 to 12).toList
    val numbersDf = x.toDF("number")
    

    numbersDf 在我的机器上包含 4 个分区。

    numbersDf.rdd.partitions.size // => 4
    

    以下是数据在分区上的划分方式:

    Partition 00000: 1, 2, 3
    Partition 00001: 4, 5, 6
    Partition 00002: 7, 8, 9
    Partition 00003: 10, 11, 12
    

    让我们用repartition 方法做一个full-shuffle,并在两个节点上获取这些数据。

    val numbersDfR = numbersDf.repartition(2)
    

    这是numbersDfR 数据在我的机器上的分区方式:

    Partition A: 1, 3, 4, 6, 7, 9, 10, 12
    Partition B: 2, 5, 8, 11
    

    repartition 方法创建新分区并将数据均匀分布在新分区中(对于较大的数据集,数据分布更均匀)。

    coalescerepartition 之间的区别

    coalesce 使用现有分区来最小化被洗牌的数据量。 repartition 创建新分区并进行完全洗牌。 coalesce 导致具有不同数据量的分区(有时分区具有很大不同的大小),repartition 导致大小大致相等的分区。

    coalescerepartition 更快吗?

    coalesce 可能比repartition 运行得更快,但不等大小的分区通常比等大小的分区运行得慢。您通常需要在过滤大型数据集后重新分区数据集。我发现 repartition 总体上更快,因为 Spark 是为使用相同大小的分区而构建的。

    注意我好奇地观察到repartition can increase the size of data on disk。确保在大型数据集上使用重新分区/合并时运行测试。

    Read this blog post如果您想了解更多详情。

    何时在实践中使用合并和重新分区

    【讨论】:

    • 很好的答案@Powers,但分区 A 和 B 中的数据不是倾斜的吗?它是如何均匀分布的?
    • 另外,在不出现 OOM 错误的情况下获取分区大小的最佳方法是什么。我使用rdd.glom().map(len).collect(),但它给出了很多OOM错误。
    • @anwartheravian - 分区 A 和分区 B 的大小不同,因为 repartition 算法不会为非常小的数据集平均分配数据。我使用repartition 将 500 万条记录组织到 13 个分区中,每个文件的大小在 89.3 MB 和 89.6 MB 之间 - 相当!
    • @Powers 这个看起来更详细的答案。
    • 这更好地解释了差异。谢谢!
    【解决方案4】:

    所有的答案都为这个经常被问到的问题增加了一些知识。

    所以按照这个问题的时间线的传统,这是我的 2 美分。

    在非常特殊的情况下,我发现 重新分区比合并更快

    在我的应用程序中,当我们估计的文件数量低于某个阈值时,重新分区的工作速度更快。

    这就是我的意思

    if(numFiles > 20)
        df.coalesce(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
    else
        df.repartition(numFiles).write.mode(SaveMode.Overwrite).parquet(dest)
    

    在上面的 sn-p 中,如果我的文件少于 20 个,则合并需要永远完成,而重新分区要快得多,所以上面的代码。

    当然,这个数字 (20) 将取决于工作人员的数量和数据量。

    希望对您有所帮助。

    【讨论】:

      【解决方案5】:

      以一种简单的方式 COALESCE :- 仅用于减少分区数,没有数据混洗,它只是压缩分区

      REPARTITION:- 用于增加和减少分区的数量,但是会发生洗牌

      例子:-

      val rdd = sc.textFile("path",7)
      rdd.repartition(10)
      rdd.repartition(2)
      

      两个都很好

      但是当我们需要在一个集群中查看输出时,我们通常会考虑这两件事,我们会这样做。

      【讨论】:

      • Coalese 也会有数据移动。
      【解决方案6】:

      但您还应该确保,如果您正在处理大量数据,则即将合并节点的数据应该具有高度配置。因为所有的数据都会被加载到那些节点上,可能会导致内存异常。 虽然维修费用很高,但我更喜欢使用它。因为它会平均分配数据。

      在合并和重新分区之间进行选择是明智的。

      【讨论】:

        【解决方案7】:

        repartition - 建议在增加分区数量的同时使用它,因为它涉及到所有数据的洗牌。

        coalesce - 建议在减少分区数的同时使用。例如,如果您有 3 个分区并且您想将其减少到 2 个,coalesce 会将第 3 个分区的数据移动到分区 1 和 2。分区 1 和 2 将保留在同一个容器中。 另一方面,repartition 会打乱所有分区中的数据,因此执行器之间的网络使用率会很高,并且会影响性​​能。

        coalesce 的性能优于repartition,同时减少了分区数量。

        【讨论】:

        • 有用的解释。
        • @Kamalesan C - 用简单的话很好的解释,我希望我能多次支持这个答案。
        【解决方案8】:

        我想在 Justin 和 Power 的回答中补充一点 -

        repartition 将忽略现有分区并创建新分区。所以你可以用它来修复数据倾斜。您可以提及分区键来定义分布。数据倾斜是“大数据”问题空间中最大的问题之一。

        coalesce 将与现有分区一起工作,并对其中的一个子集进行洗牌。它无法像repartition 那样修复数据偏差。因此,即使它更便宜,它也可能不是您需要的东西。

        【讨论】:

          【解决方案9】:

          对于所有出色的答案,我想补充一点,repartition 是利用数据并行化的最佳选择之一。虽然coalesce 提供了一个减少分区的廉价选项,并且在将数据写入 HDFS 或其他接收器以利用大写入时非常有用。

          我发现这在以 parquet 格式写入数据以充分利用数据时很有用。

          【讨论】:

            【解决方案10】:

            对于从 PySpark (AWS EMR) 生成单个 csv 文件作为输出并将其保存在 s3 上时遇到问题的人,使用重新分区会有所帮助。原因是,coalesce 不能完全洗牌,但 repartition 可以。本质上,您可以使用 repartition 增加或减少分区数,但只能使用 coalesce 减少分区数(但不能减少 1 个)。以下是尝试将 csv 从 AWS EMR 写入 s3 的任何人的代码:

            df.repartition(1).write.format('csv')\
            .option("path", "s3a://my.bucket.name/location")\
            .save(header = 'true')
            

            【讨论】:

              【解决方案11】:

              code 和代码文档的后续内容是 coalesce(n)coalesce(n, shuffle = false) 相同,repartition(n)coalesce(n, shuffle = true) 相同

              因此,coalescerepartition 都可以用来增加分区数

              使用shuffle = true,您实际上可以合并到更大的数字 的分区。如果您有少量分区,这很有用, 比如说 100,可能有几个分区异常大。

              另一个需要强调的重要提示是,如果您大幅减少分区数量,您应该考虑使用 shuffled 版本的coalesce(在这种情况下与repartition 相同) )。这将允许您在父分区上并行执行计算(多任务)。

              但是,如果您要进行剧烈的合并,例如到numPartitions = 1,这可能会导致您在比您喜欢的更少的节点上进行计算(例如,numPartitions = 1 的情况下只有一个节点)。为避免这种情况,您可以传递shuffle = true。这将添加一个 shuffle 步骤,但意味着当前上游分区将并行执行(无论当前分区是什么)。

              也请参考相关答案here

              【讨论】:

                【解决方案12】:

                重新分区:将数据随机分配到新数量的分区中。

                例如。初始数据帧被划分为 200 个分区。

                df.repartition(500): 数据将从 200 个分区洗牌到新的 500 个分区。

                合并:将数据打乱到现有数量的分区中。

                df.coalesce(5):数据将从剩余的 195 个分区洗牌到现有的 5 个分区。

                【讨论】:

                  【解决方案13】:

                  repartition 算法对数据进行完全洗牌并创建大小相等的数据分区。 coalesce 合并现有分区以避免完全洗牌。

                  Coalesce 非常适合采用具有大量分区的 RDD,并在单个工作节点上组合分区以生成具有较少分区的最终 RDD。

                  Repartition 将重新排列 RDD 中的数据以生成您请求的最终分区数。 DataFrame 的分区似乎是一个应该由框架管理的低级实现细节,但事实并非如此。在将大型 DataFrame 过滤成较小的 DataFrame 时,您几乎应该总是对数据进行重新分区。 您可能会经常将大型 DataFrame 过滤成较小的 DataFrame,因此请习惯重新分区。

                  Read this blog post如果您想了解更多详情。

                  【讨论】:

                    【解决方案14】:

                    另一个区别是考虑到存在倾斜连接并且您必须在其之上合并的情况。在大多数情况下,重新分区将解决倾斜连接,然后您可以进行合并。

                    另一种情况是,假设你在一个数据帧中保存了中/大量的数据,你必须批量生产到 Kafka。在某些情况下,重新分区有助于在生产到 Kafka 之前收集列表。但是,当卷非常高时,重新分区可能会导致严重的性能影响。在这种情况下,直接从数据帧生成到 Kafka 会有所帮助。

                    旁注:Coalesce 不会像工作人员之间的完整数据移动那样避免数据移动。它确实减少了发生的洗牌次数。我想这就是这本书的意思。

                    【讨论】:

                      【解决方案15】:

                      即使在@Rob 的回答中提到的分区号减少的情况下,重新分区 >> 合并也有一个用例,即将数据写入单个文件。

                      @Rob 的回答暗示了好的方向,但我认为需要进一步解释才能了解幕后发生的事情。

                      如果您需要在写入之前过滤数据,那么 repartitioncoalesce 更合适,因为在加载操作之前合并会被下推。

                      例如: load().map(…).filter(…).coalesce(1).save()

                      翻译为: load().coalesce(1).map(…).filter(…).save()

                      这意味着您的所有数据都将合并到一个分区中,在该分区中将被过滤,失去所有并行性。 即使对于像column='value' 这样非常简单的过滤器也会发生这种情况。

                      重新分区不会发生这种情况:load().map(…).filter(…).repartition(1).save()

                      在这种情况下,过滤会在原始分区上并行进行。

                      只是为了给出一个数量级,在我的例子中,当从 Hive 表加载后过滤 109M 行 (~105G) 和 ~1000 个分区时,运行时从 ~6h for coalesce(1) 下降到 ~2m for repartition (1).

                      具体例子取自this article from AirBnB,相当不错,涵盖了Spark中重新分区技术的更多方面。

                      【讨论】:

                      • 你对这个有把握吗?我下午去看看。
                      • 100% 在撰写本文时使用 Spark 2.4.x,还没有尝试过更新版本,如果您这样做了,请告诉我们! :)
                      • OK 将在本周晚些时候查看 databricks 模拟。欢呼
                      【解决方案16】:

                      基本上,重新分区允许您增加或减少分区的数量。 Repartition 重新分配来自所有分区的数据,这会导致完全 shuffle,这是非常昂贵的操作。

                      Coalesce 是 Repartition 的优化版本,您只能减少分区数量。由于我们只能减少分区的数量,它所做的就是将一些分区合并为一个分区。通过合并分区,与重新分区相比,跨分区的数据移动更低。所以在 Coalesce 中是最小的数据移动,但说 coalesce 不进行数据移动是完全错误的说法。

                      另一件事是通过提供分区数量进行重新分区,它会尝试在所有分区上均匀地重新分配数据,而在 Coalesce 的情况下,我们在某些情况下仍然可能存在倾斜数据。

                      【讨论】:

                        【解决方案17】:

                        ○ coalesce 使用现有分区来最大程度地减少混洗的数据量。重新分区会创建新分区并进行完全随机播放。

                        ○ 合并会产生具有不同数据量的分区(有时分区具有许多不同的大小),重新分区会产生大小大致相等的分区。

                        ○合并我们可以减少分区,但是我们可以增加和减少分区。

                        【讨论】:

                          【解决方案18】:

                          Coalesce 比重新分区执行得更好。合并总是减少分区。假设如果您在 yarn 中启用动态分配,您有四个分区和执行器。如果对其应用过滤器,则一个或多个执行器可能是空的,没有数据。这个问题可以通过合并而不是重新分区来解决。

                          【讨论】:

                            猜你喜欢
                            • 2016-04-22
                            • 2016-10-02
                            • 1970-01-01
                            • 2015-01-01
                            • 2019-03-02
                            • 2022-08-03
                            • 1970-01-01
                            • 1970-01-01
                            • 2019-08-26
                            相关资源
                            最近更新 更多