【问题标题】:How to preprocess csv data for Spark 2.0 clustering?如何为 Spark 2.0 集群预处理 csv 数据?
【发布时间】:2018-05-26 06:07:53
【问题描述】:

我有一个非常简单的 csv 文件,如下所示:

time,is_boy,is_girl
135,1,0
136,0,1
137,0,1

我也将这个 csv 文件放在 Hive 表中,其中所有值都已在表中创建为双精度值。

在幕后,这张表实际上是巨大的,并且有大量的行,所以我选择使用 Spark 2 来解决这个问题。

我想在 Python 中使用这个集群库: https://spark.apache.org/docs/2.2.0/ml-clustering.html

如果有人知道如何直接从 csv 或使用一些 Spark SQL 魔法加载这些数据,并使用 Python 正确地对其进行预处理,以便可以将其传递到 kmeans fit() 方法并计算模型, 我会很感激。我也认为它对其他人有用,因为我还没有找到 csvs 和这个库的示例。

【问题讨论】:

  • Spark 有一个内置的 CSV 阅读器,SparkSQL 可以与 Hive 交互(不是魔法,它有据可查)。请展示您的尝试
  • 投反对票有什么原因吗?
  • 您仍然可以edit 提出您的问题以包含您尝试过的内容,并展示您所做的研究,因为正如所写的那样,这并不清楚
  • 关于您的数据集,假设性别是二元特征是否安全?如果是这样,您只需要is_boy = {0, 1}。换句话说,你什么时候有is_boy = 0 & is_girl = 0
  • 感谢板球 007 的回答

标签: python csv apache-spark pyspark


【解决方案1】:

所以我猜了很多次,终于解决了这个问题,我必须做很多奇怪的事情才能让它工作,所以我觉得值得分享:

我像这样创建了一个简单的 csv:

time,is_boy,is_girl
123,1.0,0.0
132,1.0,0.0
135,0.0,1.0
139,0.0,1.0
140,1.0,0.0

然后我创建了一个 hive 表,在 hue 中执行这个查询:

    CREATE EXTERNAL TABLE pollab02.experiment_raw(  
        `time` double,
        `is_boy` double,
        `is_girl` double) 
   ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' with 
   serdeproperties( 'separatorChar' = ',' ) 
   STORED AS TEXTFILE LOCATION "/user/me/hive/experiment" 
   TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="0")

然后我的pyspark脚本如下: (我假设已经创建了一个名为“spark”的 SparkSession)

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler

raw_data = spark.sql("select * from dbname.experiment_raw")

#filter out row of null values that were added for some reason
raw_data_filtered=raw_data.filter(raw_data.time>-1)

#convert rows of strings to doubles for kmeans:
data=raw_data_filtered.select([col(c).cast("double") for c in raw_data_filtered.columns])
cols = data.columns

#Merge data frame with column called features, that contains all data as a vector in each row
vectorAss = VectorAssembler(inputCols=cols, outputCol="features")
vdf=vectorAss.transform(data)
kmeans = KMeans(k=2, maxIter=10, seed=1)
model = kmeans.fit(vdf)

剩下的就是历史了。我没有在这里做过最佳实践。我们可能会从 vdf DataFrame 中删除一些我们不需要的列以节省空间并提高性能,但这很有效。

【讨论】:

    【解决方案2】:

    fit 方法只需要一个向量/Dataframe

    spark.read().csvspark.sql 都返回一个数据框。

    无论你想预处理你的数据,在进入 MlLib / Kmeans 示例之前阅读 Dataframe 文档

    【讨论】:

      猜你喜欢
      • 2017-12-11
      • 2011-01-25
      • 1970-01-01
      • 2020-07-15
      • 1970-01-01
      • 2018-04-24
      • 1970-01-01
      • 2022-11-17
      • 2020-10-29
      相关资源
      最近更新 更多