【问题标题】:Add RDD to DataFrame Column PySpark将 RDD 添加到 DataFrame 列 PySpark
【发布时间】:2017-05-21 20:36:49
【问题描述】:

我想创建一个包含两个 RDD 列的数据框。 第一个是我从 CSV 获得的 RDD,第二个是另一个 RDD,每行都有一个集群预测。

我的架构是:

customSchema = StructType([ \
StructField("Area", FloatType(), True), \
StructField("Perimeter", FloatType(), True), \
StructField("Compactness", FloatType(), True), \
StructField("Lenght", FloatType(), True), \
StructField("Width", FloatType(), True), \
StructField("Asymmetry", FloatType(), True), \
StructField("KernelGroove", FloatType(), True)])

映射我的 rdd 并创建 DataFrame:

FN2 = rdd.map(lambda x: (float(x[0]), float(x[1]),float(x[2]),float(x[3]),float(x[4]),float(x[5]),float(x[6])))
 df = sqlContext.createDataFrame(FN2, customSchema)

还有我的集群预测:

result = Kmodel.predict(rdd)

所以,总而言之,我想在我的 DataFrame 中包含我的 CSV 行及其最后的集群预测。

我尝试使用 .WithColumn() 添加一个新列,但我一无所获。

谢谢。

【问题讨论】:

    标签: python sql apache-spark dataframe rdd


    【解决方案1】:

    如果您在两个数据帧上都有一个公共字段,则使用键连接,否则创建一个唯一 Id 并连接两个数据帧以在单个数据帧中获取 CSV 行及其集群预测

    Scala 代码为每一行生成一个唯一的 id,尝试为 pyspark 转换。您需要生成一个递增的行 id 并将它们与行 id 连接起来

    import org.apache.spark.sql.types.{StructType, StructField, LongType}
    val df = sc.parallelize(Seq(("abc", 2), ("def", 1), ("hij", 3))).toDF("word", "count")
    val wcschema = df.schema
    val inputRows = df.rdd.zipWithUniqueId.map{
       case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
    val wcID = sqlContext.createDataFrame(inputRows, StructType(StructField("id", LongType, false) +: wcschema.fields))
    

    或者使用sql查询

    val tmpTable1 = sqlContext.sql("select row_number() over (order by count) as rnk,word,count from wordcount")
    tmpTable1.show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-06-26
      • 1970-01-01
      • 1970-01-01
      • 2018-08-04
      • 2016-05-29
      • 2017-11-07
      • 1970-01-01
      相关资源
      最近更新 更多