【发布时间】:2018-10-15 15:34:16
【问题描述】:
我有一个包含事件的 Spark 数据帧 (Pyspark 2.2.0),每个事件都有一个时间戳。还有一个包含一系列标签(A、B、C 或 Null)的附加列。我想为每一行计算 - 按事件组,按时间戳排序 - 当前最长的非 Null 标签更改的计数(Null 应将此计数重置为 0)。带有我理想计算列的 df 示例,称为stretch:
event timestamp tag stretch
G1 09:59:00 Null 0
G1 10:00:00 A 1 ---> first non Null tag starts the count
G1 10:01:00 A 1 ---> no change of tag
G1 10:02:00 B 2 ---> change of tag (A to B)
G1 10:03:00 A 3 ---> change of tag (B to A)
G1 10:04:00 Null 0 ---> Null resets the count
G1 10:05:00 A 1 ---> first non Null tag restarts the count
G2 10:00:00 B 1 ---> first non Null tag starts the count
G2 10:01:00 C 2 ---> change of tag (B to C)
在 Pyspark 中我可以这样定义一个窗口:
window = Window.partitionBy("event").orderBy(col("timestamp").asc())
并计算例如标签的变化:
df = df.withColumn("change_of_tag",col("tag")!=lag("tag",1).over(window))
但我找不到如何计算每次遇到 Null 标记时都会重置的这些更改的累积总和。我怀疑我应该定义一个按事件和标签类型(Null 或非 Null)分区的新窗口,但我不知道如何按事件分区,按时间戳排序,然后按标签类型分组。
【问题讨论】:
-
您是否尝试过使用 .groupBy 并基于 aggs 求和?
-
你能添加预期的结果吗?
-
@AliYesilli 预期结果已经存在,是拉伸列,如果不清楚,请见谅。
-
@Prazy 问题是我不知道如何分组 after 一个 orderby (我必须按事件分组,按时间戳排序,然后按非 null 分组/空标签)。
标签: apache-spark pyspark