[Posted]: 2017-03-08 11:00:52
[Problem description]:
I'm using PCA for data analysis. I wrote this code in PySpark and it works fine, but only for data read from a CSV file with exactly 5 columns, ["a","b","c","d","e"]. I'd like to write generic code that computes the PCA for any number of columns read from a CSV file. What should I add? Here is my code:
#########################! importing libraries !########################
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA, VectorAssembler
########################! main script !#################################
if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("PCAExample")\
        .getOrCreate()
    # parse the semicolon-delimited CSV into rows of floats
    data = spark.sparkContext.textFile('dataset.csv') \
        .map(lambda line: [float(k) for k in line.split(';')])
    df = spark.createDataFrame(data, ["a", "b", "c", "d", "e"])
    df.show()
    vecAssembler = VectorAssembler(inputCols=["a", "b", "c", "d", "e"], outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    pipeline = Pipeline(stages=[vecAssembler, pca])
    model = pipeline.fit(df)
    result = model.transform(df).select("pcaFeatures")
    result.show(truncate=False)
    spark.stop()
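One way to make the pipeline column-agnostic is to derive the column names from the width of the data itself rather than hard-coding them. The sketch below assumes, as in the question, a headerless semicolon-delimited CSV of floats; the helper `make_col_names` and the generated names `c0, c1, ...` are illustrative choices, not part of the original code.

```python
def make_col_names(n):
    """Generate generic column names c0..c{n-1} for a headerless CSV."""
    return ["c{}".format(i) for i in range(n)]

if __name__ == "__main__":
    # PySpark imports are kept inside the guard so the helper above is
    # usable without a Spark installation.
    from pyspark.sql import SparkSession
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import PCA, VectorAssembler

    spark = SparkSession.builder.appName("PCAExample").getOrCreate()
    rows = spark.sparkContext.textFile("dataset.csv") \
        .map(lambda line: [float(k) for k in line.split(";")])
    cols = make_col_names(len(rows.first()))   # width taken from the first row
    df = spark.createDataFrame(rows, cols)
    vecAssembler = VectorAssembler(inputCols=cols, outputCol="features")
    pca = PCA(k=2, inputCol="features", outputCol="pcaFeatures")
    model = Pipeline(stages=[vecAssembler, pca]).fit(df)
    model.transform(df).select("pcaFeatures").show(truncate=False)
    spark.stop()
```

Since `inputCols` is built from the same list passed to `createDataFrame`, the assembler and the DataFrame stay in sync no matter how many columns the file has.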
[Question discussion]:
Tags: python csv apache-spark pyspark pca