【问题标题】:outlier detection in pysparkpyspark中的异常值检测
【发布时间】:2019-03-09 02:09:39
【问题描述】:

我有一个如下所示的 pyspark 数据框。

+---+-------+--------+
|age|balance|duration|
+---+-------+--------+
|  2|   2143|     261|
| 44|     29|     151|
| 33|      2|      76|
| 50|   1506|      92|
| 33|      1|     198|
| 35|    231|     139|
| 28|    447|     217|
|  2|      2|     380|
| 58|    121|      50|
| 43|    693|      55|
| 41|    270|     222|
| 50|    390|     137|
| 53|      6|     517|
| 58|     71|      71|
| 57|    162|     174|
| 40|    229|     353|
| 45|     13|      98|
| 57|     52|      38|
|  3|      0|     219|
|  4|      0|      54|
+---+-------+--------+

我的预期输出应该是这样的,

+---+-------+--------+-------+-----------+------------+
|age|balance|duration|age_out|balance_out|duration_out|
+---+-------+--------+-------+-----------+------------+
|  2|   2143|     261|      1|          1|           0|
| 44|     29|     151|      0|          0|           0|
| 33|      2|      76|      0|          0|           0|
| 50|   1506|      92|      0|          1|           0|
| 33|      1|     198|      0|          0|           0|
| 35|    231|     139|      0|          0|           0|
| 28|    447|     217|      0|          0|           0|
|  2|      2|     380|      1|          0|           0|
| 58|    121|      50|      0|          0|           0|
| 43|    693|      55|      0|          0|           0|
| 41|    270|     222|      0|          0|           0|
| 50|    390|     137|      0|          0|           0|
| 53|      6|     517|      0|          0|           1|
| 58|     71|      71|      0|          0|           0|
| 57|    162|     174|      0|          0|           0|
| 40|    229|     353|      0|          0|           0|
| 45|     13|      98|      0|          0|           0|
| 57|     52|      38|      0|          0|           0|
|  3|      0|     219|      1|          0|           0|
|  4|      0|      54|      0|          0|           0|
+---+-------+--------+-------+-----------+------------+

这里我的目标是使用我在下面的 python 代码中描述的四分位数方法来识别数据集中的异常记录。如果我们发现任何异常记录,那么我们需要将它们标记为 1,否则标记为 0。

我可以通过下面的代码使用 python 做同样的事情。

import numpy as np
def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(ser)

但我想在 pyspark 中做同样的事情。有人可以帮助我吗?

我的 pyspark 代码:

def outliers_iqr(ys):
    quartile_1, quartile_3 = np.percentile(ys, [25, 75])
    iqr = quartile_3 - quartile_1
    lower_bound = quartile_1 - (iqr * 1.5)
    upper_bound = quartile_3 + (iqr * 1.5)
    ser = np.zeros(len(ys))
    pos =np.where((ys > upper_bound) | (ys < lower_bound))[0]
    ser[pos]=1
    return(float(ser))

outliers_iqr_udf = udf(outliers_iqr, FloatType())
DF.withColumn('age_out', outliers_iqr_udf(DF.select('age').collect())).show()

