【问题标题】:How to calculate the best numberOfPartitions for coalesce?如何计算合并的最佳 numberOfPartitions?
【发布时间】:2016-11-29 11:56:47
【问题描述】:

所以,我知道一般情况下应该在以下情况下使用coalesce()

由于filter 或其他可能导致减少原始数据集(RDD、DF)的操作,分区数减少。 coalesce() 对于过滤大型数据集后更有效地运行操作很有用。

我也知道它比repartition 便宜,因为它仅在必要时通过移动数据来减少洗牌。我的问题是如何定义coalesce 采用的参数(idealPartionionNo)。我正在处理一个从另一位工程师传递给我的项目,他正在使用以下计算来计算该参数的值。

// DEFINE OPTIMAL PARTITION NUMBER
implicit val NO_OF_EXECUTOR_INSTANCES = sc.getConf.getInt("spark.executor.instances", 5)
implicit val NO_OF_EXECUTOR_CORES = sc.getConf.getInt("spark.executor.cores", 2)

val idealPartionionNo = NO_OF_EXECUTOR_INSTANCES * NO_OF_EXECUTOR_CORES * REPARTITION_FACTOR

然后将其与 partitioner 对象一起使用:

val partitioner = new HashPartitioner(idealPartionionNo)

但也用于:

RDD.filter(x=>x._3<30).coalesce(idealPartionionNo)

这是正确的方法吗? idealPartionionNo 值计算背后的主要思想是什么? REPARTITION_FACTOR 是什么?我通常如何定义它?

此外,由于 YARN 负责即时识别可用的执行程序,因此有一种方法可以即时获取该数字 (AVAILABLE_EXECUTOR_INSTANCES) 并将其用于计算 idealPartionionNo(即,将 NO_OF_EXECUTOR_INSTANCES 替换为 @987654336 @)?

理想情况下,一些实际的表单示例:

  • 这是一个数据集大小);
  • 这里有一些 RDD/DF 的转换和可能的重用。
  • 这里是您应该重新分区/合并的地方。
  • 假设您有 n executorsm cores 和一个 partition factor 等于 k

然后:

  • 理想的分区数是 ==> ???

另外,如果您可以将我推荐给一个很好的博客来解释这些,我将不胜感激。

