【问题标题】:Preparing data for LDA in spark在 Spark 中为 LDA 准备数据
【发布时间】:2016-02-06 16:36:24
【问题描述】:

我正在实现一个 Spark LDA 模型(通过 Scala API),但在为我的数据进行必要的格式化步骤时遇到了麻烦。我的原始数据(存储在文本文件中)采用以下格式,本质上是令牌列表及其对应的文档。一个简化的例子:

doc XXXXX   term    XXXXX
1   x       'a'     x
1   x       'a'     x
1   x       'b'     x
2   x       'b'     x
2   x       'd'     x
...

XXXXX 列是我不关心的垃圾数据。我意识到这是一种非典型的存储语料库数据的方式,但这就是我所拥有的。我希望从示例中可以清楚地看出,原始数据中每个 token 有一行(因此,如果给定术语在文档中出现 5 次,则对应于 5 行文本)。

无论如何,我需要将此数据格式化为稀疏词频向量以运行 Spark LDA 模型,但我不熟悉 Scala,因此遇到了一些麻烦。

我开始:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

val corpus:RDD[Array[String]] = sc.textFile("path/to/data")
    .map(_.split('\t')).map(x => Array(x(0),x(2)))

然后我得到生成稀疏向量所需的词汇数据:

val vocab: RDD[String] = corpus.map(_(1)).distinct()
val vocabMap: Map[String, Int] = vocab.collect().zipWithIndex.toMap

我不知道在这里使用正确的映射函数,这样我就可以为每个文档得到一个稀疏的词频向量,然后我可以将其输入 LDA 模型。我想我需要一些类似的东西......

val documents: RDD[(Long, Vector)] = corpus.groupBy(_(0)).zipWithIndex
    .map(x =>(x._2,Vectors.sparse(vocabMap.size, ???)))

此时我可以运行实际的 LDA:

val lda = new LDA().setK(n_topics)
val ldaModel = lda.run(documents)

基本上,我不会对每个组应用什么函数,这样我就可以将词频数据(大概是map?)输入到稀疏向量中。也就是说,上面代码sn -p中的???怎么填写才能达到想要的效果呢?

【问题讨论】:

    标签: scala apache-spark apache-spark-mllib lda


    【解决方案1】:

    一种处理方法:

    • 确保spark-csv 包可用
    • 将数据加载到 DataFrame 并选择感兴趣的列

      val df = sqlContext.read
          .format("com.databricks.spark.csv")
          .option("header", "true")
          .option("inferSchema", "true") // Optional, providing schema is prefered
          .option("delimiter", "\t")
          .load("foo.csv")
          .select($"doc".cast("long").alias("doc"), $"term")
      
    • 索引term 列:

      import org.apache.spark.ml.feature.StringIndexer
      
      val indexer = new StringIndexer()
        .setInputCol("term")
        .setOutputCol("termIndexed")
      
      val indexed = indexer.fit(df)
        .transform(df)
        .drop("term")
        .withColumn("termIndexed", $"termIndexed".cast("integer"))
        .groupBy($"doc", $"termIndexed")
        .agg(count(lit(1)).alias("cnt").cast("double"))
      
    • 转换为PairwiseRDD

      import org.apache.spark.sql.Row
      
      val pairs = indexed.map{case Row(doc: Long, term: Int, cnt: Double) => 
        (doc, (term, cnt))}
      
    • 按文档分组:

      val docs = pairs.groupByKey
      
    • 创建特征向量

      import org.apache.spark.mllib.linalg.Vectors
      import org.apache.spark.sql.functions.max
      
      val n = indexed.select(max($"termIndexed")).first.getInt(0) + 1
      
      val docsWithFeatures = docs.mapValues(vs => Vectors.sparse(n, vs.toSeq))
      
    • 现在您拥有创建LabeledPoints 或应用额外处理所需的一切

    【讨论】:

    • 这很棒!但是,经过一些初步实验后,我想尝试应用 TF-IDF 作为预处理步骤。问题是 Spark 的 TF-IDF 的 documentation 表明考虑到这种稀疏矢量格式的数据,没有直接的方法可以做到这一点。有什么建议吗?
    猜你喜欢
    • 2017-10-26
    • 1970-01-01
    • 1970-01-01
    • 2013-11-08
    • 2016-02-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多