【Title】: Spark Scala Cosine Similarity Matrix
【Posted】: 2019-12-23 02:17:39
【Description】:

New to Scala (PySpark guy) and trying to compute the cosine similarity between rows (items).

Creating a sample df as in:

Spark, Scala, DataFrame: create feature vectors

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions.{lit, sum, when}

val df = sc.parallelize(Seq(
  (1, "cat1", 1), (1, "cat2", 3), (1, "cat9", 5), (2, "cat4", 6),
  (2, "cat9", 2), (2, "cat10", 1), (3, "cat1", 5), (3, "cat7", 16),
  (3, "cat8", 2))).toDF("userID", "category", "frequency")

// Create a sorted array of categories
val categories = df
  .select($"category")
  .distinct.map(_.getString(0))
  .collect
  .sorted

// Prepare vector assemble
val assembler =  new VectorAssembler()
  .setInputCols(categories)
  .setOutputCol("features")

// Aggregation expressions
val exprs = categories.map(
   c => sum(when($"category" === c, $"frequency").otherwise(lit(0))).alias(c))

val transformed = assembler.transform(
    df.groupBy($"userID").agg(exprs.head, exprs.tail: _*))
  .select($"userID", $"features")

transformed.show
+------+--------------------+
|userID|            features|
+------+--------------------+
|     1|(7,[0,2,6],[1.0,3...|
|     3|(7,[0,4,5],[5.0,1...|
|     2|(7,[1,3,6],[1.0,6...|
+------+--------------------+
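Aside: the truncated `features` column above is MLlib's sparse-vector print format `(size, [indices], [values])`: a vector of the given length with nonzeros only at the listed positions. A minimal plain-Scala sketch of how that triple expands to a dense vector (generic illustrative values, not the truncated ones from the output above):

```scala
// Expand MLlib's (size, [indices], [values]) sparse representation
// into a dense array of doubles; unlisted positions stay 0.0.
def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
  val dense = Array.fill(size)(0.0)
  indices.zip(values).foreach { case (i, v) => dense(i) = v }
  dense
}

println(toDense(5, Array(0, 3), Array(1.0, 2.0)).mkString(","))
```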

Following this post, I tried to convert the df to an IndexedRowMatrix and ran into Scala syntax problems with how to correctly map the rdd:

Calculate Cosine Similarity Spark Dataframe

import org.apache.spark.sql.Row
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val irm = new IndexedRowMatrix(transformed.rdd.map {
  Row(_, v: org.apache.spark.ml.linalg.Vector) => 
    org.apache.spark.mllib.linalg.Vectors.fromML(v)
}.zipWithIndex.map { case (v, i) => IndexedRow(i, v) })



<console>:5: error: not a legal formal parameter.
Note: Tuples cannot be directly destructured in method or function parameters.
      Either create a single parameter accepting the Tuple1,
      or consider a pattern matching anonymous function: `{ case (param1, param1) => ... }
  Row(_, v: org.apache.spark.ml.linalg.Vector) =>
     ^
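The compiler hint is the fix: an anonymous function's parameter list cannot contain a pattern like `Row(_, v)`; wrapping the body in `case` turns it into a pattern-matching anonymous function. A plain-Scala sketch of the rule (the same `case` keyword fixes the `transformed.rdd.map` call above):

```scala
val pairs = Seq((1, "a"), (2, "b"))

// In Scala 2, `pairs.map((k, _) => k)` is a syntax error: the parameter
// list tries to destructure. A pattern-matching anonymous function works:
val keys = pairs.map { case (k, _) => k }

// The Spark call needs the same change:
//   transformed.rdd.map { case Row(_, v: org.apache.spark.ml.linalg.Vector) =>
//     org.apache.spark.mllib.linalg.Vectors.fromML(v)
//   }
```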

Thanks!

【Comments】:

    Tags: scala apache-spark


    【Solution 1】:

    Try this with a RowMatrix:

    import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame

    def convertDataFrameToRowMatrix(df: DataFrame): RowMatrix = {
      val rows = df.count()
      val cols = df.columns.length
      // Assumes column 1 holds the vector values as a Seq[Double]
      val rdd: RDD[org.apache.spark.mllib.linalg.Vector] = df.rdd.map(
        row => org.apache.spark.mllib.linalg.Vectors.dense(row.getAs[Seq[Double]](1).toArray))
      new RowMatrix(rdd, rows, cols)
    }
    

    And an IndexedRowMatrix:

    def convertDataFrameToIndexedMatrix(df: DataFrame): IndexedRowMatrix = {
      val rows: Long = df.count()
      val cols = df.columns.length
      // zipWithIndex gives each row a distinct index; using the total row
      // count as the index would assign every row the same index.
      val rdd = df.rdd
        .map(row => org.apache.spark.mllib.linalg.Vectors.dense(row.getAs[Seq[Double]](1).toArray))
        .zipWithIndex
        .map { case (v, i) => IndexedRow(i, v) }
      new IndexedRowMatrix(rdd, rows, cols)
    }
    

    If you want to convert an IndexedRowMatrix or RowMatrix back to an RDD, it's simple:

    def convertIndexedRowMatrixToRDD(irm: IndexedRowMatrix): RDD[IndexedRow] = irm.rows

    def convertRowMatrixToRDD(rm: RowMatrix): RDD[org.apache.spark.mllib.linalg.Vector] = rm.rows
    

    If you want to convert it to a DataFrame, check this link

    As an example of running the functions:

    val si = Seq((1, 2), (3, 4))
    val myrdd: RDD[IndexedRow] = sc.parallelize(si)
      .map(x => new IndexedRow(x._1.toLong, Vectors.dense(x._1, x._2)))
    val irm: IndexedRowMatrix = new IndexedRowMatrix(myrdd)
    val r = convertIndexedRowMatrixToRDD(irm)
    r.foreach(println)
    

    Output:

    IndexedRow(3,[3.0,4.0])
    IndexedRow(1,[1.0,2.0])
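    With the matrix built, the original goal (cosine similarity between rows) can be reached via mllib's `RowMatrix.columnSimilarities()`, which works on columns, so row similarities need a transpose first (e.g. `irm.toCoordinateMatrix.transpose.toRowMatrix.columnSimilarities()`). A plain-Scala sketch of the formula those entries implement, dot(a, b) / (‖a‖·‖b‖), no Spark needed:

```scala
// Cosine similarity between two dense vectors of equal length.
def cosine(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same length")
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val norm = (v: Array[Double]) => math.sqrt(v.map(x => x * x).sum)
  dot / (norm(a) * norm(b))
}

// Parallel vectors score ~1.0; orthogonal vectors score 0.0.
println(cosine(Array(1.0, 2.0), Array(2.0, 4.0)))
println(cosine(Array(1.0, 0.0), Array(0.0, 1.0)))
```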
    

    【Comments】:

    • Thanks a lot... do you know how to convert it to an rdd/dataframe?
    • I updated the answer. I assume you mean converting the RowMatrix or IndexedRowMatrix to an RDD or DataFrame.
    • Thanks... getting closer... but when converting to an RDD with `val rdd = convertIndexedRowMatrixToRDD(irm); rdd.take(1)` this error occurs: `java.lang.ClassCastException: java.lang.String cannot be cast to scala.collection.TraversableOnce` ... I think the irm-to-rdd function needs adjusting
    • No, I think the input should be adjusted. I'll update the answer.
    • Which Scala and Spark versions are you using?