【问题标题】:How to get keys and values from MapType column in SparkSQL DataFrame如何从 SparkSQL DataFrame 中的 MapType 列获取键和值
【发布时间】:2017-03-28 21:48:18
【问题描述】:

我的 parquet 文件中有两个字段:object_id: Stringalpha: Map<>

它在 sparkSQL 中被读入数据框,其架构如下所示:

scala> alphaDF.printSchema()
root
 |-- object_id: string (nullable = true)
 |-- ALPHA: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)

我正在使用 Spark 2.0,我正在尝试创建一个新的数据框,其中的列需要是 object_id 加上 ALPHA 映射的键,如 object_id, key1, key2, key2, ...

我首先想看看我是否至少可以像这样访问地图:

scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   alphaDF.map(a => a(0)).collect()

但不幸的是,我似乎无法弄清楚如何访问地图的键。

有人可以告诉我一种方法来获取 object_id 加上映射键作为列名和映射值作为新数据框中的相应值吗?

【问题讨论】:

标签: scala apache-spark dataframe apache-spark-sql apache-spark-dataset


【解决方案1】:

火花 >= 2.3

您可以使用map_keys 函数简化流程:

import org.apache.spark.sql.functions.map_keys

还有map_values函数,不过这里不会直接用。

火花

一般方法可以用几个步骤来表示。首先需要导入:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

和示例数据:

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")

要提取密钥,我们可以使用 UDF (Spark

val map_keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

或内置函数

import org.apache.spark.sql.functions.map_keys

val keysDF = df.select(map_keys($"alpha"))

找出不同的:

val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
  .collect.sorted

你也可以用explode概括keys提取:

import org.apache.spark.sql.functions.explode

val distinctKeys = df
  // Flatten the column into key, value columns
 .select(explode($"alpha"))
 .select($"key")
 .as[String].distinct
 .collect.sorted

还有select:

ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)

【讨论】:

  • 以及如何在 PySpaek 中实现这一点?
【解决方案2】:

如果你在 PySpark 中,我只是找到一个简单的实现:

from pyspark.sql.functions import map_keys

alphaDF.select(map_keys("ALPHA").alias("keys")).show()

您可以在here查看详情

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-09
    • 2021-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-07
    • 1970-01-01
    • 2013-04-21
    相关资源
    最近更新 更多