【问题标题】:How to aggregate a Spark data frame to get a sparse vector using Scala?如何使用 Scala 聚合 Spark 数据帧以获取稀疏向量?
【发布时间】:2017-08-17 04:29:39
【问题描述】:

我在 Spark 中有一个类似于下面的数据框,我想按 id 列对其进行分组,然后对于分组数据中的每一行,我需要使用来自 weight 的元素创建一个稀疏向量index 列指定的索引处的列。稀疏向量的长度是已知的,在本例中为 1000。

数据框df:

+-----+------+-----+
|   id|weight|index|
+-----+------+-----+
|11830|     1|    8|
|11113|     1|    3|
| 1081|     1|    3|
| 2654|     1|    3|
|10633|     1|    3|
|11830|     1|   28|
|11351|     1|   12|
| 2737|     1|   26|
|11113|     3|    2|
| 6590|     1|    2|
+-----+------+-----+

我已经阅读了this,这与我想要做的事情有点相似,但是对于 rdd。有谁知道使用 Scala 为 Spark 中的数据帧执行此操作的好方法?

到目前为止,我的尝试是首先将权重和索引收集为如下列表:

val dfWithLists = df
    .groupBy("id")
    .agg(collect_list("weight") as "weights", collect_list("index") as "indices"))

看起来像:

+-----+---------+----------+
|   id|  weights|   indices|
+-----+---------+----------+
|11830|   [1, 1]|   [8, 28]|
|11113|   [1, 3]|    [3, 2]|
| 1081|      [1]|       [3]|
| 2654|      [1]|       [3]|
|10633|      [1]|       [3]|
|11351|      [1]|      [12]|
| 2737|      [1]|      [26]|
| 6590|      [1]|       [2]|
+-----+---------+----------+

然后我定义一个 udf 并做这样的事情:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)

val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))

但这似乎不起作用,感觉应该有一种更简单的方法来做到这一点,而无需先将权重和索引收集到列表中。

我对 Spark、Dataframes 和 Scala 还很陌生,因此非常感谢任何帮助。

【问题讨论】:

    标签: scala apache-spark spark-dataframe


    【解决方案1】:

    您必须收集它们,因为向量必须是本地的,单机:https://spark.apache.org/docs/latest/mllib-data-types.html#local-vector

    要创建稀疏向量,您有 2 个选项,使用无序 (index, value) 对或指定索引和值数组: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.linalg.Vectors$

    如果您可以将数据转换为不同的格式(旋转),您还可以使用 VectorAssembler: https://spark.apache.org/docs/latest/ml-features.html#vectorassembler

    通过一些小的调整,您可以让您的方法发挥作用:

    :paste
    // Entering paste mode (ctrl-D to finish)
    
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    
    val df = Seq((11830,1,8), (11113, 1, 3), (1081, 1,3), (2654, 1, 3), (10633, 1, 3), (11830, 1, 28), (11351, 1, 12), (2737, 1, 26), (11113, 3, 2), (6590, 1, 2)).toDF("id", "weight", "index")
    
    val dfWithFeat = df
      .rdd
      .map(r => (r.getInt(0), (r.getInt(2), r.getInt(1).toDouble)))
      .groupByKey()
      .map(r => LabeledPoint(r._1, Vectors.sparse(1000, r._2.toSeq)))
      .toDS
    
    dfWithFeat.printSchema
    dfWithFeat.show(10, false)
    
    
    // Exiting paste mode, now interpreting.
    
    root
    |-- label: double (nullable = true)
    |-- features: vector (nullable = true)
    
    +-------+-----------------------+
    |label  |features               |
    +-------+-----------------------+
    |11113.0|(1000,[2,3],[3.0,1.0]) |
    |2737.0 |(1000,[26],[1.0])      |
    |10633.0|(1000,[3],[1.0])       |
    |1081.0 |(1000,[3],[1.0])       |
    |6590.0 |(1000,[2],[1.0])       |
    |11830.0|(1000,[8,28],[1.0,1.0])|
    |2654.0 |(1000,[3],[1.0])       |
    |11351.0|(1000,[12],[1.0])      |
    +-------+-----------------------+
    
    dfWithFeat: org.apache.spark.sql.Dataset[org.apache.spark.mllib.regression.LabeledPoint] = [label: double, features: vector]
    

    【讨论】:

    • 谢谢!当索引向量以严格的递增顺序排列时,这是有效的。如果索引向量没有排序,有没有办法做到这一点?我收到此错误:java.lang.IllegalArgumentException:要求失败:索引 324 跟随 660 并且没有严格增加
    • 它现在使用一系列未排序的对(索引、权重)来创建向量,它们的顺序不再重要。
    猜你喜欢
    • 1970-01-01
    • 2017-08-30
    • 1970-01-01
    • 1970-01-01
    • 2023-03-20
    • 2017-05-18
    • 2021-11-02
    • 2016-06-02
    • 2021-04-14
    相关资源
    最近更新 更多