【问题标题】:Stratified sampling with pyspark使用 pyspark 进行分层采样
【发布时间】:2018-05-18 04:16:24
【问题描述】:

我有一个 Spark DataFrame,它有一列有很多零和很少的零(只有 0.01% 的零)。

我想随机抽取一个子样本,但要分层一个 - 以便在该列中保持 1 与 0 的比率。

在pyspark中可以吗?

我正在寻找基于DataFrames 而不是RDD-based 的非scala解决方案。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql


    【解决方案1】:

    我在Stratified sampling in Spark 中建议的解决方案非常简单,可以从 Scala 转换为 Python(甚至转换为 Java - What's the easiest way to stratify a Spark Dataset ? )。

    不过,我会重写它python。让我们首先创建一个玩具DataFrame

    from pyspark.sql.functions import lit
    list = [(2147481832,23355149,1),(2147481832,973010692,1),(2147481832,2134870842,1),(2147481832,541023347,1),(2147481832,1682206630,1),(2147481832,1138211459,1),(2147481832,852202566,1),(2147481832,201375938,1),(2147481832,486538879,1),(2147481832,919187908,1),(214748183,919187908,1),(214748183,91187908,1)]
    df = spark.createDataFrame(list, ["x1","x2","x3"])
    df.show()
    # +----------+----------+---+
    # |        x1|        x2| x3|
    # +----------+----------+---+
    # |2147481832|  23355149|  1|
    # |2147481832| 973010692|  1|
    # |2147481832|2134870842|  1|
    # |2147481832| 541023347|  1|
    # |2147481832|1682206630|  1|
    # |2147481832|1138211459|  1|
    # |2147481832| 852202566|  1|
    # |2147481832| 201375938|  1|
    # |2147481832| 486538879|  1|
    # |2147481832| 919187908|  1|
    # | 214748183| 919187908|  1|
    # | 214748183|  91187908|  1|
    # +----------+----------+---+
    

    如您所见,这个DataFrame 有 12 个元素:

    df.count()
    # 12
    

    分布如下:

    df.groupBy("x1").count().show()
    # +----------+-----+
    # |        x1|count|
    # +----------+-----+
    # |2147481832|   10|
    # | 214748183|    2|
    # +----------+-----+
    

    现在让我们来采样:

    首先我们将设置种子:

    seed = 12
    

    找到分数和采样的关键:

    fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
    print(fractions)                                                            
    # {2147481832: 0.8, 214748183: 0.8}
    sampled_df = df.stat.sampleBy("x1", fractions, seed)
    sampled_df.show()
    # +----------+---------+---+
    # |        x1|       x2| x3|
    # +----------+---------+---+
    # |2147481832| 23355149|  1|
    # |2147481832|973010692|  1|
    # |2147481832|541023347|  1|
    # |2147481832|852202566|  1|
    # |2147481832|201375938|  1|
    # |2147481832|486538879|  1|
    # |2147481832|919187908|  1|
    # | 214748183|919187908|  1|
    # | 214748183| 91187908|  1|
    # +----------+---------+---+
    

    我们现在可以检查样本的内容了:

    sampled_df.count()
    # 9
    
    sampled_df.groupBy("x1").count().show()
    # +----------+-----+
    # |        x1|count|
    # +----------+-----+
    # |2147481832|    7|
    # | 214748183|    2|
    # +----------+-----+
    

    【讨论】:

    • @eliasah 有没有办法添加 0.8 和 0.2 分数?我想使用 0.8 作为训练集,另一个 0.2 作为测试集。我尝试使用这种方法获得 0.8,但是在没有子查询支持的 spark 1.6 中很难获得另一个 0.2
    • 您始终可以在主 DF 和采样 DF @EmmaNej 上使用 except
    • @eliasah 是的,但考虑到我有 2000 万条记录并且数据集中没有唯一键,这需要很长时间。
    • @eliasah 不幸的是 Spark 1.6 不支持 left_anti 加入。
    【解决方案2】:

    假设您在“数据”数据框中有泰坦尼克号数据集,您希望使用基于“幸存”目标变量的分层抽样将其拆分为训练集和测试集。

      # Check initial distributions of 0's and 1's
    -> data.groupBy("Survived").count().show()
    
     Survived|count|
     +--------+-----+
     |       1|  342|
     |       0|  549
    
    
      # Taking 70% of both 0's and 1's into training set
    -> train = data.sampleBy("Survived", fractions={0: 0.7, 1: 0.7}, seed=10)
    
      # Subtracting 'train' from original 'data' to get test set 
    -> test = data.subtract(train)
    
    
    
      # Checking distributions of 0's and 1's in train and test sets after the sampling
    -> train.groupBy("Survived").count().show()
    +--------+-----+
    |Survived|count|
    +--------+-----+
    |       1|  239|
    |       0|  399|
    +--------+-----+
    -> test.groupBy("Survived").count().show()
    
    +--------+-----+
    |Survived|count|
    +--------+-----+
    |       1|  103|
    |       0|  150|
    +--------+-----+
    

    【讨论】:

      【解决方案3】:

      这可以通过 PySpark 中的“randomSplit”和“union”轻松完成。

      # read in data
      df = spark.read.csv(file, header=True)
      # split dataframes between 0s and 1s
      zeros = df.filter(df["Target"]==0)
      ones = df.filter(df["Target"]==1)
      # split datasets into training and testing
      train0, test0 = zeros.randomSplit([0.8,0.2], seed=1234)
      train1, test1 = ones.randomSplit([0.8,0.2], seed=1234)
      # stack datasets back together
      train = train0.union(train1)
      test = test0.union(test1)
      

      【讨论】:

        【解决方案4】:

        这是基于@eliasah 和this so thread 接受的答案

        如果你想取回一个训练集和测试集,你可以使用以下函数:

        from pyspark.sql import functions as F 
        
        def stratified_split_train_test(df, frac, label, join_on, seed=42):
            """ stratfied split of a dataframe in train and test set.
            inspiration gotten from:
            https://stackoverflow.com/a/47672336/1771155
            https://stackoverflow.com/a/39889263/1771155"""
            fractions = df.select(label).distinct().withColumn("fraction", F.lit(frac)).rdd.collectAsMap()
            df_frac = df.stat.sampleBy(label, fractions, seed)
            df_remaining = df.join(df_frac, on=join_on, how="left_anti")
            return df_frac, df_remaining
        

        创建一个分层的训练和测试集,其中 80% 用于训练集

        df_train, df_test = stratified_split_train_test(df=df, frac=0.8, label="y", join_on="unique_id")
        

        【讨论】:

        • 我的数据集中没有“unique_id”列。有没有办法重写这个函数?
        猜你喜欢
        • 2017-05-31
        • 1970-01-01
        • 1970-01-01
        • 2018-11-22
        • 1970-01-01
        • 2014-09-08
        • 2022-09-30
        • 2023-03-17
        • 2015-11-21
        相关资源
        最近更新 更多