【问题讨论】:

    标签: scala apache-spark rdd


    【解决方案1】:

    实际上,最佳分区数更多地取决于您拥有的数据、您使用的转换和整体配置,而不是可用资源。

    • 如果分区数太少,您将遇到长时间的 GC 暂停、不同类型的内存问题,最后是资源利用率不佳。
    • 如果分区数量过多,那么维护成本很容易超过处理成本。此外,如果您使用非分布式归约操作(例如 reducetreeReduce 对比),大量分区会导致驱动程序负载更高。

    您可以找到一些规则,这些规则建议与内核数量相比过度订阅分区(因子 2 或 3 似乎很常见)或将分区保持在一定大小,但这并没有考虑到您自己的代码:

    • 如果分配很多,则可能会出现较长的 GC 暂停,使用较小的分区可能会更好。
    • 如果某段代码很昂贵,那么您的 shuffle 成本可以通过更高的并发性来摊销。
    • 如果您有过滤器,则可以根据谓词的判别力调整分区数(如果您希望保留 5% 的数据和 99% 的数据,您会做出不同的决定)。

    在我看来:

    • 对于一次性作业,保留更多的分区以保持安全(慢总比失败好)。
    • 对于可重用作业,从保守配置开始,然后执行 - 监控 - 调整配置 - 重复。
    • 不要尝试根据执行器或核心的数量使用固定数量的分区。首先了解您的数据和代码,然后调整配置以反映您的理解。

      通常,确定集群表现出稳定行为的每个分区的原始数据量相对容易(根据我的经验,它在几百兆字节的范围内,具体取决于您使用的格式和数据结构)加载数据和配置)。这是您正在寻找的“神奇数字”。

    一般情况下你必须记住的一些事情:

    • 分区数不一定反映 数据分布。任何需要 shuffle 的操作(*byKeyjoinRDD.partitionByDataset.repartition)都可能导致数据分布不均匀。始终监控您的工作是否存在严重的数据偏差。
    • 分区数通常不是恒定的。任何具有多个依赖项(unioncoGroupjoin)的操作都会影响分区的数量。

    【讨论】:

      【解决方案2】:

      您的问题是有效的,但 Spark 分区优化完全取决于您正在运行的计算。您需要有充分的理由重新分区/合并;如果你只是计算一个 RDD(即使它有大量稀疏的分区),那么任何重新分区/合并步骤都会减慢你的速度。

      重新分区与合并

      repartition(n)(与coalesce(n, shuffle = true)coalesce(n, shuffle = false)相同)的区别在于执行模型。shuffle模型取原始RDD中的每个分区,将其数据随机发送给所有执行者,并且产生一个带有新的(更小或更多)分区数的 RDD。no-shuffle 模型创建一个新的 RDD,它将多个分区作为一个任务加载。

      让我们考虑一下这个计算:

      sc.textFile("massive_file.txt")
        .filter(sparseFilterFunction) // leaves only 0.1% of the lines
        .coalesce(numPartitions, shuffle = shuffle)
      

      如果shuffletrue,则文本文件/过滤器计算发生在textFile 中默认给出的许多任务中,并且微小的过滤结果被打乱。如果shufflefalse,那么总任务数最多为numPartitions

      如果numPartitions 为 1,则差异非常明显。 shuffle 模型将并行处理和过滤数据,然后将 0.1% 的过滤结果发送给一个 executor 用于下游 DAG 操作。 no-shuffle 模型将从一开始就在一个核心上处理和过滤数据。

      采取的步骤

      考虑您的下游操作。如果您只使用此数据集一次,那么您可能根本不需要重新分区。如果您要保存过滤后的 RDD 以供以后使用(例如,存储到磁盘),请考虑上述权衡。熟悉这些模型需要经验,当一个模型表现更好时,请尝试两种模型,看看它们的表现如何!

      【讨论】:

        【解决方案3】:

        正如其他人所回答的那样,没有公式可以计算出您的要求。也就是说,您可以对第一部分做出有根据的猜测,然后随着时间的推移对其进行微调。

        第一步是确保您有足够的分区。如果每个执行器有 NO_OF_EXECUTOR_INSTANCES 执行器和 NO_OF_EXECUTOR_CORES 核心,那么您可以同时处理 NO_OF_EXECUTOR_INSTANCES*NO_OF_EXECUTOR_CORES 分区(每个分区都将转到特定实例的特定核心)。 也就是说,这假设一切都在核心之间平均分配,并且一切都需要完全相同的时间来处理。这种情况很少见。由于局部性(例如,数据需要来自不同的节点)或仅仅因为它们不平衡(例如,如果您的数据按根域分区,那么分区包括谷歌可能会很大)。这就是 REPARTITION_FACTOR 发挥作用的地方。这个想法是我们“超额预订”每个核心,因此如果一个完成得非常快而一个完成得慢,我们可以选择在它们之间划分任务。 2-3 倍通常是个好主意。

        现在让我们看一下单个分区的大小。假设您的整个数据大小为 X MB,并且您有 N 个分区。每个分区平均为 X/N MB。如果 N 相对于 X 很大,那么您的平均分区大小可能非常小(例如,几 KB)。在这种情况下,降低 N 通常是个好主意,因为管理每个分区的开销变得太高。另一方面,如果大小非常大(例如几 GB),那么您需要同时保存大量数据,这会导致诸如垃圾收集、高内存使用等问题。

        最佳大小是一个很好的问题,但通常人们似乎更喜欢 100-1000MB 的分区,但实际上几十 MB 也可能是好的。

        您应该注意的另一件事是,当您进行计算时,您的分区如何变化。例如,假设您从 1000 个分区开始,每个分区 100MB,但随后过滤数据,使每个分区变为 1K,那么您可能应该合并。当您进行 groupby 或加入时,可能会发生类似的问题。在这种情况下,分区的大小和分区的数量都会发生变化,并可能达到不理想的大小。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2013-05-13
          相关资源
          最近更新 更多