【问题标题】:How does range partitioner work in Spark?范围分区器如何在 Spark 中工作?
【发布时间】:2017-01-08 15:35:07
【问题描述】:

我不太清楚范围分区器在 Spark 中是如何工作的。它使用(Reservoir Sampling)来取样。我对计算输入边界的方式感到困惑。

 // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
  val sampleSize = math.min(20.0 * partitions, 1e6)
  // Assume the input partitions are roughly balanced and over-sample a little bit.
  val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt

为什么计算出的 sampleSize 应该乘以 3.0?以及如何获得边界?有人可以给我看一些关于这个的例子吗?谢谢!

【问题讨论】:

    标签: apache-spark partitioning


    【解决方案1】:

    范围分区的背景

    您发布的代码来自用于获取未分区 RDD 并由新范围分区器对其进行分区的方法。这包括三个步骤:

    1. 计算合理的范围边界
    2. 从这些范围边界构造一个分区器,为您提供从键 K 到分区索引的函数
    3. 根据这个新的分区器随机播放 RDD

    您的问题涉及这些步骤中的第一步。理想情况下,您可以收集所有 RDD 数据,对其进行排序,并确定将排序后的集合划分为nPartitions 块的范围界限。简单的!

    没那么多。该算法的计算量为 O(n log n),并且需要与集合成比例的内存。这些事实(尤其是第二个)使得在分布式 Spark 框架中执行变得不切实际。但是我们不需要我们的分区是完全平衡的,因为在我糟糕的收集和排序实现之后它们将是平衡的。只要我们的分区最终合理平衡,我们就很清楚。如果我们可以使用一种算法,给我们提供近似分位数边界但运行速度更快,这可能是一个胜利。

    好的,所以我们有动力开发一种运行速度快且不占用太多内存的高效算法。事实证明,水库取样是一种很好的方法。如果您的集合有 1B 元素并且您对 1M 进行采样,则 1M 元素的第 10 个百分位大约等于 1B 的第 10 个百分位。您可以使用完全相同的收集和排序算法来确定范围界限,但要针对完整数据的随机抽样子集进行缩减。

    您关于乘以 3 的具体问题

    第一行 (sampleSize) 估计了充分表示真实值范围所需的样本数。这有点武断,可能基于反复试验。但是由于您想并行采样,您需要知道从每个分布式partition 中取多少值,而不是整体取多少值。第二行 (sampleSizePerPartition) 估计了这个数字。

    之前我提到我们希望分区大致平衡。这是因为大量的 Spark 函数依赖于这个属性——包括sampleSizePerPartition 代码。我们知道分区大小略有不同,但假设它们变化不大。通过从每个分区中采样 3 倍于完全平衡时所需的值,我们可以容忍更多的分区不平衡。

    考虑一下如果您有 100,000 个分区会发生什么。在这种情况下,sampleSize 是 200 万(20 * 个分区)

    如果您从每个分区中抽取 20 个随机元素,那么如果任何分区的元素少于 20 个,那么您最终得到的样本将少于 sampleSize。从每个分区中提取 60 个元素是激进的,但可以确保您在除最极端的不平衡分区场景之外的所有场景中都获得足够的样本。

    【讨论】:

    • 非常感谢您的详细回答!您能否添加一些关于如何使用此确定边界()方法计算输入边界的 cmets?
    • 我可以复制代码并简要浏览一下,但您的用例是什么?您是尝试替换采样算法还是绘制自己的自定义范围?
    • 非常感谢!我刚刚在均匀划分小范围时遇到了一些小问题。例如,如果尝试val datapart = List(0, 50, 100, 150) val rdd = sc.parallelize(datapart).map((_, 1)),则运行val rangePart = new RangePartitioner(4, rdd) 拆分为4 个分区。但是我返回的结果不是平均分配的。你能帮忙解释一下原因和边界是如何计算的吗? @蒂姆P
    • 啊。不能保证它会以很小的数据大小均匀地分割事物,因为这不是编写 Spark 的目的。我用你的 4 元素列表尝试了你的代码,得到了分区大小[2, 1, 1, 0]。然后我尝试了 100k 元素范围并得到了[24626, 25280, 24860, 25234]
    • 谢谢!是的,我得到了类似的结果。并且仍在挖掘发生这种情况的原因。
    猜你喜欢
    • 1970-01-01
    • 2021-05-16
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多