【问题标题】:How to replace nulls in Vector column?如何替换向量列中的空值?
【发布时间】:2018-11-17 10:29:31
【问题描述】:

我有一列 [vector] 类型的列,其中包含无法删除的空值,这是一个示例

import org.apache.spark.mllib.linalg.Vectors

val sv1: Vector = Vectors.sparse(58, Array(8, 45), Array(1.0, 1.0))
val df_1 = sc.parallelize(List(("id_1", sv1))).toDF("id", "feature_vector")
val df_2 = sc.parallelize(List(("id_1", 10.0), ("id_2", 10.0))).toDF("id", "numeric_feature")

val df_joined = df_1.join(df_2, Seq("id"), "right")

df_joined.show()

+----+--------------------+---------------+
|  id|      feature_vector|numeric_feature|
+----+--------------------+---------------+
|id_1|(58,[8,45],[1.0,1...|           10.0|
|id_2|                null|           10.0|
+----+--------------------+---------------+

我想做的事:

val map = Map("feature_vector" -> sv1)
val result = df_joined.na.fill(map)

但这会引发错误:

Message: Unsupported value type org.apache.spark.mllib.linalg.SparseVector ((58,[8,45],[1.0,1.0])).

我尝试过的其他事情:

df_joined.withColumn("feature_vector", when(col("feature_vector").isNull, sv1).otherwise(sv1)).show

来自how to filter out a null value from spark dataframe

我正在努力寻找适用于 Spark 1.6 的解决方案

【问题讨论】:

  • 为了增加您的问题,我认为您不能从 1.6 中的 UDF 返回向量。
  • @philantrovert 我想我在一次尝试中也碰到了那堵墙。幸运的是,user8371915 的建议奏效了!
  • @user8371915 的回答肯定更好,不需要在 RDD 和 DF 之间切换。请接受。
  • @philantrovert 我的错,出于某种原因,我认为您可以接受多种解决方案。谢谢!

标签: scala apache-spark apache-spark-sql apache-spark-1.6


【解决方案1】:

如果您愿意,可以在这里借助 RDD:

val naFillRDD = df_joined.map{ r => r match{
  case Row(id, feature_vector: Vector, numeric_feature ) => Row(id, feature_vector, numeric_feature )
  case Row(id, _, numeric_feature) => Row(id, sv1, numeric_feature)
}}

然后切换回dataframe:

val naFillDF = sqlContext.createDataFrame(naFillRDD, df_joined.schema)

naFillDF.show(false)
//+----+---------------------+---------------+
//|id  |feature_vector       |numeric_feature|
//+----+---------------------+---------------+
//|id_1|(58,[8,45],[1.0,1.0])|10.0           |
//|id_2|(58,[8,45],[1.0,1.0])|10.0           |
//+----+---------------------+---------------+

【讨论】:

    【解决方案2】:

    合并和加入应该可以解决问题

    import org.apache.spark.sql.functions.{coalesce, broadcast}
    
    val fill = Seq(
      Tuple1(Vectors.sparse(58, Array(8, 45), Array(1.0, 1.0)))
    ).toDF("fill")
    
    
    df_joined
      .join(broadcast(fill))
      .withColumn("feature_vector", coalesce($"feature_vector", $"fill"))
      .drop("fill")
    

    【讨论】:

    • 在 Spark > 2.X 中你需要使用 crossJoin 而不是 join
    猜你喜欢
    • 1970-01-01
    • 2016-06-10
    • 1970-01-01
    • 1970-01-01
    • 2020-01-17
    • 2015-01-28
    • 2015-07-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多