【Question Title】: Sparse Vector pyspark
【Posted At】: 2017-05-05 16:18:56
【Question Description】:

I want to find an efficient way to create sparse vectors in PySpark using DataFrames.

Assume we are given transactional input like this:

df = spark.createDataFrame([
    (0, "a"),
    (1, "a"),
    (1, "b"),
    (1, "c"),
    (2, "a"),
    (2, "b"),
    (2, "b"),
    (2, "b"),
    (2, "c"),
    (0, "a"),
    (1, "b"),
    (1, "b"),
    (2, "cc"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])
+---+--------+
| id|category|
+---+--------+
|  0|       a|
|  1|       a|
|  1|       b|
|  1|       c|
|  2|       a|
|  2|       b|
|  2|       b|
|  2|       b|
|  2|       c|
|  0|       a|
|  1|       b|
|  1|       b|
|  2|      cc|
|  3|       a|
|  4|       a|
|  5|       c|
+---+--------+

In summarized format:

df.groupBy(df["id"],df["category"]).count().show()
+---+--------+-----+
| id|category|count|
+---+--------+-----+
|  1|       b|    3|
|  1|       a|    1|
|  1|       c|    1|
|  2|      cc|    1|
|  2|       c|    1|
|  2|       a|    1|
|  2|       b|    3|
|  0|       a|    2|
+---+--------+-----+

My goal is to get this output, grouped by id:

+---+-----------------------------------------------+
| id|                                        feature|
+---+-----------------------------------------------+
|  2|SparseVector({a: 1.0, b: 3.0, c: 1.0, cc: 1.0})|
+---+-----------------------------------------------+

Could you point me in the right direction? Using MapReduce in Java seems easier to me.

【Question Comments】:

    Tags: python apache-spark pyspark sparse-matrix


    【Solution 1】:

    This can be done quite easily with pivot and VectorAssembler. Replace the aggregation with a pivot:

     pivoted = df.groupBy("id").pivot("category").count().na.fill(0)
    

    and assemble:

    from pyspark.ml.feature import VectorAssembler
    
    input_cols = [x for x in pivoted.columns if x != "id"]
    
    result = (VectorAssembler(inputCols=input_cols, outputCol="features")
        .transform(pivoted)
        .select("id", "features"))
    

    with the result shown below. This will choose the more efficient representation depending on sparsity:
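    For example, one way to display the full vectors without truncation (this display call itself is not shown in the answer):

     result.show(truncate=False)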

    +---+---------------------+
    |id |features             |
    +---+---------------------+
    |0  |(5,[1],[2.0])        |
    |5  |(5,[0,3],[5.0,1.0])  |
    |1  |[1.0,1.0,3.0,1.0,0.0]|
    |3  |(5,[0,1],[3.0,1.0])  |
    |2  |[2.0,1.0,3.0,1.0,1.0]|
    |4  |(5,[0,1],[4.0,1.0])  |
    +---+---------------------+
    

    Of course you can still convert it to a single representation:

    from pyspark.sql.functions import udf
    from pyspark.ml.linalg import SparseVector, VectorUDT
    import numpy as np
    
    def to_sparse(c):
        def to_sparse_(v):
            # Already sparse - return it unchanged
            if isinstance(v, SparseVector):
                return v
            # Dense vector - keep only the non-zero entries
            vs = v.toArray()
            nonzero = np.nonzero(vs)[0]
            return SparseVector(v.size, nonzero, vs[nonzero])
        return udf(to_sparse_, VectorUDT())(c)
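
    Applying it to the result DataFrame from above (a minimal sketch; this exact call is not part of the original answer) produces the table below:

     result.withColumn("features", to_sparse("features")).show(truncate=False)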
    
    +---+-------------------------------------+
    |id |features                             |
    +---+-------------------------------------+
    |0  |(5,[1],[2.0])                        |
    |5  |(5,[0,3],[5.0,1.0])                  |
    |1  |(5,[0,1,2,3],[1.0,1.0,3.0,1.0])      |
    |3  |(5,[0,1],[3.0,1.0])                  |
    |2  |(5,[0,1,2,3,4],[2.0,1.0,3.0,1.0,1.0])|
    |4  |(5,[0,1],[4.0,1.0])                  |
    +---+-------------------------------------+
    

    【Comments】:

      【Solution 2】:

      If you convert the DataFrame to an RDD, you can follow a MapReduce-like pattern with reduceByKey. The only really tricky part here is formatting the data for Spark's SparseVector.

      Import packages and create the data

      from pyspark.ml.feature import StringIndexer
      from pyspark.ml.linalg import Vectors
      df = sqlContext.createDataFrame([
          (0, "a"),
          (1, "a"),
          (1, "b"),
          (1, "c"),
          (2, "a"),
          (2, "b"),
          (2, "b"),
          (2, "b"),
          (2, "c"),
          (0, "a"),
          (1, "b"),
          (1, "b"),
          (2, "cc"),
          (3, "a"),
          (4, "a"),
          (5, "c")
      ], ["id", "category"])
      

      Create a numeric representation for the categories (needed for the sparse vector)

      indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
      df = indexer.fit(df).transform(df) 
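
      If you want to check which numeric index each category received, one option (this inspection is not part of the original answer):

      # Show the category -> categoryIndex mapping learned by the StringIndexer
      df.select("category", "categoryIndex").distinct().orderBy("categoryIndex").show()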
      

      Group by id and category index, and get the counts

      df = df.groupBy(df["id"],df["categoryIndex"]).count()
      

      Convert to an RDD, and map the data to key/value pairs of id and [(categoryIndex, count)]

      rdd = df.rdd.map(lambda x: (x.id, [(x.categoryIndex, x['count'])]))
      

      Reduce by key to get key/value pairs of id and the list of all (categoryIndex, count) for that id

      rdd = rdd.reduceByKey(lambda a, b: a + b)
      

      Map the data to turn the list of all (categoryIndex, count) for each id into a sparse vector

      rdd = rdd.map(lambda x: (x[0], Vectors.sparse(len(x[1]), x[1])))
      

      Convert back to a DataFrame

      finalDf = sqlContext.createDataFrame(rdd, ['id', 'feature'])
      

      Inspect the data

      finalDf.take(5)
      
       [Row(id=0, feature=SparseVector(1, {1: 2.0})),
        Row(id=1, feature=SparseVector(3, {0: 3.0, 1: 1.0, 2: 1.0})),
        Row(id=2, feature=SparseVector(4, {0: 3.0, 1: 1.0, 2: 1.0, 3: 1.0})),
        Row(id=3, feature=SparseVector(1, {1: 1.0})),
        Row(id=4, feature=SparseVector(1, {1: 1.0}))]
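
      Note that the vectors above end up with different sizes, because len(x[1]) (the number of entries for that id) is used as the vector size. If you want every id to get a vector of the same length (one slot per category, as in the first solution), here is a sketch of a variant of the mapping step, assuming it replaces the rdd.map(...) line above:

      # Hypothetical variant of the mapping step: size every vector by the total
      # number of distinct categories so that all ids share the same index space.
      num_categories = df.select("categoryIndex").distinct().count()
      rdd = rdd.map(lambda x: (x[0], Vectors.sparse(num_categories, x[1])))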
      

      【Comments】:
