【发布时间】:2017-08-17 04:29:39
【问题描述】:
我在 Spark 中有一个类似于下面的数据框,我想按 id 列对其进行分组,然后对于分组数据中的每一行,我需要使用来自 weight 的元素创建一个稀疏向量index 列指定的索引处的列。稀疏向量的长度是已知的,在本例中为 1000。
数据框df:
+-----+------+-----+
| id|weight|index|
+-----+------+-----+
|11830| 1| 8|
|11113| 1| 3|
| 1081| 1| 3|
| 2654| 1| 3|
|10633| 1| 3|
|11830| 1| 28|
|11351| 1| 12|
| 2737| 1| 26|
|11113| 3| 2|
| 6590| 1| 2|
+-----+------+-----+
我已经阅读了this,这与我想要做的事情有点相似,但是对于 rdd。有谁知道使用 Scala 为 Spark 中的数据帧执行此操作的好方法?
到目前为止,我的尝试是首先将权重和索引收集为如下列表:
val dfWithLists = df
.groupBy("id")
.agg(collect_list("weight") as "weights", collect_list("index") as "indices"))
看起来像:
+-----+---------+----------+
| id| weights| indices|
+-----+---------+----------+
|11830| [1, 1]| [8, 28]|
|11113| [1, 3]| [3, 2]|
| 1081| [1]| [3]|
| 2654| [1]| [3]|
|10633| [1]| [3]|
|11351| [1]| [12]|
| 2737| [1]| [26]|
| 6590| [1]| [2]|
+-----+---------+----------+
然后我定义一个 udf 并做这样的事情:
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf
def toSparseVector: ((Array[Int], Array[BigInt]) => Vector) = {(a1, a2) => Vectors.sparse(1000, a1, a2.map(x => x.toDouble))}
val udfToSparseVector = udf(toSparseVector)
val dfWithSparseVector = dfWithLists.withColumn("SparseVector", udfToSparseVector($"indices", $"weights"))
但这似乎不起作用,感觉应该有一种更简单的方法来做到这一点,而无需先将权重和索引收集到列表中。
我对 Spark、Dataframes 和 Scala 还很陌生,因此非常感谢任何帮助。
【问题讨论】:
标签: scala apache-spark spark-dataframe