Title: Java/Spark: how to find key with max value in a col with array of struct of map
Posted: 2020-08-10 21:21:46
Question:

I have a DataFrame, and I want to get the key with the maximum value in each row's map.

DataFrame creation:

Dataset<Row> data = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("/home/path/to/file/verify.csv");
// loading a Spark ML model
PipelineModel gloveModel = PipelineModel.load("models/gloveModel");
Dataset<Row> df = gloveModel.transform(data);

df.printSchema();

 |-- id: integer (nullable = true)
 |-- description: string (nullable = true)
 |-- class: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- result: string (nullable = true)     
 |    |    |-- metadata: map (nullable = true)      
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

// The map entries look like this:

df.select("class.metadata").show(10,50);

+-----------------------------------------------------------------------------------------------------------------+
|                                                                                                         metadata|
+-----------------------------------------------------------------------------------------------------------------+
|  [[Sports -> 3.2911853E-9, Business -> 5.1852658E-6, World -> 3.96135E-9, Sci/Tech -> 0.9999949, sentence -> 0]]|
|      [[Sports -> 1.9902605E-10, Business -> 1.0305631E-8, World -> 1.0, Sci/Tech -> 3.543277E-9, sentence -> 0]]|
|    [[Sports -> 1.0, Business -> 8.1944885E-12, World -> 4.554111E-13, Sci/Tech -> 1.7239962E-12, sentence -> 0]]|
+-----------------------------------------------------------------------------------------------------------------+

I want to get the following result (the key with the highest value in each row's map):

+--------------+
|    prediction|
+--------------+
|      Sci/Tech|
|         World|
|        Sports|
+--------------+

I tried:

df.select(map_values(col("class.metadata"))).show(10, 50); but ended up with this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'map_values(`class`.`metadata`)' due to data type mismatch: argument 1 requires map type, however, '`class`.`metadata`' is of array<map<string,string>> type.;;
'Project [map_values(class#95.metadata) AS map_values(class.metadata)#106]...

df.select(flatten(col("class"))).show(); error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'flatten(`class`)' due to data type mismatch: The argument should be an array of arrays, but '`class`' is of array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>> type.;;
'Project [flatten(class#95) AS flatten(class)#106]

My Spark SQL version is 2.4.0 (where the Dataset.explode method is deprecated).

Any suggestions/advice are much appreciated! Thanks!
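Before wiring anything into Spark, the per-row logic the question asks for can be pinned down in plain Java (a minimal, hypothetical sketch using the sample values from the question's `show()` output; the `sentence` bookkeeping key is skipped so it can never outrank a probability):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class PredictionDemo {
    // Return the key with the largest numeric value, ignoring the "sentence" entry.
    static String maxKey(Map<String, String> metadata) {
        String best = null;
        double bestVal = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, String> e : metadata.entrySet()) {
            if (e.getKey().equals("sentence")) continue;
            // Double.parseDouble handles scientific notation like "3.2911853E-9"
            double v = Double.parseDouble(e.getValue());
            if (v > bestVal) { bestVal = v; best = e.getKey(); }
        }
        return best;
    }

    public static void main(String[] args) {
        Map<String, String> r1 = new LinkedHashMap<>();
        r1.put("Sports", "3.2911853E-9"); r1.put("Business", "5.1852658E-6");
        r1.put("World", "3.96135E-9");    r1.put("Sci/Tech", "0.9999949");
        r1.put("sentence", "0");
        Map<String, String> r2 = new LinkedHashMap<>();
        r2.put("Sports", "1.9902605E-10"); r2.put("Business", "1.0305631E-8");
        r2.put("World", "1.0");            r2.put("Sci/Tech", "3.543277E-9");
        r2.put("sentence", "0");
        Map<String, String> r3 = new LinkedHashMap<>();
        r3.put("Sports", "1.0");          r3.put("Business", "8.1944885E-12");
        r3.put("World", "4.554111E-13");  r3.put("Sci/Tech", "1.7239962E-12");
        r3.put("sentence", "0");
        for (Map<String, String> m : Arrays.asList(r1, r2, r3)) {
            System.out.println(maxKey(m));  // Sci/Tech, World, Sports
        }
    }
}
```

This is exactly the expected `prediction` column from the question; the remaining work is only getting this comparison applied per row inside Spark.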

Comments:

  • Can you post the DataFrame creation code?
  • Yes, added in my latest edit
  • Also, please post a few records from your csv file.
  • The explode function is deprecated; you can use the org.apache.spark.sql.functions.explode method instead. Please check the answer below where I used it.
  • Hi @Sandesh Mendon, did you check my answer? If it works for you, please accept + upvote.

Tags: java dataframe apache-spark-sql aggregation apache-spark-mllib


Solution 1:

This is how I solved it -

val rdd = sc.parallelize(Seq(
  Row(Seq(Map("Sports" -> "3.291185", "Business" -> "5.18526",
    "World" -> "3.9613", "Sci/Tech" -> "0.9999949", "sentence" -> "0"))),
  Row(Seq(Map("Sports" -> "1.9902605", "Business" -> "1.030563", "World" -> "1.0",
    "Sci/Tech" -> "3.54327", "sentence" -> "0"))),
  Row(Seq(Map("Sports" -> "1.0", "Business" -> "8.194488", "World" -> "4.5541",
    "Sci/Tech" -> "1.723996", "sentence" -> "0")))
))
val df = sqlContext.createDataFrame(rdd,
  StructType(Array(StructField("metadata", ArrayType(MapType(StringType, StringType))))))

df.show(false)
df.printSchema()

    +--------------------------------------------------------------------------------------------------+
    |metadata                                                                                          |
    +--------------------------------------------------------------------------------------------------+
    |[[Sports -> 3.291185, Business -> 5.18526, World -> 3.9613, Sci/Tech -> 0.9999949, sentence -> 0]]|
    |[[Sports -> 1.9902605, Business -> 1.030563, World -> 1.0, Sci/Tech -> 3.54327, sentence -> 0]]   |
    +--------------------------------------------------------------------------------------------------+

    root
    |-- metadata: array (nullable = true)
    |    |-- element: map (containsNull = true)
    |    |    |-- key: string
    |    |    |-- value: string (valueContainsNull = true)


Please try this -

val keyWithMaxVal = udf((am: Map[String, String]) =>
  am.mapValues(_.toDouble).toSeq.maxBy(_._2)._1,
  StringType
)

df.select(explode(col("metadata")).as("exploded_meta"))
  .select(keyWithMaxVal(col("exploded_meta")).as("prediction"))
  .show(false)
    +----------+
    |prediction|
    +----------+
    |Business  |
    |Sci/Tech  |
    +----------+
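Since the question is tagged Java, the Scala UDF body above (`am.mapValues(_.toDouble).toSeq.maxBy(_._2)._1`) has a direct Java 8 streams equivalent. One caveat to keep in mind: inside a real Spark Java UDF, a map column typically arrives as a `scala.collection.Map` rather than `java.util.Map`, so a conversion step would be needed there. The sketch below shows only the comparison logic on a plain Java map (class and method names are my own, not from the answer):

```java
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyWithMaxVal {
    // Java equivalent of the Scala UDF body: pick the key whose string
    // value parses to the largest double.
    static String keyWithMaxVal(Map<String, String> am) {
        return am.entrySet().stream()
                .max(Comparator.comparingDouble(
                        (Map.Entry<String, String> e) -> Double.parseDouble(e.getValue())))
                .map(Map.Entry::getKey)
                .orElseThrow(IllegalStateException::new);
    }

    public static void main(String[] args) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("Sports", "1.9902605");
        m.put("Business", "1.030563");
        m.put("World", "1.0");
        m.put("Sci/Tech", "3.54327");
        m.put("sentence", "0");
        System.out.println(keyWithMaxVal(m));  // Sci/Tech
    }
}
```

As in the Scala version, "sentence" is not filtered out here; with the answerer's sample data that is harmless, but on real rows where the sentence index can exceed 1 you would want to exclude it first.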

Comments:

Solution 2:

class.metadata is an array of map type, but the map_values function only accepts a map type.

Use explode to extract each map from the array, then pass that map to the map_values function. Please see below.

import org.apache.spark.sql.functions.explode

df.select(explode($"class.metadata").as("metadata")).select(map_values($"metadata")).show(false)
    
    

Comments:

  • As I mentioned, my end goal is to find the maximum among the map values - can you help me with that?
  • The column "class.metadata" is part of a DataFrame produced by the Spark ML ".transform()" method - so posting the input dataset I was given wouldn't help