【问题标题】:Spark Multiclass Classification ExampleSpark 多类分类示例
【发布时间】:2015-11-08 20:18:29
【问题描述】:

你们知道在哪里可以找到 Spark 中的多类分类示例吗?我花了很多时间在书籍和网络上搜索,到目前为止我只知道根据文档的最新版本是可能的。

【问题讨论】:

    标签: scala apache-spark apache-spark-mllib random-forest apache-spark-ml


    【解决方案1】:

    机器学习

    在 Spark 2.0+ 中推荐

    我们将使用与下面 MLlib 中相同的数据。有两个基本选项。如果Estimator 支持开箱即用的多类分类(例如随机森林),您可以直接使用它:

    val trainRawDf = trainRaw.toDF
    
    import org.apache.spark.ml.feature.{Tokenizer, CountVectorizer, StringIndexer}
    import org.apache.spark.ml.Pipeline
    
    import org.apache.spark.ml.classification.RandomForestClassifier
    
    val transformers = Array(
      new StringIndexer().setInputCol("group").setOutputCol("label"),
      new Tokenizer().setInputCol("text").setOutputCol("tokens"),
      new CountVectorizer().setInputCol("tokens").setOutputCol("features")
    )
    
    
    val rf = new RandomForestClassifier() 
      .setLabelCol("label")
      .setFeaturesCol("features")
    
    val model = new Pipeline().setStages(transformers :+ rf).fit(trainRawDf)
    
    model.transform(trainRawDf)
    

    如果模型仅支持二元分类(逻辑回归)并扩展o.a.s.ml.classification.Classifier,您可以使用 one-vs-rest 策略:

    import org.apache.spark.ml.classification.OneVsRest
    import org.apache.spark.ml.classification.LogisticRegression
    
    val lr = new LogisticRegression() 
      .setLabelCol("label")
      .setFeaturesCol("features")
    
    val ovr = new OneVsRest().setClassifier(lr)
    
    val ovrModel = new Pipeline().setStages(transformers :+ ovr).fit(trainRawDf)
    

    MLLib

    目前根据official documentation(MLlib 1.6.0)以下方法支持多类分类:

    • 逻辑回归,
    • 决策树,
    • 随机森林,
    • 朴素贝叶斯

    至少有一些例子使用了多类分类:

    忽略方法特定参数的通用框架与 MLlib 中的所有其他方法几乎相同。您必须对输入进行预处理,以创建具有代表labelfeatures 的列的任一数据框:

    root
     |-- label: double (nullable = true)
     |-- features: vector (nullable = true)
    

    RDD[LabeledPoint]

    Spark 提供了广泛的有用工具,旨在促进这一过程,包括 Feature ExtractorsFeature Transformerspipelines

    您会在下面找到一个使用随机森林的相当幼稚的示例。

    首先让我们导入所需的包并创建虚拟数据:

    import sqlContext.implicits._
    import org.apache.spark.ml.feature.{HashingTF, Tokenizer} 
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.ml.feature.StringIndexer
    import org.apache.spark.mllib.tree.RandomForest
    import org.apache.spark.mllib.tree.model.RandomForestModel
    import org.apache.spark.mllib.linalg.{Vectors, Vector}
    import org.apache.spark.mllib.evaluation.MulticlassMetrics
    import org.apache.spark.sql.Row
    import org.apache.spark.rdd.RDD
    
    case class LabeledRecord(group: String, text: String)
    
    val trainRaw = sc.parallelize(
        LabeledRecord("foo", "foo v a y b  foo") ::
        LabeledRecord("bar", "x bar y bar v") ::
        LabeledRecord("bar", "x a y bar z") ::
        LabeledRecord("foobar", "foo v b bar z") ::
        LabeledRecord("foo", "foo x") ::
        LabeledRecord("foobar", "z y x foo a b bar v") ::
        Nil
    )
    

    现在让我们定义所需的转换器和流程列车Dataset

    // Tokenizer to process text fields
    val tokenizer = new Tokenizer()
        .setInputCol("text")
        .setOutputCol("words")
    
    // HashingTF to convert tokens to the feature vector
    val hashingTF = new HashingTF()
        .setInputCol("words")
        .setOutputCol("features")
        .setNumFeatures(10)
    
    // Indexer to convert String labels to Double
    val indexer = new StringIndexer()
        .setInputCol("group")
        .setOutputCol("label")
        .fit(trainRaw.toDF)
    
    
    def transfom(rdd: RDD[LabeledRecord]) = {
        val tokenized = tokenizer.transform(rdd.toDF)
        val hashed = hashingTF.transform(tokenized)
        val indexed = indexer.transform(hashed)
        indexed
            .select($"label", $"features")
            .map{case Row(label: Double, features: Vector) =>
                LabeledPoint(label, features)}
    }
    
    val train: RDD[LabeledPoint] = transfom(trainRaw)
    

    请注意,indexer 已“拟合”在火车数据上。它只是意味着用作标签的分类值被转换为doubles。要对新数据使用分类器,您必须先使用 indexer 对其进行转换。

    接下来我们可以训练 RF 模型:

    val numClasses = 3
    val categoricalFeaturesInfo = Map[Int, Int]()
    val numTrees = 10
    val featureSubsetStrategy = "auto"
    val impurity = "gini"
    val maxDepth = 4
    val maxBins = 16
    
    val model = RandomForest.trainClassifier(
        train, numClasses, categoricalFeaturesInfo, 
        numTrees, featureSubsetStrategy, impurity,
        maxDepth, maxBins
    )
    

    最后测试一下:

    val testRaw = sc.parallelize(
        LabeledRecord("foo", "foo  foo z z z") ::
        LabeledRecord("bar", "z bar y y v") ::
        LabeledRecord("bar", "a a  bar a z") ::
        LabeledRecord("foobar", "foo v b bar z") ::
        LabeledRecord("foobar", "a foo a bar") ::
        Nil
    )
    
    val test: RDD[LabeledPoint] = transfom(testRaw)
    
    val predsAndLabs = test.map(lp => (model.predict(lp.features), lp.label))
    val metrics = new MulticlassMetrics(predsAndLabs)
    
    metrics.precision
    metrics.recall
    

    【讨论】:

    • 你有python的例子吗..还是只有scala支持?
    【解决方案2】:

    您使用的是 Spark 1.6 而不是 Spark 2.1? 我认为问题在于,在 spark 2.1 中,transform 方法返回一个数据集,该数据集可以隐式转换为类型化的 RDD,而在此之前,它返回一个数据帧或行。

    尝试将转换函数的返回类型指定为 RDD[LabeledPoint] 作为诊断,并查看是否遇到相同的错误。

    【讨论】:

    • 这更像是一个评论而不是一个答案,不是吗?
    猜你喜欢
    • 2016-03-03
    • 2017-01-03
    • 1970-01-01
    • 2017-07-16
    • 2016-08-17
    • 2018-07-22
    • 2019-07-09
    • 1970-01-01
    • 2019-05-12
    相关资源
    最近更新 更多