【问题标题】:sampling with weight using pyspark使用 pyspark 进行权重采样
【发布时间】:2018-02-01 08:17:11
【问题描述】:

我在使用 PySpark 的 spark 上有一个不平衡的数据帧。 我想重新采样以使其平衡。 我只在 PySpark 中找到示例函数

sample(withReplacement, fraction, seed=None)

但我想用单位体积的重量对数据框进行采样 在 Python 中,我可以这样做

df.sample(n,Flase,weights=log(unitvolume))

有什么方法可以让我使用 PySpark 做同样的事情吗?

【问题讨论】:

    标签: python apache-spark pyspark sampling


    【解决方案1】:

    Spark 提供了用于分层抽样的工具,但这仅适用于分类数据。您可以尝试对其进行分桶:

    from pyspark.ml.feature import Bucketizer
    from pyspark.sql.functions import col, log
    
    df_log = df.withColumn("log_unitvolume", log(col("unitvolume"))
    splits = ... # A list of splits
    
    bucketizer = Bucketizer(splits=splits, inputCol="log_unitvolume", outputCol="bucketed_log_unitvolume")
    
    df_log_bucketed = bucketizer.transform(df_log)
    

    计算统计数据:

    counts = df.groupBy("bucketed_log_unitvolume")
    fractions  = ...  # Define fractions from each bucket:
    

    并将这些用于采样:

    df_log_bucketed.sampleBy("bucketed_log_unitvolume", fractions)
    

    您也可以尝试将 log_unitvolume 重新缩放到 [0, 1] 范围,然后:

    from pyspark.sql.functions import rand 
    
    df_log_rescaled.where(col("log_unitvolume_rescaled") < rand())
    

    【讨论】:

    • spark 2.4 有没有类似的方法??我看到 Bucketizer 从 spark 3 中释放出来。
    【解决方案2】:

    一种方法是使用udf 制作采样列。此列将有一个随机数乘以您想要的重量。然后我们按照采样列排序,取前N个。

    考虑以下说明性示例:

    创建虚拟数据

    import numpy as np
    import string
    import pyspark.sql.functions as f
    
    index = range(100)
    weights = [i%26 for i in index]
    labels = [string.ascii_uppercase[w] for w in weights]
    
    df = sqlCtx.createDataFrame(
        zip(index, labels, weights),
        ('index', 'label', 'weight')
    )
    
    df.show(n=5)
    #+-----+-----+------+
    #|index|label|weight|
    #+-----+-----+------+
    #|    0|    A|     0|
    #|    1|    B|     1|
    #|    2|    C|     2|
    #|    3|    D|     3|
    #|    4|    E|     4|
    #+-----+-----+------+
    #only showing top 5 rows
    

    添加采样列

    在本例中,我们希望使用列 weight 作为权重对 DataFrame 进行采样。我们使用numpy.random.random() 定义一个udf 来生成统一的随机数并乘以权重。然后我们在这个列上使用sort(),并使用limit()得到想要的样本数。

    N = 10  # the number of samples
    
    def get_sample_value(x):
        return np.random.random() * x
    
    get_sample_value_udf = f.udf(get_sample_value, FloatType())
    
    df_sample = df.withColumn('sampleVal', get_sample_value_udf(f.col('weight')))\
        .sort('sampleVal', ascending=False)\
        .select('index', 'label', 'weight')\
        .limit(N)
    

    结果

    正如预期的那样,DataFrame df_sample 有 10 行,它的内容往往在字母表末尾有字母(更高的权重)。

    df_sample.count()
    #10
    
    df_sample.show()
    #+-----+-----+------+
    #|index|label|weight|
    #+-----+-----+------+
    #|   23|    X|    23|
    #|   73|    V|    21|
    #|   46|    U|    20|
    #|   25|    Z|    25|
    #|   19|    T|    19|
    #|   96|    S|    18|
    #|   75|    X|    23|
    #|   48|    W|    22|
    #|   51|    Z|    25|
    #|   69|    R|    17|
    #+-----+-----+------+
    

    【讨论】:

      猜你喜欢
      • 2018-05-18
      • 2020-06-04
      • 1970-01-01
      • 2017-05-22
      • 1970-01-01
      • 2020-12-28
      • 2019-09-03
      • 2019-01-23
      • 1970-01-01
      相关资源
      最近更新 更多