【问题标题】:Scala Spark: Flatten Array of Key/Value structsScala Spark:展平键/值结构数组
【发布时间】:2020-08-24 11:11:51
【问题描述】:

我有一个包含数组类型列的输入数据框。数组中的每个条目都是一个结构,由一个键(大约四个值之一)和一个值组成。我想把它变成一个数据框,每个可能的键有一列,并且该值不在该行的数组中的空值。任何数组中的键都不会重复,但它们可能会乱序或丢失。

到目前为止,我得到的最好的是

val wantedCols =df.columns
  .filter(_ != arrayCol)
  .filter(_ != "col")
val flattened = df
        .select((wantedCols.map(col(_)) ++ Seq(explode(col(arrayCol)))):_*)
        .groupBy(wantedCols.map(col(_)):_*)
        .pivot("col.key")
        .agg(first("col.value"))

这正是我想要的,但它很可怕,我不知道在每一列上分组的后果是什么。这样做的正确方法是什么?

编辑:示例输入/输出:

case class testStruct(name : String, number : String)
val dfExampleInput = Seq(
(0, "KY", Seq(testStruct("A", "45"))),
(1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))))
.toDF("index", "state", "entries")
.show

+-----+-----+------------------+
|index|state|           entries|
+-----+-----+------------------+
|    0|   KY|         [[A, 45]]|
|    1|   OR|[[A, 30], [B, 10]]|
+-----+-----+------------------+

val dfExampleOutput = Seq(
  (0, "KY", "45", null),
  (1, "OR", "30", "10"))
  .toDF("index", "state", "A", "B")
  .show

+-----+-----+---+----+
|index|state|  A|   B|
+-----+-----+---+----+
|    0|   KY| 45|null|
|    1|   OR| 30|  10|
+-----+-----+---+----+

进一步编辑:

我自己提交了一个解决方案(见下文),只要您事先知道密钥(在我的情况下我知道),就可以很好地处理这个问题。如果找到密钥是一个问题,另一个答案包含处理该问题的代码。

【问题讨论】:

  • 您可以添加预期的示例输入和输出吗?
  • 将给定的输入和预期的输出添加到您的问题中会非常有帮助,请查看this 帖子了解更多详情
  • @Srinivas 编辑了它。担心它有点不清楚(没有显示我的预期) - “条目”列是一个结构数组。
  • @AlexandrosBiratsis 在中编辑了
  • @Edward 你知道条目数组的大小,它是固定的还是不固定的?此外,如果可能,pivot 是 Spark 中最繁重的操作之一,您应该避免它

标签: scala apache-spark


【解决方案1】:

没有groupBypivotaggfirst

请检查以下代码。

scala> val df = Seq((0, "KY", Seq(("A", "45"))),(1, "OR", Seq(("A", "30"),("B", "10")))).toDF("index", "state", "entries").withColumn("entries",$"entries".cast("array<struct<name:string,number:string>>"))
df: org.apache.spark.sql.DataFrame = [index: int, state: string ... 1 more field]

scala> df.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)


scala> df.show(false)
+-----+-----+------------------+
|index|state|entries           |
+-----+-----+------------------+
|0    |KY   |[[A, 45]]         |
|1    |OR   |[[A, 30], [B, 10]]|
+-----+-----+------------------+


scala> val finalDFColumns = df.select(explode($"entries").as("entries")).select("entries.*").select("name").distinct.map(_.getAs[String](0)).orderBy($"value".asc).collect.foldLeft(df.limit(0))((cdf,c) => cdf.withColumn(c,lit(null))).columns
finalDFColumns: Array[String] = Array(index, state, entries, A, B)

scala> val finalDF = df.select($"*" +: (0 until max).map(i => $"entries".getItem(i)("number").as(i.toString)): _*)
finalDF: org.apache.spark.sql.DataFrame = [index: int, state: string ... 3 more fields]

scala> finalDF.show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |0  |1   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+


scala> finalDF.printSchema
root
 |-- index: integer (nullable = false)
 |-- state: string (nullable = true)
 |-- entries: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- number: string (nullable = true)
 |-- 0: string (nullable = true)
 |-- 1: string (nullable = true)

scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).show(false)
+-----+-----+------------------+---+----+
|index|state|entries           |A  |B   |
+-----+-----+------------------+---+----+
|0    |KY   |[[A, 45]]         |45 |null|
|1    |OR   |[[A, 30], [B, 10]]|30 |10  |
+-----+-----+------------------+---+----+



scala>

最终输出


scala> finalDF.columns.zip(finalDFColumns).foldLeft(finalDF)((fdf,column) => fdf.withColumnRenamed(column._1,column._2)).drop($"entries").show(false)
+-----+-----+---+----+
|index|state|A  |B   |
+-----+-----+---+----+
|0    |KY   |45 |null|
|1    |OR   |30 |10  |
+-----+-----+---+----+

【讨论】:

  • 我认为这几乎正是我想要改进的版本? (不过,我从列列表中生成“分组依据”参数。)一件事是我正在使用的数据框有数百列......按所有列分组似乎是错误的。
  • 我已经更新了答案,这次我没有使用任何这些 - groupBy、pivot、agg、第一个函数
  • 嗨 Srinivas,你有这个的 pyspark 版本吗?
【解决方案2】:

除了可能使事情变得混乱之外,我不会担心按几列分组。在这种情况下,如果有更简单、更可维护的方法,那就去吧。如果没有示例输入/输出,我不确定这是否能让你到达你想要去的地方,但也许它会有用:

Seq(Seq("k1" -> "v1", "k2" -> "v2")).toDS() // some basic input based on my understanding of your description
  .select(explode($"value")) // flatten the array
  .select("col.*") // de-nest the struct
  .groupBy("_2") // one row per distinct value
  .pivot("_1") // one column per distinct key
  .count // or agg(first) if you want the value in each column
  .show
+---+----+----+
| _2|  k1|  k2|
+---+----+----+
| v2|null|   1|
| v1|   1|null|
+---+----+----+

根据您现在所说的,我的印象是有许多列,例如“状态”,它们不是聚合所必需的,但需要在最终结果中。

作为参考,如果您不需要旋转,您可以添加一个结构列,其中嵌套了所有此类字段,然后将其添加到您的聚合中,例如:.agg(first($"myStruct"), first($"number"))。主要优点是只有在groubBy 中引用的实际键列。但是当使用枢轴时,事情会变得有点奇怪,所以我们将把这个选项放在一边。

在这个用例中,我能想到的最简单的方法是拆分您的数据框并在聚合后使用一些行键将其重新连接在一起。在此示例中,我假设 "index" 适合该目的:

 val mehCols = dfExampleInput.columns.filter(_ != "entries").map(col)
 val mehDF = dfExampleInput.select(mehCols:_*)
 val aggDF = dfExampleInput
   .select($"index", explode($"entries").as("entry"))
   .select($"index", $"entry.*")
   .groupBy("index")
   .pivot("name")
   .agg(first($"number"))

 scala> mehDF.join(aggDF, Seq("index")).show
 +-----+-----+---+----+
 |index|state|  A|   B|
 +-----+-----+---+----+
 |    0|   KY| 45|null|
 |    1|   OR| 30|  10|
 +-----+-----+---+----+

如果有的话,我怀疑您会看到性能上的很大差异。也许在极端情况下,例如:非常多的meh 列,或者非常多的枢轴列,或者类似的东西,或者可能什么都没有。就个人而言,我会使用适当大小的输入来测试两者,如果没有显着差异,则使用看起来更易于维护的任何一个。

【讨论】:

  • 添加了示例输入/输出。担心它有点不清楚(没有显示我的期望) - “条目”列是一个结构数组。
  • @EdwardPeters,我最初的回答只关心键和值。我添加了一个基于连接的解决方案,现在有一些示例输入/输出可以从中绘制上下文。
  • @EdwardPeters 您可以研究的另一个选项是数据框窗口函数。我还没有真正尝试过它们,但据我了解,它们与 groupBy 非常相似,但除其他差异外,还包括所有原始列。
【解决方案3】:

这是另一种方法,它基于entries 列上没有重复项的假设,即Seq(testStruct("A", "30"), testStruct("A", "70"), testStruct("B", "10")) 将导致错误。下一个解决方案结合了 RDD 和 Dataframe API 来实现:

import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.types.StructType

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.cache

// get all possible keys from entries i.e Seq[A, B, C]
val finalCols = df.select(explode($"entries").as("entry"))
                  .select($"entry".getField("name").as("entry_name"))
                  .distinct
                  .collect
                  .map{_.getAs[String]("entry_name")}
                  .sorted // Attention: we need to retain the order of the columns 
                          // 1. when generating row values and
                          // 2. when creating the schema

val rdd = df.rdd.map{ r =>
  // transform the entries array into a map i.e Map(A -> 30, B -> 10)
  val entriesMap = r.getSeq[Row](2).map{r => (r.getString(0), r.getString(1))}.toMap

  // transform finalCols into a map with null value i.e Map(A -> null, B -> null, C -> null)
  val finalColsMap = finalCols.map{c => (c, null)}.toMap

  // replace null values with those that are present from the current row by merging the two previous maps
  // Attention: this should retain the order of finalColsMap
  val merged = finalColsMap ++ entriesMap

  // concatenate the two first row values ["index", "state"] with the values from merged
  val finalValues = Seq(r(0), r(1)) ++ merged.values

  Row.fromSeq(finalValues)
}

val extraCols = finalCols.map{c => s"`${c}` STRING"}
val schema = StructType.fromDDL("`index` INT, `state` STRING," + extraCols.mkString(","))

val finalDf = spark.createDataFrame(rdd, schema)

finalDf.show
// +-----+-----+---+----+----+
// |index|state|  A|   B|   C|
// +-----+-----+---+----+----+
// |    0|   KY| 45|null|null|
// |    1|   OR| 30|  10|null|
// |    2|   FL| 30|  10|  20|
// |    3|   TX| 19|  60|  40|
// +-----+-----+---+----+----+

注意:该解决方案需要一个额外的操作来检索唯一键,尽管它不会导致任何洗牌,因为它仅基于窄转换。

【讨论】:

  • 我自己添加了一个解决方案 - 你介意看一下吗?在我选择一个作为答案之前,我希望得到您的反馈。
  • 确定@Edward 我稍后会检查它
【解决方案4】:

我自己想出了一个解决方案:

def extractFromArray(colName : String, key : String, numKeys : Int, keyName : String) = {
  val indexCols = (0 to numKeys-1).map(col(colName).getItem(_))
  indexCols.foldLeft(lit(null))((innerCol : Column, indexCol : Column) =>
      when(indexCol.isNotNull && (indexCol.getItem(keyName) === key), indexCol)
      .otherwise(innerCol))
}

例子:

case class testStruct(name : String, number : String)
val df = Seq(
  (0, "KY", Seq(testStruct("A", "45"))),
  (1, "OR", Seq(testStruct("A", "30"), testStruct("B", "10"))),
  (2, "FL", Seq(testStruct("A", "30"), testStruct("B", "10"), testStruct("C", "20"))),
  (3, "TX", Seq(testStruct("B", "60"), testStruct("A", "19"), testStruct("C", "40")))
)
.toDF("index", "state", "entries")
.withColumn("A", extractFromArray("entries", "B", 3, "name"))
.show

产生:

+-----+-----+--------------------+-------+
|index|state|             entries|      A|
+-----+-----+--------------------+-------+
|    0|   KY|           [[A, 45]]|   null|
|    1|   OR|  [[A, 30], [B, 10]]|[B, 10]|
|    2|   FL|[[A, 30], [B, 10]...|[B, 10]|
|    3|   TX|[[B, 60], [A, 19]...|[B, 60]|
+-----+-----+--------------------+-------+

此解决方案与其他答案略有不同:

  • 一次只能使用一个键
  • 需要事先知道键名和键数
  • 它生成一列结构,而不是执行提取特定值的额外步骤
  • 它可以作为简单的列到列操作,而不需要对整个 DF 进行转换
  • 可以懒惰地评估

前三个问题可以通过调用代码来处理,对于您已经知道键或结构包含要提取的附加值的情况,它会更加灵活。

【讨论】:

  • 嗨@Edward 我检查了你的解决方案,确实正如你提到的那样,它可以在一个键上工作。尽管这可能是最困难的部分,也就是动态有效地展平,因此我认为您的解决方案可以在特定条件下和给定键下工作。虽然它不能从整体上解决问题,即如何为每个键执行此操作?您将再次面临发现密钥的相同问题。
猜你喜欢
  • 2021-03-05
  • 1970-01-01
  • 1970-01-01
  • 2017-08-25
  • 1970-01-01
  • 2019-10-31
  • 1970-01-01
  • 2016-12-09
  • 2020-05-16
相关资源
最近更新 更多