【问题标题】:Iterating over for an Array Column with dynamic size in Spark Scala Dataframe在 Spark Scala Dataframe 中迭代具有动态大小的数组列
【发布时间】:2020-08-03 21:37:46
【问题描述】:

我对这种方法很熟悉——例如How to obtain the average of an array-type column in scala-spark over all row entries per entry? 中的一个例子

val array_size = 3
val avgAgg = for (i <- 0 to array_size -1) yield avg($"value".getItem(i))
df.select(array(avgAgg: _*).alias("avg_value")).show(false)

然而,3 在现实中是硬编码的。

无论我多么努力地尝试不使用 UDF,我都无法根据数据框中已存在的数组列的大小动态地执行此类操作。例如:

...
val z =  for (i <- 1 to size($"sortedCol")   ) yield array (element_at($"sortedCol._2", i), element_at($"sortedCol._3", i) )
...
...
.withColumn("Z", array(z: _*)  )

我正在研究如何通过将现有数组 col 应用于长度可变的数组来完成此操作。转换,expr?不确定。

根据要求提供完整代码:

import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

case class abc(year: Int, month: Int, item: String, quantity: Int)

val df0 = Seq(abc(2019, 1, "TV", 8), 
              abc(2019, 7, "AC", 10),  
              abc(2018, 1, "TV", 2),  
              abc(2018, 2, "AC", 3), 
              abc(2019, 2, "CO", 10)).toDS()

val df1 = df0.toDF()
// Gen some data, can be done easier, but not the point.

val itemsList= collect_list(struct("month", "item", "quantity"))

// This nn works.
val nn = 3
val z =  for (i <- 1 to nn) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )
// But want this.
//val z =  for (i <- 1 to size($"sortedCol")   ) yield array (element_at($"sortedCol.item", i), element_at($"sortedCol.quantity", i) )


val df2 = df1.groupBy($"year")
   .agg(itemsList as "items")
   .withColumn("sortedCol", sort_array($"items", asc = true))  
   .withColumn("S", size($"sortedCol")) // cannot use this either
   .withColumn("Z", array(z: _*)  )
   .drop("items")
   .orderBy($"year".desc)
df2.show(false)
// Col Z is the output I want, but not the null value Array 

UPD

In apache spark SQL, how to remove the duplicate rows when using collect_list in window function? 那里我用一个非常简单的 UDF 来解决,但我正在寻找一种没有 UDF 的方法,特别是 for loop 中的 to value 的动态设置。答案证明某些构造是不可能的——这是对验证的排序。

【问题讨论】:

  • @blackbishop 这里有什么想法吗?我错过了什么
  • @LeoC 对您的见解感兴趣
  • 如果所有数组的大小都相同,那么您可以先这样获取它:val array_size = df.select(size($"sortedCol")).first.getInt(0)。其余代码保持不变。
  • @blackbishop 但这正是重点,他们没有。我注意到了您的解决方案,并且我在过去应用了这个方面,或者采用了高默认值并删除了生成的空值。但我想知道为什么存在这个问题。似乎是一个很常见的“足够”用例。
  • 好的,我现在明白了。您能否在问题中添加一个可复制的示例?

标签: arrays scala dataframe apache-spark


【解决方案1】:

如果我正确理解您的需求,您可以像这样简单地使用transform 函数:

val df2 = df1.groupBy($"year")
             .agg(itemsList as "items")
             .withColumn("sortedCol", sort_array($"items", asc = true))


val transform_expr = "transform(sortedCol, x -> array(x.item, x.quantity))"

df2.withColumn("Z", expr(transform_expr)).show(false)

//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|year|items                                 |sortedCol                             |Z                            |
//+----+--------------------------------------+--------------------------------------+-----------------------------+
//|2018|[[1, TV, 2], [2, AC, 3]]              |[[1, TV, 2], [2, AC, 3]]              |[[TV, 2], [AC, 3]]           |
//|2019|[[1, TV, 8], [7, AC, 10], [2, CO, 10]]|[[1, TV, 8], [2, CO, 10], [7, AC, 10]]|[[TV, 8], [CO, 10], [AC, 10]]|
//+----+--------------------------------------+--------------------------------------+-----------------------------+

【讨论】:

  • 好的,我使用 UDF 来实现这一点,而不是使用本质上相同的转换。但我的问题的核心是我们是否可以有类似 val z = for (i
  • 答案已分配,但我正在以特定方式寻找上述观点。也许是因为有替代品,所以这是一条红鲱鱼。
  • @thebluephantom 当您执行z = for (i &lt;- 1 to size($"sortedCol")) 时,您尝试使用 Scala 进行理解,但您在枚举器中给出了 Spark Column。这样做是不可能的......因为 Column 是在 Spark DF 下评估的,而不是在 Scala 下评估的。
  • 明白了。我想我意识到了这一点,但不想接受它。它与无法在 SCALA 之外引用的序列化和 UDF 的错误解释概念密切相关。感谢那。你会在积分上像火箭一样前进!
猜你喜欢
  • 2017-11-27
  • 2013-04-29
  • 1970-01-01
  • 2018-01-02
  • 1970-01-01
  • 2020-06-03
  • 2021-08-30
  • 1970-01-01
  • 2014-05-25
相关资源
最近更新 更多