【问题标题】:Skewed dataset join in Spark?在 Spark 中加入倾斜的数据集?
【发布时间】:2016-11-02 06:18:19
【问题描述】:

我正在使用 Spark RDD 加入两个大型数据集。一个数据集非常倾斜,因此很少有执行程序任务需要很长时间才能完成工作。我该如何解决这种情况?

【问题讨论】:

    标签: join apache-spark


    【解决方案1】:

    关于如何完成的相当不错的文章:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

    短版:

    • 将随机元素添加到大型 RDD 并使用它创建新的连接键
    • 使用explode/flatMap 将随机元素添加到小型RDD 以增加条目数并创建新的连接键
    • 在新的连接键上加入 RDD,由于随机播种,现在可以更好地分布

    【讨论】:

    • 浏览了您提到的文章。在我的例子中,更大的表有 23500 个分区和 20 亿条记录。较小的表有 500 万条记录。我如何确定'N'的值?有什么建议吗?
    • @kalpesh,还有一个参数需要考虑,Spark 中的 shuffle 块的大小应该小于 2GB SPARK-6235。我建议关注分区的大小,通常应该是~128MB,而不是分区数。我已经看到应用程序在许多分区 (> 32k) 上运行良好。
    • @kalpesh 好吧,您想在新键上创建足够多的分区,以便所有执行程序将数据加入以最大化并行性。
    • @LiMuBei 有一些我在 python 代码中没有得到的东西 small_rdd_transformed = small_rdd.cartesian(sc.parallelize(range(0, N))).map(lambda x: ((x[0][0], x[1]), x[0][1])).coalesce(num_parts).cache() # replicate the small rdd in scala x is a tuple2 那么 x[0][0] 是什么意思?..
    • 我的错误是缺少mapPartitionWithIndex
    【解决方案2】:

    假设您必须在 A.id=B.id 上连接两个表 A 和 B。让我们假设表 A 在 id=1 上存在偏差。

    从 A 中选择 A.id 在 A.id = B.id 上加入 B

    解决倾斜连接问题有两种基本方法:

    方法一:

    将您的查询/数据集分成 2 个部分 - 一个仅包含倾斜数据,另一个包含非倾斜数据。 在上面的例子中。查询将变为 -

     1. select A.id from A join B on A.id = B.id where A.id <> 1;
     2. select A.id from A join B on A.id = B.id where A.id = 1 and B.id = 1;
    

    第一个查询不会有任何偏差,因此 ResultStage 的所有任务将大致同时完成。

    如果我们假设 B 只有几行 B.id = 1,那么它将适合内存。所以第二个查询将被转换为广播连接。这在 Hive 中也称为 Map-side join。

    参考:https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimization

    然后可以合并两个查询的部分结果以获得最终结果。

    方法二:

    上面 LeMuBei 也提到过,第二种方法尝试通过附加额外的列来随机化连接键。 步骤:

    1. 在较大的表 (A) 中添加一列,例如 skewLeft,并为所有行填充 0 到 N-1 之间的随机数。

    2. 在较小的表 (B) 中添加一列,例如 skewRight。将较小的表复制 N 次。因此,对于原始数据的每个副本,新 skewRight 列中的值将从 0 变化到 N-1。为此,您可以使用explode sql/dataset 运算符。

    在 1 和 2 之后,连接 2 个数据集/表,连接条件更新为 -

                    *A.id = B.id && A.skewLeft = B.skewRight*
    

    参考:https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/

    【讨论】:

    • @prakharjain- 如果您可以使用一些简单的示例数据框详细说明第二种方法,那就太好了。我需要在 Pyspark 中实现它。谢谢
    • 第二种方法不是很清楚。介意添加一些示例吗?
    • @redTiger datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark 用一个例子解释了方法 2。
    【解决方案3】:

    根据您遇到的特定类型的偏差,可能有不同的解决方法。基本思路是:

    • 修改您的连接列,或创建一个新的连接列,该列没有倾斜,但仍保留足够的信息来进行连接
    • 在该非倾斜列上执行连接 - 生成的分区不会倾斜
    • 连接后,您可以将连接列更新回您的首选格式,或者在创建新列时将其删除

    如果倾斜的数据参与连接,李牧北的答案中引用的“Fighting the Skew In Spark”文章是一种很好的技术。就我而言,偏斜是由连接列中的大量空值引起的。空值不参与连接,但由于连接列上的 Spark 分区,连接后的分区非常倾斜,因为有一个包含所有空值的巨大分区。

    我通过添加一个新列来解决它,该列将所有空值更改为分布良好的临时值,例如“NULL_VALUE_X”,其中 X 被替换为 1 到 10,000 之间的随机数,例如(在 Java 中):

    // Before the join, create a join column with well-distributed temporary values for null swids.  This column
    // will be dropped after the join.  We need to do this so the post-join partitions will be well-distributed,
    // and not have a giant partition with all null swids.
    String swidWithDistributedNulls = "swid_with_distributed_nulls";
    int numNullValues = 10000; // Just use a number that will always be bigger than number of partitions
    Column swidWithDistributedNullsCol =
        when(csDataset.col(CS_COL_SWID).isNull(), functions.concat(
            functions.lit("NULL_SWID_"),
            functions.round(functions.rand().multiply(numNullValues)))
        )
        .otherwise(csDataset.col(CS_COL_SWID));
    csDataset = csDataset.withColumn(swidWithDistributedNulls, swidWithDistributedNullsCol);
    

    然后在这个新列上加入,然后在加入之后:

    outputDataset.drop(swidWithDistributedNullsCol);
    

    【讨论】:

      【解决方案4】:

      参考https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/ 下面是使用 Pyspark 数据帧 API 对抗 spark 偏差的代码

      创建 2 个数据框:

      from math import exp
      from random import randint
      from datetime import datetime
      
      def count_elements(splitIndex, iterator):
          n = sum(1 for _ in iterator)
          yield (splitIndex, n)
      
      def get_part_index(splitIndex, iterator):
          for it in iterator:
              yield (splitIndex, it)
      
      num_parts = 18
      # create the large skewed rdd
      skewed_large_rdd = sc.parallelize(range(0,num_parts), num_parts).flatMap(lambda x: range(0, int(exp(x))))
      skewed_large_rdd = skewed_large_rdd.mapPartitionsWithIndex(lambda ind, x: get_part_index(ind, x))
      
      skewed_large_df = spark.createDataFrame(skewed_large_rdd,['x','y'])
      
      small_rdd = sc.parallelize(range(0,num_parts), num_parts).map(lambda x: (x, x))
      
      small_df = spark.createDataFrame(small_rdd,['a','b'])
      

      大df将数据分成100个bin,小df复制100次

      salt_bins = 100
      from pyspark.sql import functions as F
      
      skewed_transformed_df = skewed_large_df.withColumn('salt', (F.rand()*salt_bins).cast('int')).cache()
      
      small_transformed_df = small_df.withColumn('replicate', F.array([F.lit(i) for i in range(salt_bins)]))
      
      small_transformed_df = small_transformed_df.select('*', F.explode('replicate').alias('salt')).drop('replicate').cache()
      

      终于避免了歪斜的连接

      t0 = datetime.now()
      result2 = skewed_transformed_df.join(small_transformed_df, (skewed_transformed_df['x'] == small_transformed_df['a']) & (skewed_transformed_df['salt'] == small_transformed_df['salt']) )
      result2.count() 
      print "The direct join takes %s"%(str(datetime.now() - t0))
      

      【讨论】:

        【解决方案5】:

        Apache DataFu 有两种执行倾斜连接的方法,这些方法实现了先前答案中的一些建议。

        joinSkewed 方法进行加盐(添加随机数列来拆分倾斜的值)。

        broadcastJoinSkewed 方法适用于您可以将数据框划分为倾斜和常规部分,如moriarty007 的答案中的Approach 2 所述。

        DataFu 中的这些方法对于使用 Spark 2.x 的项目很有用。如果您已经使用 Spark 3,则有 dedicated methods for doing skewed joins

        完全披露 - 我是 Apache DataFu 的成员。

        【讨论】:

          【解决方案6】:

          您可以尝试将“偏斜”的 RDD 重新分区到更多分区,或者尝试增加 spark.sql.shuffle.partitions(默认为 200)。

          在你的情况下,我会尝试将分区的数量设置为远高于执行者的数量。

          【讨论】:

          • spark.sql.shuffle.partitions 不会帮助扭曲数据。将有 200 个分区,但其中只有少数有数据。
          • 将 spark.sql.shuffle.partitions 增加到更高的数字不会帮助解决 Skew。 skewed key 对应的所有数据仍然会进入同一个 reducer,会导致速度变慢。
          • @prakharjain 这并不完全正确。增加分区数将减少具有许多记录的两个键放在同一分区中的机会。
          • spark.sql.shuffle.partitions 不会成功。加入倾斜数据将导致数据混洗的热点问题,因为连接点上的相同值将被散列到相同的散列键中,因此,所有这些行(具有相同值)将被发送到同一个执行程序在洗牌过程中。参考:oreilly.com/library/view/high-performance-spark/9781491943199/…
          猜你喜欢
          • 2016-12-21
          • 2015-07-08
          • 1970-01-01
          • 1970-01-01
          • 2021-04-23
          • 1970-01-01
          • 2019-03-09
          • 2019-04-01
          • 1970-01-01
          相关资源
          最近更新 更多