【问题标题】:pyspark extract ROC curve?pyspark提取ROC曲线?
【发布时间】:2019-03-21 16:36:29
【问题描述】:

有没有办法从 pyspark 中的 Spark ML 获取 ROC 曲线上的点?在文档中,我看到了 Scala 但不是 python 的示例:https://spark.apache.org/docs/2.1.0/mllib-evaluation-metrics.html

对吗?我当然可以想到实现它的方法,但我必须想象如果有一个预先构建的功能会更快。我正在处理 300 万个分数和几十个模型,所以速度很重要。

【问题讨论】:

    标签: pyspark apache-spark-ml


    【解决方案1】:

    只要 ROC 曲线是 FPR 与 TPR 的图,就可以提取所需的值如下:

    your_model.summary.roc.select('FPR').collect()
    your_model.summary.roc.select('TPR').collect())
    

    your_model 可以是例如您从以下内容中获得的模型:

    from pyspark.ml.classification import LogisticRegression
    log_reg = LogisticRegression()
    your_model = log_reg.fit(df)
    

    现在您应该只针对 TPR 绘制 FPR,例如使用 matplotlib

    附言

    这是一个使用名为your_model(以及其他任何东西!)的模型绘制 ROC 曲线的完整示例。我还在 ROC 图中绘制了一条参考“随机猜测”线。

    import matplotlib.pyplot as plt
    plt.figure(figsize=(5,5))
    plt.plot([0, 1], [0, 1], 'r--')
    plt.plot(your_model.summary.roc.select('FPR').collect(),
             your_model.summary.roc.select('TPR').collect())
    plt.xlabel('FPR')
    plt.ylabel('TPR')
    plt.show()
    

    【讨论】:

    • 谢谢,这很有帮助。但就我而言,我没有实际的模型。我有一个带有概率和二进制标签的两列 rdd。在 scala 文档中,您可以执行 metrics = BinaryClassificationMetrics(predictionAndLabels) 然后 metrics.roc 给您积分。但这对 pyspark 不起作用。我想这是一个万岁,该功能存在于其他地方?
    • 另外,我可以使用from pyspark.ml.classification import LogisticRegressionModelmdl = LogisticRegressionModel.load(loc) 获取一些 模型对象并重新加载它们,但是当我尝试你的电话时,我得到@987654331 @。对此有什么想法吗?
    • 您在加载模型之前“拟合”了它吗?一旦你训练了模型,摘要就应该在那里。如果你运行mdl.hasSummary 会发生什么?关于管道我还有一个猜测:如果你把那个模型作为管道的最后阶段,你可以用mdl.stages[-1].summary而不是mdl.summary来访问它。
    • 这是一个在另一个程序中训练的模型,然后使用mdl.write().save(s3_path) 保存到 s3。我想它以某种方式丢失了摘要。我想我只需要回到另一个程序并在保存模型之前提取 ROC 点,然后将它们保存为 json 或其他格式。这并不理想,但我想这是最好的选择。感谢您的帮助。
    • 我明白了。通常我不使用保存/加载,所以我对这方面没有帮助。很高兴能帮到你!
    【解决方案2】:

    对于除逻辑回归之外适用于模型的更通用解决方案(例如缺少模型摘要的决策树或随机森林),您可以使用 Spark MLlib 中的BinaryClassificationMetrics 获取 ROC 曲线。

    请注意,PySpark 版本并未实现 Scala version 所做的所有方法,因此您需要使用来自 JavaModelWrapper.call(name) 函数。看来py4j也不支持解析scala.Tuple2类,所以必须手动处理。

    例子:

    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    
    # Scala version implements .roc() and .pr()
    # Python: https://spark.apache.org/docs/latest/api/python/_modules/pyspark/mllib/common.html
    # Scala: https://spark.apache.org/docs/latest/api/java/org/apache/spark/mllib/evaluation/BinaryClassificationMetrics.html
    class CurveMetrics(BinaryClassificationMetrics):
        def __init__(self, *args):
            super(CurveMetrics, self).__init__(*args)
    
        def _to_list(self, rdd):
            points = []
            # Note this collect could be inefficient for large datasets 
            # considering there may be one probability per datapoint (at most)
            # The Scala version takes a numBins parameter, 
            # but it doesn't seem possible to pass this from Python to Java
            for row in rdd.collect():
                # Results are returned as type scala.Tuple2, 
                # which doesn't appear to have a py4j mapping
                points += [(float(row._1()), float(row._2()))]
            return points
    
        def get_curve(self, method):
            rdd = getattr(self._java_model, method)().toJavaRDD()
            return self._to_list(rdd)
    

    用法:

    import matplotlib.pyplot as plt
    
    # Create a Pipeline estimator and fit on train DF, predict on test DF
    model = estimator.fit(train)
    predictions = model.transform(test)
    
    # Returns as a list (false positive rate, true positive rate)
    preds = predictions.select('label','probability').rdd.map(lambda row: (float(row['probability'][1]), float(row['label'])))
    points = CurveMetrics(preds).get_curve('roc')
    
    plt.figure()
    x_val = [x[0] for x in points]
    y_val = [x[1] for x in points]
    plt.title(title)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.plot(x_val, y_val)
    

    Scala 中的 BinaryClassificationMetrics 还实现了其他几个有用的方法:

    metrics = CurveMetrics(preds)
    metrics.get_curve('fMeasureByThreshold')
    metrics.get_curve('precisionByThreshold')
    metrics.get_curve('recallByThreshold')
    

    【讨论】:

    • 此代码给出错误 NameError: name 'points' is not defined。知道为什么吗?
    • 您需要用“roc”替换点,因为您在此处创建列表:roc = CurveMetrics(preds).get_curve('roc') 并替换绘图函数中的变量名称。
    • 感谢@AlexRoss 这真的很有帮助!我希望我可以多次升级!
    • 真的没有办法在不收集整个RDD的情况下转换scala.Tuple2s吗?这让我整天发疯
    • 在给出这个答案的时候,我也花了一天时间研究它,但没有找到办法。但我可能错过了一些东西,或者 API 可能已经改变。同意不适合大型数据集。
    【解决方案3】:

    要获得训练数据(训练模型)的 ROC 指标,我们可以使用 your_model.summary.roc,这是一个包含 FPRTPR 列的 DataFrame。请参阅 Andrea 的回答。

    对于在任意测试数据上评估的 ROC,我们可以使用 labelprobability 列传递给 sklearn 的 roc_curve 以获取 FPR 和 TPR。这里我们假设一个二元分类问题,其中 y 分数是预测为 1 的概率。另见How to split Vector into columns - using PySparkHow to convert a pyspark dataframe column to numpy array

    例子

    from sklearn.metrics import roc_curve
    
    model = lr.fit(train_df)
    test_df_predict = model.transform(test_df)
    
    y_score = test_df_predict.select(vector_to_array("probability")[1]).rdd.keys().collect()
    y_true = test_df_predict.select("label").rdd.keys().collect()
    fpr, tpr, thresholds = roc_curve(y_true, y_score)
    

    【讨论】:

      猜你喜欢
      • 2018-01-03
      • 2019-12-26
      • 2019-06-22
      • 2013-04-27
      • 2019-02-27
      • 2014-07-20
      • 2012-05-02
      • 2018-04-25
      • 2019-02-09
      相关资源
      最近更新 更多