【问题标题】:How to filter out rows based on previous consecutive rows?如何根据先前的连续行过滤掉行?
【发布时间】:2018-05-30 04:15:06
【问题描述】:

我有一个要求,其中数据帧按 col1(时间戳)排序,我需要按 col2 过滤。

col2 值小于前一行的 col2 值的任何行,我需要过滤掉该行。结果应该是 col2 值单调递增。

请注意,这不仅仅是两行。

例如,假设 4 行的 col2 的值为 4,2,3,5。结果应该是 4,5,因为第 2 行和第 3 行都小于 4(第一行值)。

val input = Seq(
  (1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)
).toDF("timestamp", "value")
scala> input.show
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        2|    2|
|        3|    3|
|        4|    5|
|        5|    1|
|        6|    9|
|        7|    6|
+---------+-----+

val expected = Seq((1,4), (4,5), (6, 9)).toDF("timestamp", "value")
scala> expected.show
+---------+-----+
|timestamp|value|
+---------+-----+
|        1|    4|
|        4|    5|
|        6|    9|
+---------+-----+

请注意:

  • 第 2 行和第 3 行被过滤掉,因为它的值小于第 1 行中的值,即 4
  • 第 5 行被过滤掉,因为它的值小于第 4 行中的值,即 6

一般来说,有没有一种方法可以根据一行的值与前一行中的值的比较来过滤行?

【问题讨论】:

    标签: apache-spark apache-spark-sql


    【解决方案1】:

    我认为您所追求的称为running maximum(在running total 之后)。这总是让我使用窗口聚合

    // I made the input a bit more tricky
    val input = Seq(
      (1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)
    ).toDF("timestamp", "value")
    scala> input.show
    +---------+-----+
    |timestamp|value|
    +---------+-----+
    |        1|    4|
    |        2|    2|
    |        3|    3|
    |        4|    5|
    |        5|    1|
    |        6|    9|
    |        7|    6|
    +---------+-----+
    

    我的目标是以下预期结果。如果我错了,请纠正我。

    val expected = Seq((1,4), (4,5), (6, 9)).toDF("timestamp", "value")
    scala> expected.show
    +---------+-----+
    |timestamp|value|
    +---------+-----+
    |        1|    4|
    |        4|    5|
    |        6|    9|
    +---------+-----+
    

    “运行”问题的技巧是在定义窗口规范时使用rangeBetween

    import org.apache.spark.sql.expressions.Window
    val ts = Window
      .orderBy("timestamp")
      .rangeBetween(Window.unboundedPreceding, Window.currentRow)
    

    使用窗口规范,您可以从结果中过滤掉要删除的内容,然后就完成了。

    val result = input
      .withColumn("running_max", max("value") over ts)
      .where($"running_max" === $"value")
      .select("timestamp", "value")
    
    scala> result.show
    18/05/29 22:09:18 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
    +---------+-----+
    |timestamp|value|
    +---------+-----+
    |        1|    4|
    |        4|    5|
    |        6|    9|
    +---------+-----+
    

    正如您所见,它效率不高,因为它只使用单个分区(这会导致单线程执行效果不佳,因此与在单台机器上运行实验没有太大区别)。

    我认为我们可以对输入进行部分分区计算运行最大值,然后合并部分结果并再次重新运行运行最大值计算。只是我自己没有尝试过的想法。

    【讨论】:

    • rangeBetween 不需要,如果窗口指定了排序,它会自动使用“运行”最大值
    • 我想我可能已经看到需要它的 Spark 版本,因此我将它作为安全网包含在内。它不会造成伤害并使事情变得更加可预测。
    • 好的,但我建议使用rowsBetween 使其更具可读性
    • 感谢 Jacek 的详细回答。也感谢您回答我关于分区警告的后续问题。跑最大是关键。
    • @JacekLaskowski 它们根本不同,rowsBetween 是在“行数”中设置一个框架,rangeBetween 使用其他措施来定义一个框架,例如无论数据是如何采样的(在这种情况下,行数是动态计算的),这对于计算 10 米以上的滑动平均值很有用。
    【解决方案2】:

    检查与运行最大值的相等性应该可以解决问题:

    val input = Seq((1,4), (2,2), (3,3), (4,5), (5, 1), (6, 9), (7, 6)).toDF("timestamp", "value")
    
    input.show()
    
    +---------+-----+
    |timestamp|value|
    +---------+-----+
    |        1|    4|
    |        2|    2|
    |        3|    3|
    |        4|    5|
    |        5|    1|
    |        6|    9|
    |        7|    6|
    +---------+-----+
    
    
    input
      .withColumn("max",max($"value").over(Window.orderBy($"timestamp")))
      .where($"value"===$"max").drop($"max")
      .show()
    
    +---------+-----+
    |timestamp|value|
    +---------+-----+
    |        1|    4|
    |        4|    5|
    |        6|    9|
    +---------+-----+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-01-16
      • 1970-01-01
      • 1970-01-01
      • 2021-08-01
      • 2011-01-26
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多