【问题讨论】:

    标签: python-3.x apache-spark pyspark


    【解决方案1】:

    You can use pyspark.sql.DataFrame.approxQuantile 在循环内获取每列所需的第 25 和第 75 百分位值。

    bounds = {
        c: dict(
            zip(["q1", "q3"], df.approxQuantile(c, [0.25, 0.75], 0))
        )
        for c in df.columns
    }
    

    传递的最后一个参数是相对错误,您可以在链接的帖子以及docs 上阅读。简短的版本是数字越小,您的结果就越准确,但在准确性和计算费用之间存在权衡。 (这里我使用 0 来获取确切的值,但您可能需要根据数据的大小选择不同的值。)

    一旦你有了第一个和第三个四分位数的值,你就可以很容易地计算出iqr 和上限/下限:

    for c in bounds:
        iqr = bounds[c]['q3'] - bounds[c]['q1']
        bounds[c]['lower'] = bounds[c]['q1'] - (iqr * 1.5)
        bounds[c]['upper'] = bounds[c]['q3'] + (iqr * 1.5)
    print(bounds)
    #{'age': {'lower': 3.0, 'q1': 33.0, 'q3': 53.0, 'upper': 83.0},
    # 'balance': {'lower': -570.0, 'q1': 6.0, 'q3': 390.0, 'upper': 966.0},
    # 'duration': {'lower': -143.0, 'q1': 76.0, 'q3': 222.0, 'upper': 441.0}}
    

    现在在列表推导中使用pyspark.sql.functions.when 来构建基于bounds 的异常列:

    import pyspark.sql.functions as f
    df.select(
        "*",
        *[
            f.when(
                f.col(c).between(bounds[c]['lower'], bounds[c]['upper']),
                0
            ).otherwise(1).alias(c+"_out") 
            for c in df.columns
        ]
    ).show()
    #+---+-------+--------+-------+-----------+------------+
    #|age|balance|duration|age_out|balance_out|duration_out|
    #+---+-------+--------+-------+-----------+------------+
    #|  2|   2143|     261|      1|          1|           0|
    #| 44|     29|     151|      0|          0|           0|
    #| 33|      2|      76|      0|          0|           0|
    #| 50|   1506|      92|      0|          1|           0|
    #| 33|      1|     198|      0|          0|           0|
    #| 35|    231|     139|      0|          0|           0|
    #| 28|    447|     217|      0|          0|           0|
    #|  2|      2|     380|      1|          0|           0|
    #| 58|    121|      50|      0|          0|           0|
    #| 43|    693|      55|      0|          0|           0|
    #| 41|    270|     222|      0|          0|           0|
    #| 50|    390|     137|      0|          0|           0|
    #| 53|      6|     517|      0|          0|           1|
    #| 58|     71|      71|      0|          0|           0|
    #| 57|    162|     174|      0|          0|           0|
    #| 40|    229|     353|      0|          0|           0|
    #| 45|     13|      98|      0|          0|           0|
    #| 57|     52|      38|      0|          0|           0|
    #|  3|      0|     219|      0|          0|           0|
    #|  4|      0|      54|      0|          0|           0|
    #+---+-------+--------+-------+-----------+------------+
    

    这里我使用between来检查一个值是否不是异常值,并且这个函数是包容性的(即x between a and b在逻辑上等价于x &gt;= a and x &lt;= b)。

    【讨论】:

    • 我的函数能解决这个问题吗?如果是这样,请指导我需要进行哪些更改才能使其正常工作。感谢您的帮助。
    • @RSK 我没有看到任何简单的方法来修改您的原始代码以在 spark 中完成此操作。
    • 这个 f 代表什么。就我而言,它说没有名为 f?? 的变量
    • @Saradamani 编辑的答案包括import pyspark.sql.functions as f
    • @Saradamani 每个_out 列是一个指示符,用于指定该行中对应列的值是否为异常值。
    【解决方案2】:

    请在下面找到我的解决方案:

    from pyspark.sql import functions as f
    
    
    class Outlier():
    
        def __init__(self, df):
            self.df = df
    
    
        def _calculate_bounds(self):
            bounds = {
                c: dict(
                    zip(["q1", "q3"], self.df.approxQuantile(c, [0.25, 0.75], 0))
                )
                for c, d in zip(self.df.columns, self.df.dtypes) if d[1] in ["bigint", "double"]
            }
    
            for c in bounds:
                iqr = bounds[c]['q3'] - bounds[c]['q1']
                bounds[c]['min'] = bounds[c]['q1'] - (iqr * 1.5)
                bounds[c]['max'] = bounds[c]['q3'] + (iqr * 1.5)
    
            return bounds
    
    
        def _flag_outliers_df(self):
            bounds = self._calculate_bounds()
    
            outliers_col = [
                f.when(
                    ~f.col(c).between(bounds[c]['min'], bounds[c]['max']),
                    f.col(c)
                ).alias(c + '_outlier')
                for c in bounds]
    
            return self.df.select(*outliers_col)
    
    
        def show_outliers(self):
    
            outlier_df = self._flag_outliers_df()
    
            for outlier in outlier_df.columns:
                outlier_df.select(outlier).filter(f.col(outlier).isNotNull()).show()
    

    然后按如下方式传递您的数据框:

    Outlier(df).show_outliers()
    

    【讨论】:

      猜你喜欢
      • 2018-03-04
      • 2019-07-24
      • 1970-01-01
      • 2018-12-06
      • 2020-04-03
      • 2020-02-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多