【问题标题】:Spark (Scala) filter array of structs without explodeSpark(Scala)过滤结构数组而不爆炸
【发布时间】:2019-07-24 01:59:01
【问题描述】:

我有一个带有键的数据框和一个带有数据框列中的结构数组的列。每行包含一列,如下所示:

[
    {"id" : 1, "someProperty" : "xxx", "someOtherProperty" : "1", "propertyToFilterOn" : 1},
    {"id" : 2, "someProperty" : "yyy", "someOtherProperty" : "223", "propertyToFilterOn" : 0},
    {"id" : 3, "someProperty" : "zzz", "someOtherProperty" : "345", "propertyToFilterOn" : 1}
]

现在我想做两件事:

  1. 过滤“propertyToFilterOn”=1
  2. 对其他应用一些逻辑 属性 - 例如连接

所以结果是:

[
{"id" : 1, "newProperty" : "xxx_1"},
{"id" : 3, "newProperty" : "zzz_345"}
]

我知道如何使用 explode 来实现,但是在将它重新组合在一起时,explode 还需要 key 上的 groupBy。但由于这是一个流数据帧,我还必须在其上添加水印,这是我试图避免的。

有没有其他方法可以在不使用爆炸的情况下实现这一目标?我确信有一些 Scala 魔法可以实现这一点!

谢谢!

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    在 spark 2.4+ 中出现了许多用于数组的高阶函数。 (见https://docs.databricks.com/spark/2.x/spark-sql/language-manual/functions.html

    val dataframe = Seq(
    ("a", 1, "xxx", "1", 1),
    ("a", 2, "yyy", "223", 0),
    ("a", 3, "zzz", "345", 1)
    ).toDF( "grouping_key", "id" , "someProperty" , "someOtherProperty", "propertyToFilterOn" )
    .groupBy("grouping_key")
    .agg(collect_list(struct("id" , "someProperty" , "someOtherProperty", "propertyToFilterOn")).as("your_array"))
    
    dataframe.select("your_array").show(false)
    
    +----------------------------------------------------+
    |your_array                                          |
    +----------------------------------------------------+
    |[[1, xxx, 1, 1], [2, yyy, 223, 0], [3, zzz, 345, 1]]|
    +----------------------------------------------------+
    

    您可以使用数组过滤器高阶函数过滤数组中的元素,如下所示:

    val filteredDataframe = dataframe.select(expr("filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)").as("filtered_arrays"))
    
    filteredDataframe.show(false)
    
    +----------------------------------+
    |filtered_arrays                   |
    +----------------------------------+
    |[[1, xxx, 1, 1], [3, zzz, 345, 1]]|
    +----------------------------------+
    

    对于你所说的“其他逻辑”,你应该能够像这样使用转换高阶数组函数:

    val tranformedDataframe = filteredDataframe
    .select(expr("transform(filtered_arrays, your_struct -> struct(concat(your_struct.someProperty, '_', your_struct.someOtherProperty))"))
    

    但如本文所述,从转换函数返回结构存在问题:

    http://mail-archives.apache.org/mod_mbox/spark-user/201811.mbox/%3CCALZs8eBgWqntiPGU8N=ENW2Qvu8XJMhnViKy-225ktW+_c0czA@mail.gmail.com%3E

    所以您最好使用数据集 api 进行转换,如下所示:

    case class YourStruct(id:String, someProperty: String, someOtherProperty: String)
    case class YourArray(filtered_arrays: Seq[YourStruct])
    
    case class YourNewStruct(id:String, newProperty: String)
    
    val transformedDataset = filteredDataframe.as[YourArray].map(_.filtered_arrays.map(ys => YourNewStruct(ys.id, ys.someProperty + "_" + ys.someOtherProperty)))
    
    val transformedDataset.show(false)
    
    +--------------------------+
    |value                     |
    +--------------------------+
    |[[1, xxx_1], [3, zzz_345]]|
    +--------------------------+
    

    【讨论】:

    • 工作,但你能解释一下这个表达式:“filter(your_array, your_struct -> your_struct.propertyToFilterOn == 1)” - 它给了我我需要的结果,但我想了解如何这个表情有用吗?谢谢!
    • 这是一个高阶函数,它接受数组your_array 并将函数your_struct -> your_struct.propertyToFilterOn == 1 应用于your_array 中的每个元素。如果函数为your_array 中的给定元素返回false,则从数组中删除/过滤该元素。
    • 谢谢您的好心先生!
    猜你喜欢
    • 1970-01-01
    • 2017-08-08
    • 2018-04-27
    • 1970-01-01
    • 1970-01-01
    • 2022-01-16
    • 2020-05-16
    • 2022-12-29
    • 1970-01-01
    相关资源
    最近更新 更多