【问题标题】:Get value from an array in a map based on a key in Scala根据Scala中的键从映射中的数组中获取值
【发布时间】:2018-08-21 06:02:50
【问题描述】:

我有一个具有以下架构的数据框:

 |-- A: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- index: boolean (nullable = false)
 |-- idkey: string (nullable = true)

由于map中的值是数组类型,所以我需要提取“外”键字段idkey中的id对应的字段索引。

例如,我有以下数据:

 {"A":{
 "innerkey_1":[{"id":"1","type":"0.01","index":true},
               {"id":"6","type":"4.3","index":false}]},
 "1"}

由于idkey为1,我们需要输出"id":1所在元素对应的index值,即index应该等于true。我真的不确定如何使用 UDF 或其他方式完成此操作。

预期输出为:

+---------+
| indexout|
+---------+
|   true  |
+---------+

【问题讨论】:

  • 你能澄清一下i.e. the index should be equal to 0 吗?你也可以分享你的预期输出吗
  • 那么 1 怎么可能是一个布尔值呢?并且类型 struct 似乎是 double 而不是字符串。 ??
  • 我已修正错别字,谢谢指出。
  • index false 的 id 为 6 。他们不将 idkey 与 id 匹配。匹配索引应该为真。
  • 这些Since the idkey is 1, we need to to output the value of index corresponding to the element where "id":1, i.e. the index should be equal to false不是互相矛盾吗?

标签: scala apache-spark apache-spark-sql user-defined-functions


【解决方案1】:

如果您的 dataframe 有关注 schema

root
 |-- A: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- types: string (nullable = true)
 |    |    |    |-- index: boolean (nullable = false)
 |-- idkey: string (nullable = true)

然后你可以use two explode function,一个用于map,另一个用于内部array,使用filter过滤匹配,最后select索引为

import org.apache.spark.sql.functions._
df.select(col("idkey"), explode(col("A")))
  .select(col("idkey"), explode(col("value")).as("value"))
  .filter(col("idkey") === col("value.id"))
  .select(col("value.index").as("indexout"))

你应该得到

+--------+
|indexout|
+--------+
|true    |
+--------+

使用 udf 函数

您可以通过使用udf 函数来完成上述操作,这也可以避免两个explodefilter所有的分解和过滤都是在 udf 函数本身中完成的。您可以根据自己的需要进行修改。

import org.apache.spark.sql.functions._
def indexoutUdf = udf((a: Map[String, Seq[Row]], idkey: String) => {
  a.map(x => x._2.filter(y => y.getAs[String](0) == idkey).map(y => y.getAs[Boolean](2))).toList(0).head
})
df.select(indexoutUdf(col("A"), col("idkey")).as("indexout")).show(false)

希望回答对你有帮助

【讨论】:

  • 除了使用explode之外还有其他方法吗?我考虑过,但对于大型数据帧来说太贵了。
  • @PramodKumar,我已经更新了答案 :) 我希望这次的答案会被赞成和接受 ;)
猜你喜欢
  • 1970-01-01
  • 2018-09-25
  • 1970-01-01
  • 2020-03-18
  • 2017-02-04
  • 2020-11-12
  • 2020-04-06
  • 2018-09-22
  • 1970-01-01
相关资源
最近更新 更多