【发布时间】:2021-06-05 01:25:58
【问题描述】:
我正在处理 PySpark 中的一个问题。
输入:
+-------+---------------------+
|user_id|activity_timestamp |
+-------+---------------------+
| 1| 2021/06/01 19:00 |
| 1| 2021/06/01 19:01 |
| 2| 2021/06/01 19:01 |
| 1| 2021/06/01 19:02 |
| 2| 2021/06/01 19:02 |
| 1| 2021/06/01 19:10 |
| 1| 2021/06/01 19:11 |
+-------+---------------------+
期望的输出: 对于每个用户,检测连续活动时间戳的周期(时间戳之间的间隔小于例如 5 分钟)
+-------+---------------------------+---------------------------+
|user_id| activity _start | activity _stop |
+-------+---------------------------+---------------------------+
| 1| 2021/06/01 19:00 | 2021/06/01 19:02 |
| 1| 2021/06/01 19:10 | 2021/06/01 19:11 |
| 2| 2021/06/01 19:01 | 2021/06/01 19:02 |
到目前为止的进展:我使用了一个窗口函数来查找上一个活动的时间,并从中计算出自上一个活动以来经过的时间。但我正在努力创建所需的输出。
time_window = Window.partitionBy("user_id").orderBy("user_id", "activity_timestamp")
df = (df
.withColumn('prev_time', F.lag(F.col('activity_timestamp')).over(time_window))
.withColumn('time_gap', F.col('activity_timestamp').cast("long") - F.col('prev_time').cast("long"))
)
【问题讨论】:
标签: pyspark