【问题标题】:spark: How does salting work in dealing with skewed data火花:加盐如何处理倾斜数据
【发布时间】:2020-01-26 08:14:24
【问题描述】:

我在一个表中有一个倾斜的数据,然后将其与其他较小的表进行比较。 我知道在连接的情况下加盐起作用-即随机数被附加到大表中的键上,带有来自一系列随机数据的偏斜数据,并且小表中没有偏斜数据的行与相同范围的随机数重复.因此,匹配发生是因为在一个倾斜的特定加盐键的重复值中会有一个命中。 我还读到在执行 groupby 时加盐很有帮助。我的问题是,将随机数附加到密钥时不会破坏组吗?如果是这样,则分组操作的含义已经改变。

【问题讨论】:

    标签: apache-spark join group-by apache-spark-sql skew


    【解决方案1】:
    var df1 = Seq((1,"a"),(2,"b"),(1,"c"),(1,"x"),(1,"y"),(1,"g"),(1,"k"),(1,"u"),(1,"n")).toDF("ID","NAME") 
    
    df1.createOrReplaceTempView("fact")
    
    var df2 = Seq((1,10),(2,30),(3,40)).toDF("ID","SALARY")
    
    df2.createOrReplaceTempView("dim")
    
    val salted_df1 = spark.sql("""select concat(ID, '_', FLOOR(RAND(123456)*19)) as salted_key, NAME from fact """)
    
    salted_df1.createOrReplaceTempView("salted_fact")
    
    val exploded_dim_df = spark.sql(""" select ID, SALARY, explode(array(0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19)) as salted_key from dim""")
    
    //val exploded_dim_df = spark.sql(""" select ID, SALARY, explode(array(0 to 19)) as salted_key from dim""")
    
    exploded_dim_df.createOrReplaceTempView("salted_dim")
    
    val result_df = spark.sql("""select split(fact.salted_key, '_')[0] as ID, dim.SALARY 
                from salted_fact fact 
                LEFT JOIN salted_dim dim 
                ON fact.salted_key = concat(dim.ID, '_', dim.salted_key) """)
    display(result_df)
    

    【讨论】:

      【解决方案2】:

      “我的问题是,当随机数附加到密钥时,它不会破坏组吗?如果是,那么分组操作的含义已经改变。”

      是的,向现有密钥添加盐会破坏组。但是,正如@Gelerion 在他的回答中提到的那样,您可以按加盐和原始密钥分组,然后按原始密钥分组。这适用于聚合,例如

      • 计数
      • 分钟
      • 最大
      • 总和

      可以合并子组的结果。下图显示了计算倾斜 Dataframe 最大值的示例。

      【讨论】:

      • 我真的很想看到查询计划的整体比较以及无盐和有盐计算的性能。
      【解决方案3】:

      我的问题是,当随机数附加到密钥时,它不会破坏组吗?

      嗯,确实如此,为了缓解这种情况,您可以按操作运行两次分组。 先用加盐键,再去掉加盐,再分组。 第二组将采用部分聚合的数据,从而显着减少偏斜影响。

      例如

      import org.apache.spark.sql.functions._
      
      df.withColumn("salt", (rand * n).cast(IntegerType))
        .groupBy("salt", groupByFields)
        .agg(aggFields)
        .groupBy(groupByFields)
        .agg(aggFields)
      

      【讨论】:

      • 如果聚合函数像 count、percentile 和 standardDeviation 这样会产生正确的结果,我知道 sum,这样会很有效,但不确定它会提供的 count、percentile 和 standardDev正确的结果。
      • 如果我们尝试使用倾斜列上的子字符串创建新列怎么办?
      猜你喜欢
      • 2020-09-25
      • 1970-01-01
      • 2016-12-21
      • 2019-03-01
      • 1970-01-01
      • 2015-12-14
      • 2023-02-19
      • 1970-01-01
      • 2017-01-23
      相关资源
      最近更新 更多