-
Import all the libraries you need
from pyspark.mllib.classification import LogisticRegressionWithSGD, LogisticRegressionModel
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
import re
-
Load your data into an RDD
msgs = [("I love Star Wars but I can't watch it today", 1.0),
("I don't love Star Wars and people want to watch it today", 0.0),
("I dislike not being able to watch Star Wars", 1.0),
("People who love Star Wars are my friends", 1.0),
("I prefer to watch Star Wars on Netflix", 0.0),
("George Lucas shouldn't have sold the franchise", 1.0),
("Disney makes better movies than everyone else", 0.0)]
rdd = sc.parallelize(msgs)
-
Tokenize your data (it may be easier if you use ML instead)
rdd = rdd.map(lambda tl: ([w.lower() for w in re.split(" +", tl[0])], tl[1]))
-
Remove all the unnecessary words (widely known as stopwords) and symbols, e.g. `,.&`
commons = ["and", "but", "to"]
rdd = rdd.map(lambda tl: ([t for t in tl[0] if t not in commons], tl[1]))
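The tokenize-and-filter steps can be checked without a Spark cluster; here is a minimal plain-Python sketch of the same two map operations, run on one hypothetical sample sentence:

```python
import re

# Stopword list copied from the step above
commons = ["and", "but", "to"]

def tokenize(text):
    # lower-case and split on runs of spaces, as the first map does
    return [w.lower() for w in re.split(" +", text)]

def drop_stopwords(tokens):
    # keep only tokens that are not stopwords, as the second map does
    return [t for t in tokens if t not in commons]

tokens = drop_stopwords(tokenize("I love Star Wars but I can't watch it today"))
print(tokens)
# ['i', 'love', 'star', 'wars', 'i', "can't", 'watch', 'it', 'today']
```

Note that `"can't"` survives as a single token: the split is on spaces only, so punctuation inside a word is not stripped.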
-
Create a dictionary with all the distinct words across the whole dataset. It may sound huge, but there are not as many as you would expect, and I bet they will fit in your master node (there are other ways to handle this, but for simplicity I will keep it this way).
# finds the distinct words
words = rdd.flatMap(lambda tl: tl[0]).distinct().collect()
diffwords = len(words)
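The flatMap-then-distinct step is just "flatten and deduplicate"; a plain-Python sketch on two hypothetical token lists (no Spark required):

```python
# Two already-tokenized documents, standing in for the RDD contents
docs = [["i", "love", "star", "wars"], ["i", "watch", "star", "wars"]]

# flatMap ~ flatten all token lists; distinct ~ dedupe via a set
# (like distinct().collect(), the resulting order is arbitrary)
words = list({t for doc in docs for t in doc})
diffwords = len(words)
print(diffwords)
# 5
```

The arbitrary ordering is harmless here, but it does mean the word-to-index mapping is only valid for the `words` list you collected in this run, so reuse that exact list when vectorizing new text.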
-
Convert your features into a DenseVector or a SparseVector. I would obviously recommend the second, since a SparseVector usually needs less space to be represented, but that depends on the data. Note there are better options, e.g. hashing, but I am trying to stay faithful to my verbose approach. After that, transform the tuple into a LabeledPoint.
def sparsify(length, tokens):
    indices = [words.index(t) for t in set(tokens)]
    quantities = [tokens.count(words[i]) for i in indices]
    return SparseVector(length, [(indices[i], quantities[i]) for i in range(len(indices))])

rdd = rdd.map(lambda tl: LabeledPoint(tl[1], sparsify(diffwords, tl[0])))
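The index/count logic inside `sparsify` can also be checked in plain Python. This sketch builds the same (index, count) pairs against a tiny hypothetical vocabulary, stopping just short of wrapping them in a SparseVector:

```python
# Tiny stand-in for the collected dictionary from the previous step
words = ["i", "love", "star", "wars", "watch"]

def sparsify_pairs(tokens):
    # one (index, count) pair per distinct token, exactly as sparsify builds them;
    # sorted because set() iteration order is not deterministic
    indices = [words.index(t) for t in set(tokens)]
    quantities = [tokens.count(words[i]) for i in indices]
    return sorted(zip(indices, quantities))

pairs = sparsify_pairs(["i", "love", "star", "wars", "i"])
print(pairs)
# [(0, 2), (1, 1), (2, 1), (3, 1)]
```

Each pair is (position of the word in the dictionary, number of occurrences in the message), i.e. a sparse bag-of-words representation.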
-
Fit your favourite model; in this case I used LogisticRegressionWithSGD, for ulterior motives.
lrm = LogisticRegressionWithSGD.train(rdd)
-
Save your model.
lrm.save(sc, "mylovelymodel.model")
-
Load your LogisticRegressionModel in another application.
lrm = LogisticRegressionModel.load(sc, "mylovelymodel.model")
-
Predict the category.
lrm.predict(SparseVector(37,[2,4,5,13,15,19,23,26,27,29],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
# outputs 0