【问题标题】:Spark 1.5.1, MLLib Random Forest ProbabilitySpark 1.5.1,MLLib 随机森林概率
【发布时间】:2016-01-28 20:08:57
【问题描述】:

我正在使用带有 MLLib 的 Spark 1.5.1。我使用 MLLib 构建了一个随机森林模型,现在使用该模型进行预测。我可以使用 .predict 函数找到预测类别(0.0 或 1.0)。但是,我找不到检索概率的功能(请参阅随附的屏幕截图)。我认为 spark 1.5.1 随机森林会提供概率,我在这里遗漏了什么吗?

【问题讨论】:

    标签: scala apache-spark random-forest apache-spark-mllib


    【解决方案1】:

    很遗憾,该功能在旧版 Spark MLlib 1.5.1 中不可用。

    但是,您可以在 Spark MLlib 2.x 中最近的 Pipeline API 中以 RandomForestClassifier 的形式找到它:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
    import org.apache.spark.mllib.util.MLUtils
    
    // Load and parse the data file, converting it to a DataFrame.
    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF
    
    // Index labels, adding metadata to the label column.
    // Fit on whole dataset to include all labels in index.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel").fit(data)
    
    // Automatically identify categorical features, and index them.
    // Set maxCategories so features with > 4 distinct values are treated as continuous.
    val featureIndexer = new VectorIndexer()
      .setInputCol("features")
      .setOutputCol("indexedFeatures")
      .setMaxCategories(4).fit(data)
    
    // Split the data into training and test sets (30% held out for testing)
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
    
    // Train a RandomForest model.
    val rf = new RandomForestClassifier()
      .setLabelCol(labelIndexer.getOutputCol)
      .setFeaturesCol(featureIndexer.getOutputCol)
      .setNumTrees(10)
    
    // Convert indexed labels back to original labels.
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)
    
    // Chain indexers and forest in a Pipeline
    val pipeline = new Pipeline()
      .setStages(Array(labelIndexer, featureIndexer, rf, labelConverter))
    
    // Fit model. This also runs the indexers.
    val model = pipeline.fit(trainingData)
    
    // Make predictions.
    val predictions = model.transform(testData)
    // predictions: org.apache.spark.sql.DataFrame = [label: double, features: vector, indexedLabel: double, indexedFeatures: vector, rawPrediction: vector, probability: vector, prediction: double, predictedLabel: string]
    
    predictions.show(10)
    // +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
    // |label|            features|indexedLabel|     indexedFeatures|rawPrediction|probability|prediction|predictedLabel|
    // +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
    // |  0.0|(692,[124,125,126...|         1.0|(692,[124,125,126...|   [0.0,10.0]|  [0.0,1.0]|       1.0|           0.0|
    // |  0.0|(692,[124,125,126...|         1.0|(692,[124,125,126...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
    // |  0.0|(692,[129,130,131...|         1.0|(692,[129,130,131...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
    // |  0.0|(692,[154,155,156...|         1.0|(692,[154,155,156...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
    // |  0.0|(692,[154,155,156...|         1.0|(692,[154,155,156...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
    // |  0.0|(692,[181,182,183...|         1.0|(692,[181,182,183...|    [1.0,9.0]|  [0.1,0.9]|       1.0|           0.0|
    // |  1.0|(692,[99,100,101,...|         0.0|(692,[99,100,101,...|    [4.0,6.0]|  [0.4,0.6]|       1.0|           0.0|
    // |  1.0|(692,[123,124,125...|         0.0|(692,[123,124,125...|   [10.0,0.0]|  [1.0,0.0]|       0.0|           1.0|
    // |  1.0|(692,[124,125,126...|         0.0|(692,[124,125,126...|   [10.0,0.0]|  [1.0,0.0]|       0.0|           1.0|
    // |  1.0|(692,[125,126,127...|         0.0|(692,[125,126,127...|   [10.0,0.0]|  [1.0,0.0]|       0.0|           1.0|
    // +-----+--------------------+------------+--------------------+-------------+-----------+----------+--------------+
    // only showing top 10 rows
    

    注意:此示例来自 Spark MLlib 的ML - Random forest classifier 的官方文档。

    这里是一些输出列的一些解释:

    • predictionCol 代表预测标签。
    • rawPredictionCol 表示长度为 # 个类别的向量,其中包含进行预测的树节点处的训练实例标签计数(仅适用于分类)。
    • probabilityCol 表示长度 # 类等于 rawPrediction 的概率向量,归一化为多项分布(仅适用于分类)。

    【讨论】:

    • 我明白了……为什么要制作 ml-randomForest 和 mlliv-randomForest?这两个库有什么区别?为什么不合二为一呢?
    • spark.ml 包旨在提供一套基于 DataFrame 的统一高级 API,帮助用户创建和调整实用的机器学习管道,而原始 MLlib 库处理 RDD。在 DataFrame 上构建算法在概念上与传统的 RDD 上的 map reduce 操作非常不同。两个库的统一并不像声音那么简单。
    • 我的训练数据非常大并且存储在 RDD 中。有没有办法可以用 RDD 训练 ml-randomForest?或者无论如何我可以使用 mllib-randomForest 检索概率?谢谢!
    • 您需要将其转换为 DataFrame,别无他法。这不会是非常昂贵的想法,即使类型转换仍未优化。但是在 Spark 中的 Tungsten 项目中,对 DataFrame 进行的操作得到了优化,从而在时间上得到了更好的执行。这意味着当您应用算法时,您可能会因转换而浪费的时间会在计算级别上获得。
    • DataFrame 是对 RDD 的结构抽象。这不是 RDD。从概念上讲,它与 R/Pandas 中的相同,但它是分布式的,因为它的内部结构实际上是一个 RDD。换句话说,如果它适合你的 RDD,它就会适合你的 DataFrame。
    【解决方案2】:

    你不能直接得到分类概率,但自己计算相对容易。 RandomForest 是树的集合,其输出概率是这些树的多数票除以树的总数。

    由于 MLib 中的 RandomForestModel 为您提供了经过训练的树,因此您自己很容易做到。以下代码给出了二元分类问题的概率。它对多类分类的推广很简单。

      def predict(points: RDD[LabeledPoint], model: RandomForestModel) = {
        val numTrees = model.trees.length
        val trees = points.sparkContext.broadcast(model.trees)
        points.map { point =>
        trees.value
        .map(_.predict(point.features))
        .sum / numTrees
      }
    

    }

    对于多类情况,您只需将 map 替换为 .map(_.predict(point.features)-> 1.0) 并按键而不是 sum 分组,最后取最大值。

    【讨论】:

    • 感谢 TNM!但在我的用例中,我将预测函数应用于数据点,而不是 RDD。即我有 modelObject.predict(myPoint) ,其中 myPoint 的类型为: org.apache.spark.mllib.linalg.Vector 。我还能计算这种情况的概率吗?谢谢!
    • 是的,你可以。只需将函数的输入类型替换为 point:Vector 并删除 points.map 部分,而只需使用 trees.map{...
    • 感谢 TNM!当我尝试这样做时: val trees = point.sparkContext.broadcast(modelObject.trees) where point is of type: org.apache.spark.mllib.linalg.Vector ,我收到错误消息: value sparkContext is not a member of org.apache.spark.mllib.linalg.Vector。我错过了什么吗?有没有办法来解决这个问题?谢谢!
    • 如果我这样做似乎可以工作: val prob = modelObject.trees.map(_.predict(point)).sum / modelObject.trees.length 这是正确的吗?谢谢!
    • 是正确的。在原始答案中使用广播只是一种优化。
    猜你喜欢
    • 2015-05-03
    • 2015-12-13
    • 2012-12-20
    • 2018-10-31
    • 2017-06-13
    • 2016-06-09
    • 2012-10-24
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多