【问题标题】:Preparing data for LDA training with PySpark 1.6使用 PySpark 1.6 为 LDA 训练准备数据
【发布时间】:2017-10-26 08:41:41
【问题描述】:

我有一个文档语料库,我正在将其读入 spark 数据框。 我已经对文本进行了标记和矢量化,现在我想将矢量化数据输入到 mllib LDA 模型中。 LDA API 文档似乎要求数据是:

rdd – 文档的 RDD,它是文档 ID 和术语(单词)计数向量的元组。术语计数向量是具有固定大小词汇表的“词袋”(其中词汇表大小是向量的长度)。文档 ID 必须是唯一的并且 >= 0。

如何从我的数据框获取合适的 rdd?

from pyspark.mllib.clustering import LDA
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import CountVectorizer

#read the data
tf = sc.wholeTextFiles("20_newsgroups/*")

#transform into a data frame
df = tf.toDF(schema=['file','text'])

#tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="words")
tokenized = tokenizer.transform(df)

#vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
model = cv.fit(tokenized)
result = model.transform(tokenized)

#transform into a suitable rdd
myrdd = ?

#LDA
model = LDA.train(myrdd, k=2, seed=1)

PS:我使用的是 Apache Spark 1.6.3

【问题讨论】:

  • 如果我可能会问,你为什么使用 MLlib 的 LDA ? LDA 可用于 spark-ml
  • 只是试图在几个教程中拼接。不反对采取不同的方法。
  • 那我建议看一下spark-ml的官方文档。这很简单。您的价值结果已准备好照常提供。
  • 刚刚检查了我的 sprk 版本。它是 1.6 并且在 pyspark.ml.clustering 中似乎没有 LDA
  • 假期周末后需要尝试一下。谢谢。

标签: apache-spark pyspark apache-spark-mllib apache-spark-ml


【解决方案1】:

让我们首先组织导入,读取数据,做一些简单的特殊字符删除并将其转换为DataFrame

import re # needed to remove special character
from pyspark import Row

from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.clustering import LDA
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, LongType

pattern = re.compile('[\W_]+') 

rdd = sc.wholeTextFiles("./data/20news-bydate/*/*/*") \
    .mapValues(lambda x: pattern.sub(' ', x)).cache() # ref. https://stackoverflow.com/a/1277047/3415409

df = rdd.toDF(schema=['file', 'text'])

我们需要为每个Row 添加一个索引。以下代码 sn-p 的灵感来自这个关于添加 primary keys with Apache Spark 的问题:

row_with_index = Row(*["id"] + df.columns)

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])

    return _make_row

f = make_row(df.columns)

indexed = (df.rdd
           .zipWithUniqueId()
           .map(lambda x: f(*x))
           .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

添加索引后,我们可以进行特征清理、提取和转换:

# tokenize
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
tokenized = tokenizer.transform(indexed)

# remove stop words
remover = StopWordsRemover(inputCol="tokens", outputCol="words")
cleaned = remover.transform(tokenized)

# vectorize
cv = CountVectorizer(inputCol="words", outputCol="vectors")
count_vectorizer_model = cv.fit(cleaned)
result = count_vectorizer_model.transform(cleaned)

现在,让我们将结果数据帧转换回 rdd

corpus = result.select(F.col('id').cast("long"), 'vectors').rdd \
    .map(lambda x: [x[0], x[1]])

我们的数据现在可以接受训练了:

# training data
lda_model = LDA.train(rdd=corpus, k=10, seed=12, maxIterations=50)
# extracting topics
topics = lda_model.describeTopics(maxTermsPerTopic=10)
# extraction vocabulary
vocabulary = count_vectorizer_model.vocabulary

我们现在可以打印主题描述如下:

for topic in range(len(topics)):
    print("topic {} : ".format(topic))
    words = topics[topic][0]
    scores = topics[topic][1]
    [print(vocabulary[words[word]], "->", scores[word]) for word in range(len(words))]

PS : 上面的代码是用 Spark 1.6.3 测试的。

【讨论】:

  • 现在如何用这个训练有素的模型测试看不见的数据?我在 spark 2.1.1 中实现了或多或少相同的代码。我已经搜索了答案,他们中的大多数人都推荐这个 topicDistributions() 属性,但我收到这个错误,即 LDAModel 对象没有 topicDistributions() 属性。还有其他方法可以测试看不见的数据吗?
  • 我正在使用 spark-mllib。 from pyspark.mllib.clustering import LDA, LDAModel。我已经发布了问题[问题] stackoverflow.com/questions/55808041/…>
猜你喜欢
  • 2018-03-29
  • 2019-09-30
  • 2020-04-29
  • 2016-02-06
  • 2020-04-05
  • 2016-08-26
  • 2020-03-16
  • 2015-12-16
  • 2021-12-26
相关资源
最近更新 更多