【Question Title】: Spark-Scala: Filtering a Spark Dataset based on multiple columns and conditions
【Posted】: 2019-07-10 23:47:52
【Question】:

I'm having a hard time finding a good way to filter a Spark Dataset. I've described the basic problem below:

  1. For each key, check whether there is a statusCode === UV
  2. If there is no UV status code associated with that key, ignore the key entirely.
    • Note: each key should have exactly one UV
  3. If one exists, search for the closest OA event after the UV timestamp.
    • Note: there may be multiple OA events after the UV timestamp. I want the one closest to the UV timestamp.
  4. If the only OA event occurred in the past (i.e. before the UV), I still want to keep the record, because the expected OA will come in later. I still want to capture a row with status code OA, but with its value replaced by null.
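Stripped of Spark, the per-key rules above can be sketched in plain Scala. This is only an illustration of the intended logic, not the asker's code: the `Row` case class and `pickRows` name are made up here, and rule 4 is simplified to just keeping the UV row when no later OA exists yet.

```scala
// Plain-Scala sketch of rules 1-3 (names are illustrative, not from the question).
// Timestamps stay as "yyyy-MM-dd HH:mm:ss" strings, which sort correctly
// lexicographically, so string comparison stands in for timestamp comparison.
case class Row(key: String, statusCode: String, statusTimestamp: String)

def pickRows(rows: Seq[Row]): Seq[Row] =
  rows.groupBy(_.key).toSeq.sortBy(_._1).flatMap { case (_, grp) =>
    grp.find(_.statusCode == "UV") match {
      case None => Nil                        // rule 2: no UV -> drop the key
      case Some(uv) =>
        val oaAfter = grp
          .filter(r => r.statusCode == "OA" && r.statusTimestamp > uv.statusTimestamp)
          .sortBy(_.statusTimestamp)
          .headOption                         // rule 3: closest OA after the UV
        Seq(uv) ++ oaAfter                    // rule 4 simplified: keep UV, OA pending
    }
  }
```

Applied to the sample input below, this yields the UV row at 2019-06-29 and the OA row at 2019-07-01, matching the expected output.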

Input

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|OA        |2019-05-24 14:46:00|
|AAAAAABBBBB|VD        |2019-05-31 19:31:00|
|AAAAAABBBBB|VA        |2019-06-26 00:00:00|
|AAAAAABBBBB|E         |2019-06-26 02:00:00|
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
|AAAAAABBBBB|EE        |2019-07-03 01:00:00|
+-----------+----------+-------------------+

Expected output

+-----------+----------+-------------------+
|key        |statusCode|statusTimestamp    |
+-----------+----------+-------------------+
|AAAAAABBBBB|UV        |2019-06-29 00:00:00|
|AAAAAABBBBB|OA        |2019-07-01 00:00:00|
+-----------+----------+-------------------+

I know I could work around the problem by reshaping the data like this, but does anyone have a suggestion for how to implement the filter described above?

someDS
  .groupBy("key")
  .pivot("statusCode", Seq("UV", "OA"))
  .agg(collect_set($"statusTimestamp"))
  .thenSomeOtherStuff...
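As a Spark-free illustration of what the step after the pivot would have to do (the helper name and shapes are assumptions, not from the question): after pivoting, each key carries a set of UV timestamps and a set of OA timestamps, and the job is to keep the UV plus the earliest OA after it.

```scala
// Spark-free stand-in for the post-pivot step (names/shapes assumed).
// Input: the collected UV and OA timestamp sets for one key.
// Output: None if the key has no UV; otherwise the UV timestamp paired with
// the earliest OA after it (None when the OA is still pending).
def closestOaAfterUv(uv: Set[String], oa: Set[String]): Option[(String, Option[String])] =
  uv.headOption.map { uvTs =>                     // no UV -> drop the key
    val after = oa.filter(_ > uvTs).toSeq.sorted  // "yyyy-MM-dd HH:mm:ss" sorts lexicographically
    (uvTs, after.headOption)                      // closest OA after the UV
  }
```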

【Question Discussion】:

    Tags: scala apache-spark apache-spark-sql


    【Solution 1】:

    While the groupBy/pivot approach does group the timestamps nicely, it would take non-trivial steps (most likely a UDF) to perform the necessary filtering and then re-expand the result. Here is a different approach, consisting of the following steps:

    1. Filter the dataset down to rows with statusCode "UV" or "OA" only
    2. For each row, use a Window function to assemble a string of the statusCodes from the previous, current, and next 2 rows
    3. Use Regex pattern matching to identify the wanted rows

    Sample code below:

    import java.sql.Timestamp
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    // Sample data:
    //   key `A`: requirement #3
    //   key `B`: requirement #2
    //   key `C`: requirement #4  
    val df = Seq(
      ("A", "OA", Timestamp.valueOf("2019-05-20 00:00:00")),
      ("A", "E",  Timestamp.valueOf("2019-05-30 00:00:00")),
      ("A", "UV", Timestamp.valueOf("2019-06-22 00:00:00")),
      ("A", "OA", Timestamp.valueOf("2019-07-01 00:00:00")),
      ("A", "OA", Timestamp.valueOf("2019-07-03 00:00:00")),
      ("B", "C",  Timestamp.valueOf("2019-06-15 00:00:00")),
      ("B", "OA", Timestamp.valueOf("2019-06-25 00:00:00")),
      ("C", "D",  Timestamp.valueOf("2019-06-01 00:00:00")),
      ("C", "OA", Timestamp.valueOf("2019-06-30 00:00:00")),
      ("C", "UV", Timestamp.valueOf("2019-07-02 00:00:00"))
    ).toDF("key", "statusCode", "statusTimestamp")
    
    val win = Window.partitionBy("key").orderBy("statusTimestamp")
    
    val df2 = df.
      where($"statusCode" === "UV" || $"statusCode" === "OA").
      withColumn("statusPrevCurrNext2", concat(
        coalesce(lag($"statusCode", 1).over(win), lit("")),
        lit("#"),
        $"statusCode",
        lit("#"),
        coalesce(lead($"statusCode", 1).over(win), lit("")),
        lit("#"),
        coalesce(lead($"statusCode", 2).over(win), lit(""))
      ))
    

    Let's look at df2 (the result of steps 1 and 2):

    df2.show(false)
    // +---+----------+-------------------+-------------------+
    // |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
    // +---+----------+-------------------+-------------------+
    // |B  |OA        |2019-06-25 00:00:00|#OA##              |
    // |C  |OA        |2019-06-30 00:00:00|#OA#UV#            | <-- Req #4: Ends with `#UV#`
    // |C  |UV        |2019-07-02 00:00:00|OA#UV##            | <-- Req #4: Ends with `#UV##`
    // |A  |OA        |2019-05-20 00:00:00|#OA#UV#OA          |
    // |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        | <-- Req #3: Starts with `[^#]*#UV#`
    // |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          | <-- Req #3: starts with `UV#`
    // |A  |OA        |2019-07-03 00:00:00|OA#OA##            |
    // +---+----------+-------------------+-------------------+
    

    Applying step 3:

    df2.
      where($"statusPrevCurrNext2".rlike("^[^#]*#UV#.*|^UV#.*|.*#UV#+$")).
      orderBy("key", "statusTimestamp").
      show(false)
    // +---+----------+-------------------+-------------------+
    // |key|statusCode|statusTimestamp    |statusPrevCurrNext2|
    // +---+----------+-------------------+-------------------+
    // |A  |UV        |2019-06-22 00:00:00|OA#UV#OA#OA        |
    // |A  |OA        |2019-07-01 00:00:00|UV#OA#OA#          |
    // |C  |OA        |2019-06-30 00:00:00|#OA#UV#            |
    // |C  |UV        |2019-07-02 00:00:00|OA#UV##            |
    // +---+----------+-------------------+-------------------+
    
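    To make the regex more concrete, here is a quick Spark-free check of the pattern against the statusPrevCurrNext2 values shown above, using Scala's `Regex.findFirstIn` to mimic `rlike`'s substring-match semantics:

```scala
// Check of the filter's regex against the statusPrevCurrNext2 values above;
// findFirstIn mimics rlike's substring-match semantics.
val pattern = "^[^#]*#UV#.*|^UV#.*|.*#UV#+$".r

Seq(
  "#OA##"       -> false, // B's lone OA: no UV for the key -> dropped
  "#OA#UV#"     -> true,  // C's OA before the UV -> kept (req #4)
  "OA#UV##"     -> true,  // C's UV -> kept
  "OA#UV#OA#OA" -> true,  // A's UV -> kept
  "UV#OA#OA#"   -> true,  // A's first OA after the UV -> kept (req #3)
  "OA#OA##"     -> false  // A's second OA after the UV -> dropped
).foreach { case (s, expected) =>
  assert(pattern.findFirstIn(s).isDefined == expected, s)
}
```

    Note that if there were more than two consecutive OA rows after a UV, only the first one would match, since the later rows see neither a leading `UV#` nor a trailing `#UV#`.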

    【Discussion】:
