【问题标题】:Spark Dataframe of WrappedArray to Dataframe[Vector]WrappedArray 的 Spark Dataframe 到 Dataframe[Vector]
【发布时间】:2017-10-18 11:46:51
【问题描述】:

我有一个 spark Dataframe df 具有以下架构:

root
 |-- features: array (nullable = true)
 |    |-- element: double (containsNull = false)

我想创建一个新的数据框,其中每一行都是Doubles 的向量,并希望获得以下架构:

root
     |-- features: vector (nullable = true)

到目前为止,我有以下代码(受这篇文章的影响:Converting Spark Dataframe(with WrappedArray) to RDD[labelPoint] in scala),但我担心它有问题,因为即使计算合理数量的行也需要很长时间。 此外,如果行太多,应用程序将因堆空间异常而崩溃。

val clustSet = df.rdd.map(r => {
          val arr = r.getAs[mutable.WrappedArray[Double]]("features")
          val features: Vector = Vectors.dense(arr.toArray)
          features
          }).map(Tuple1(_)).toDF()

我怀疑指令 arr.toArray 在这种情况下不是一个好的 Spark 实践。任何澄清都会非常有帮助。

谢谢!

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:

    这是因为.rdd 必须从内部内存格式中反序列化对象,这非常耗时。

    可以使用.toArray - 您在行级别上操作,而不是将所有内容收集到驱动程序节点。

    您可以使用 UDF 轻松做到这一点:

    import org.apache.spark.ml.linalg._
    val convertUDF = udf((array : Seq[Double]) => {
      Vectors.dense(array.toArray)
    })
    val withVector = dataset
      .withColumn("features", convertUDF('features))
    

    代码来自这个答案:Convert ArrayType(FloatType,false) to VectorUTD

    但是问题的作者没有询问差异

    【讨论】:

    • 非常感谢,这帮助很大,并将其标记为答案。我现在可以运行更多行,而且时间上令人满意。我仍然遇到一个异常:org.apache.spark.SparkException:Kryo 序列化失败:缓冲区溢出。当我尝试 200,000 行时,可用:0,必需:1。你对此有什么见解吗?再次感谢。
    • 我在我的代码中设置了以下内容:val conf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set( "spark.kryoserializer.buffer.max.mb","256") 并且成功了!谢谢。
    猜你喜欢
    • 2018-01-13
    • 2017-06-02
    • 2017-05-13
    • 2016-11-01
    • 2021-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-02-03
    相关资源
    最近更新 更多