【问题标题】:Balanced RDD partition among workers - Spark工作人员之间的平衡 RDD 分区 - Spark
【发布时间】:2017-08-17 16:40:45
【问题描述】:

我正在使用 RDD 中的 x: key, y: set(values) 称为 file

#values: RDD of tuples (key, val)    
file = values.groupByKey().mapValues(set).cache()
info_file = array(file.map(lambda (x,y): len(y)).collect())
var = np.var(info_file) #extremely high
def f():
     ...
file.foreachPartition(f)

len(y) 的方差非常高,因此大约 1% 的对集合(使用百分位数方法验证)占集合 total = np.sum(info_file) 中值总数的 20%。 如果 Spark 使用 shuffle 随机分区,那么这 1% 很可能会落在同一个分区中,从而导致工作人员之间的负载不平衡。

有没有办法确保“重”元组在分区之间均匀分布? 实际上,我根据threshold = np.percentile(info_file,99.9) 给出的阈值len(y)file 拆分为两个分区heavylight,以便分离这组元组然后重新分区。

light = file.filter(lambda (x,y): len(y) < threshold).cache()
heavy = file.filter(lambda (x,y): len(y) >= threshold).cache()

light.foreachPartition(f)
heavy.foreachPartition(f)

但获得几乎相同的运行时间。负载可能已经优化,只是想检查我是否可以做更多/更好的事情。

【问题讨论】:

    标签: python apache-spark pyspark rdd


    【解决方案1】:

    您可以使用 Ganglia 来监控集群负载。这应该可以很好地指示任何可能导致集群负载不均的数据倾斜。

    如果您确实遇到了不幸的数据倾斜,可以通过重组数据或加盐键等方式与之抗争。参见例如this StackOverflow Q&A

    请注意,您现在也可以将数据拆分为heavy 分区和light 分区,但在这种情况下,您希望cachefile - 而不是heavylight - 因为它是file,你需要处理多次。像这样:

    cachedFile = file.cache()
    
    light = cachedFile.filter(lambda (x,y): len(y) < threshold)
    heavy = cachedFile.filter(lambda (x,y): len(y) >= threshold)
    
    light.foreachPartition(f)
    heavy.foreachPartition(f)
    

    希望对您有所帮助。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-16
      • 2017-01-29
      • 1970-01-01
      相关资源
      最近更新 更多