【问题标题】:How to access element of a VectorUDT column in a Spark DataFrame?如何访问 Spark DataFrame 中 VectorUDT 列的元素?
【发布时间】:2017-01-26 02:55:14
【问题描述】:

我有一个数据框 df,其中有一个名为 featuresVectorUDT 列。如何获取列的元素,比如第一个元素?

我尝试过以下操作

from pyspark.sql.functions import udf
first_elem_udf = udf(lambda row: row.values[0])
df.select(first_elem_udf(df.features)).show()

但我收到 net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict(for numpy.dtype) 错误。如果我改为使用first_elem_udf = first_elem_udf(lambda row: row.toArray()[0]),则会出现同样的错误。

我也尝试了explode(),但我收到错误,因为它需要数组或映射类型。

我认为这应该是一种常见的操作。

【问题讨论】:

    标签: apache-spark dataframe pyspark apache-spark-sql apache-spark-ml


    【解决方案1】:

    将输出转换为float:

    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import lit, udf
    
    def ith_(v, i):
        try:
            return float(v[i])
        except ValueError:
            return None
    
    ith = udf(ith_, DoubleType())
    

    示例用法:

    from pyspark.ml.linalg import Vectors
    
    df = sc.parallelize([
        (1, Vectors.dense([1, 2, 3])),
        (2, Vectors.sparse(3, [1], [9]))
    ]).toDF(["id", "features"])
    
    df.select(ith("features", lit(1))).show()
    
    ## +-----------------+
    ## |ith_(features, 1)|
    ## +-----------------+
    ## |              2.0|
    ## |              9.0|
    ## +-----------------+
    

    解释:

    必须将输出值重新序列化为等效的 Java 对象。如果你想访问values(注意SparseVectors)你应该使用item方法:

    v.values.item(0)
    

    返回标准 Python 标量。同样,如果您想以密集结构访问所有值:

    v.toArray().tolist()
    

    【讨论】:

    • 我收到Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)。有什么线索吗?
    【解决方案2】:

    如果您更喜欢使用 spark.sql,您可以使用以下自定义函数 'to_array' 将向量转换为数组。然后你可以把它当作一个数组来操作。

     from pyspark.sql.types import ArrayType, DoubleType
     def to_array_(v):
            return v.toArray().tolist()
     from pyspark.sql import SQLContext
     sqlContext=SQLContext(spark.sparkContext, sparkSession=spark, jsqlContext=None) 
     sqlContext.udf.register("to_array",to_array_,  ArrayType(DoubleType()))
    

    示例

        from pyspark.ml.linalg import Vectors
        
        df = sc.parallelize([
            (1, Vectors.dense([1, 2, 3])),
            (2, Vectors.sparse(3, [1], [9]))
        ]).toDF(["id", "features"])
        
        df.createOrReplaceTempView("tb")
        
        spark.sql("""select * , to_array(features)[1] Second from  tb   """).toPandas()
    

    输出

        id  features    Second
    0   1   [1.0, 2.0, 3.0] 2.0
    1   2   (0.0, 9.0, 0.0) 9.0
    

    【讨论】:

      【解决方案3】:

      我遇到了同样的问题,无法使用explode()。您可以做的一件事是使用 pyspark.ml.feature 库中的 VectorSlice。像这样:

      from pyspark.ml.feature import VectorSlicer
      from pyspark.ml.linalg import Vectors
      from pyspark.sql.types import Row
      
      slicer = VectorSlicer(inputCol="features", outputCol="features_one", indices=[0])
      
      output = slicer.transform(df)
      
      output.select("features", "features_one").show()
      

      【讨论】:

      • 我最喜欢这个解决方案,但它仍然导致“features_one”列是一个单元素列表。
      • 我也有同样的问题。有什么快速的方法可以提取出 1 元素吗?我们也可以编写管道来“分解”向量中的多个元素吗?
      【解决方案4】:

      适用于任何尝试将 PySpark ML 模型训练后生成的概率列拆分为可用列的人。这不使用 UDF 或 numpy。这仅适用于二进制分类。这里 lr_pred 是具有逻辑回归模型预测的数据帧。

      prob_df1=lr_pred.withColumn("概率",lr_pred["概率"].cast("字符串"))

      prob_df =prob_df1.withColumn('probabilityre',split(regexp_replace("probability", "^[|]", ""), ",")[1].cast(DoubleType()))

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2016-12-05
        • 2016-10-31
        • 2020-08-26
        • 1970-01-01
        • 1970-01-01
        • 2021-08-29
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多