【问题标题】:SparkSQL - Filter / Reduce an Array of Structs by Key and LogicSparkSQL - 按键和逻辑过滤/减少结构数组
【发布时间】:2022-01-20 06:08:26
【问题描述】:

我一直在研究如何在纯 SparkSQL 中(仅)在理想情况下使用现有的 sparkSQL ARRAY 函数和 lambda 逻辑来减少基于时间的更改(由 LineId 支持)的销售线数组,减少到仅'latest' (MAX(Occurred)) by key (LineId) - 即每行给出当前状态。

我真的希望使用特定 ARRAY 函数(SLICE、FILTER、REDUCE 等 - 也许需要 SQL UDF?)而不是 classic SQL取消嵌套,ROW_NUMBER OVER ...,过滤where rowNum = 1,将剩余的重新聚合回逻辑数组类型。

这样的结果应该是第二个数组,将最初的 5 行数组减少到重复数据和最新的 3 行。

非常感谢

[
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L2",
    "Occurred": "2021-12-01T00:00:00.000+0000",
    "Prod": "P2",
    "Amt": 34,
    "Qty": 2
  },
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L1",
    "Occurred": "2021-12-01T00:00:00.000+0000",
    "Prod": "P1",
    "Amt": 13,
    "Qty": 2
  },
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L2",
    "Occurred": "2021-12-02T00:00:00.000+0000",
    "Prod": "P2",
    "Amt": 17,
    "Qty": 1
  },
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L3",
    "Occurred": "2021-12-02T00:00:00.000+0000",
    "Prod": "P3",
    "Amt": 100,
    "Qty": 5
  },
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L3",
    "Occurred": "2021-12-03T00:00:00.000+0000",
    "Prod": "P3",
    "Amt": 60,
    "Qty": 3
  }
]

结果应该只剩下 3 个唯一行(SalesOrderLine_L1SalesOrderLine_L2SalesOrderLine_L3 为 1 行),其中每行都是“最新”/最新行 MAX(Occurred)。 L2 和 L3 都有一个额外的行,每个都会被过滤掉。

[
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L1",
    "Occurred": "2021-12-01T00:00:00.000+0000",
    "Prod": "P1",
    "Amt": 13,
    "Qty": 2
  },
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L2",
    "Occurred": "2021-12-02T00:00:00.000+0000",
    "Prod": "P2",
    "Amt": 17,
    "Qty": 1
  },
  {
    "OrdNum": "SalesOrderHeader_H1",
    "LineId": "SalesOrderLine_L3",
    "Occurred": "2021-12-03T00:00:00.000+0000",
    "Prod": "P3",
    "Amt": 60,
    "Qty": 3
  }
]

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    这是一种方法,首先从原始数组中选择不同的LineId,然后转换结果,对于每个LineId,我们使用array_maxsales 数组中搜索最大元素。但是为此,我们需要改变结构中属性的顺序,将Occurred作为第一个属性(参见how spark compares StructType):

    spark.sql("""
        select transform(
                array_distinct(transform(sales, x -> x.LineId)),
                x -> array_max(
                        filter(
                            transform(sales, s -> struct(s.Occurred as Occurred, s.OrdNum as OrdNum, s.LineId as LineId, s.Prod as Prod, s.Amt as Amt, s.Qty as Qty)), 
                            y -> y.LineId = x 
                        )
                    )
                ) as sales
        from  sales_table
    """).show(truncate=False)
    
    #+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    #|sales                                                                                                                                                                                                                                                    |
    #+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    #|[[2021-12-02T00:00:00.000+0000, SalesOrderHeader_H1, SalesOrderLine_L2, P2, 17, 1], [2021-12-01T00:00:00.000+0000, SalesOrderHeader_H1, SalesOrderLine_L1, P1, 13, 2], [2021-12-03T00:00:00.000+0000, SalesOrderHeader_H1, SalesOrderLine_L3, P3, 60, 3]]|
    #+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    另一种方式与第一种方式类似,但我们不需要更改属性顺序:

    select transform(
                array_distinct(transform(sales, x -> x.LineId)),
                x -> filter(sales, 
                            y -> y.LineId = x 
                                 and y.Occurred = array_max(transform(filter(sales, s -> s.LineId = x), t -> t.Occurred))
                            )[0]
                ) as sales
    from  sales_table
    

    【讨论】:

    • 哇酷会喝杯咖啡,消化并测试一下!到目前为止,我已经完成了一些这样的功能组合,这非常酷,谢谢。问题:一般来说,这类转换(在有界数组上)将是非常高效的,因为它在数据共置和“预连接”的数组上工作(与标题和行的单独增量表相比) - 这就是我一直假设并且给定大量数据的形状,总是首先尝试使用数组函数。希望看到 Spark 随着时间的推移添加更多的 Array 函数。无论如何 - 非常感谢!
    • 尽管上面很酷 - 我们需要添加 Spark:FILTER(Expr,GroupingKey,Func)
    猜你喜欢
    • 1970-01-01
    • 2019-10-16
    • 2018-10-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-01
    • 2018-12-05
    • 1970-01-01
    相关资源
    最近更新 更多