【问题标题】:How to prepare for training data in mllib如何在 mllib 中准备训练数据
【发布时间】:2015-12-16 19:32:42
【问题描述】:

TL;DR; 如何使用 mllib 训练我的 wiki 数据(文本和类别)以预测推文?

我无法弄清楚如何转换我的标记化 wiki 数据,以便可以通过 NaiveBayesLogisticRegression 对其进行训练。我的目标是使用经过训练的模型与推文进行比较*。我已经尝试将管道与 LR 和 HashingTFIDFNaiveBayes 一起使用,但我一直得到错误的预测。这是我尝试过的:

*请注意,我想将 wiki 数据中的许多类别用于我的标签...我只见过二进制分类(它是一个类别或另一个类别)....有可能做我想做的事吗?

带 LR 的管道

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.ml.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.RegexTokenizer

case class WikiData(category: String, text: String)
case class LabeledData(category: String, text: String, label: Double)

val wikiData = sc.parallelize(List(WikiData("Spark", "this is about spark"), WikiData("Hadoop","then there is hadoop")))

val categoryMap = wikiData.map(x=>x.category).distinct.zipWithIndex.mapValues(x=>x.toDouble/1000).collectAsMap

val labeledData = wikiData.map(x=>LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF

val tokenizer = new RegexTokenizer()
  .setInputCol("text")
  .setOutputCol("words")
  .setPattern("/W+")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(labeledData)

model.transform(labeledData).show

朴素贝叶斯

val hashingTF = new HashingTF()
val tf: RDD[Vector] = hashingTF.transform(documentsAsWordSequenceAlready)

import org.apache.spark.mllib.feature.IDF

tf.cache()
val idf = new IDF().fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

tf.cache()
val idf = new IDF(minDocFreq = 2).fit(tf)
val tfidf: RDD[Vector] = idf.transform(tf)

//to create tfidfLabeled (below) I ran a map set the labels...but again it seems to have to be 1.0 or 0.0?

NaiveBayes.train(tfidfLabeled)
  .predict(hashingTF.transform(tweet))
  .collect

【问题讨论】:

  • 我不确定我是否真的得到了贾斯汀的困扰!

标签: apache-spark apache-spark-mllib apache-spark-ml


【解决方案1】:

ML LogisticRegression 还不支持多项式分类,但 MLLib NaiveBayesLogisticRegressionWithLBFGS 都支持它。在第一种情况下,它应该默认工作:

import org.apache.spark.mllib.classification.NaiveBayes

val nbModel = new NaiveBayes()
  .setModelType("multinomial") // This is default value
  .run(train)

但对于逻辑回归,您应该提供多个类:

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS

val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(n) // Set number of classes
  .run(trainingData)

关于预处理步骤,这是一个相当广泛的话题,如果不访问您的数据,很难给您提供有意义的建议,因此您在下面找到的所有内容都只是一个疯狂的猜测:

  • 据我了解,您使用 wiki 数据进行训练,使用推文进行测试。如果这是真的,那通常是一个坏主意。您可以预期,两组使用的词汇、语法和拼写都大不相同
  • 简单的正则表达式标记器可以在标准化文本上表现良好,但根据我的经验,它不适用于像推文这样的非正式文本
  • HashingTF 可能是获得基线模型的好方法,但它是一种极其简化的方法,尤其是在您不应用任何过滤步骤的情况下。如果您决定使用它,您至少应该增加功能数量或使用默认值 (2^20)

编辑(使用 IDF 为朴素贝叶斯准备数据)

  • 使用机器学习管道:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.ml.feature.IDF
import org.apache.spark.sql.Row

val tokenizer = ???

val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("rawFeatures")

val idf = new IDF()
  .setInputCol(hashingTF.getOutputCol)
  .setOutputCol("features")

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, idf))
val model = pipeline.fit(labeledData)

model
 .transform(labeledData)
 .select($"label", $"features")
 .map{case Row(label: Double, features: Vector) => LabeledPoint(label, features)}
  • 使用 MLlib 转换器:
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.feature.{IDF, IDFModel}

val labeledData = wikiData.map(x => 
  LabeledData(x.category, x.text, categoryMap.get(x.category).getOrElse(0.0)))

val p = "\\W+".r
val raw = labeledData.map{
    case LabeledData(_, text, label) => (label, p.split(text))}

val hashingTF: org.apache.spark.mllib.feature.HashingTF = new HashingTF(1000)
val tf = raw.map{case (label, text) => (label, hashingTF.transform(text))}

val idf: org.apache.spark.mllib.feature.IDFModel = new IDF().fit(tf.map(_._2))
tf.map{
  case (label, rawFeatures) => LabeledPoint(label, idf.transform(rawFeatures))}

注意:由于转换器需要 JVM 访问权限,因此 MLlib 版本在 PySpark 中不起作用。如果你更喜欢 Python,你必须split data transform and zip

编辑(为机器学习算法准备数据):

虽然下面这段代码乍一看是有效的

val categoryMap = wikiData
  .map(x=>x.category)
  .distinct
  .zipWithIndex
  .mapValues(x=>x.toDouble/1000)
  .collectAsMap

val labeledData = wikiData.map(x=>LabeledData(
    x.category, x.text, categoryMap.get(x.category).getOrElse(0.0))).toDF

它不会为ML 算法生成有效标签。

首先ML 期望标签位于 (0.0, 1.0, ..., n.0) 中,其中 n 是类数。如果您的示例管道其中一个类获得标签 0.001,您将收到如下错误:

ERROR LogisticRegression:分类标签应该在 {0 到 0 发现 1 个无效标签。

显而易见的解决方案是在生成映射时避免除法

.mapValues(x=>x.toDouble)

虽然它适用于 LogisticRegression,但其他 ML 算法仍然会失败。例如RandomForestClassifier 你会得到

RandomForestClassifier 输入的标签列标签无效,但未指定类数。请参阅字符串索引器。

有趣的是 RandomForestClassifier 的 ML 版本与 MLlib 对应的版本不同,它不提供设置多个类的方法。原来它期望在DataFrame 列上设置特殊属性。最简单的方法是使用错误信息中提到的StringIndexer

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("label")

val pipeline = new Pipeline()
  .setStages(Array(indexer, tokenizer, hashingTF, idf, lr))

val model = pipeline.fit(wikiData.toDF)

【讨论】:

  • 好的,这是我的问题...也许我没有仔细考虑...也许我没有正确考虑这个问题。但是,如果我使用 NaiveBayes,那么它需要 LabeledPoint 数据。但是,我似乎无法找到一种方法来通过 TFIDF 获取我的文档,然后知道哪个向量是哪个向量,以便我可以用它们的类别标记它们?
  • 您可以使用 ML 或 MLlib。我已经编辑了答案。
  • 万岁!这绝对是我在 Spark 中最薄弱的领域,也是我将在未来几年大力加强的领域:)。但是,对于我现在需要的东西,这太棒了!我会尽量记住明天添加一个赏金来给你额外的努力。
  • @JustinPihony 哦,现在我很失望......我实际上写了一个你最新问题的答案,并意识到它在提交时已被删除:D
  • 当然,我实际上是在使用 KMeans 来解决它...呵呵,只是未删除,所以继续我会投票(一旦我成功解决了这个愚蠢的问题,就接受 :p)
猜你喜欢
  • 2018-03-29
  • 2019-09-30
  • 1970-01-01
  • 1970-01-01
  • 2020-04-05
  • 2020-03-16
  • 1970-01-01
  • 2011-12-31
  • 1970-01-01
相关资源
最近更新 更多