【问题标题】:Pyspark: finding blocks of timestamp data without pausePyspark:在没有暂停的情况下查找时间戳数据块
【发布时间】:2021-06-05 01:25:58
【问题描述】:

我正在处理 PySpark 中的一个问题。

输入:

+-------+---------------------+
|user_id|activity_timestamp   |
+-------+---------------------+
|      1|  2021/06/01 19:00   |
|      1|  2021/06/01 19:01   |
|      2|  2021/06/01 19:01   |
|      1|  2021/06/01 19:02   |
|      2|  2021/06/01 19:02   |
|      1|  2021/06/01 19:10   |
|      1|  2021/06/01 19:11   |
+-------+---------------------+

期望的输出: 对于每个用户,检测连续活动时间戳的周期(时间戳之间的间隔小于例如 5 分钟)

+-------+---------------------------+---------------------------+
|user_id|  activity _start          |  activity _stop           |
+-------+---------------------------+---------------------------+
|      1|  2021/06/01 19:00         |  2021/06/01 19:02         |
|      1|  2021/06/01 19:10         |  2021/06/01 19:11         |
|      2|  2021/06/01 19:01         |  2021/06/01 19:02         |

到目前为止的进展:我使用了一个窗口函数来查找上一个活动的时间,并从中计算出自上一个活动以来经过的时间。但我正在努力创建所需的输出。

time_window = Window.partitionBy("user_id").orderBy("user_id", "activity_timestamp")

df = (df
.withColumn('prev_time', F.lag(F.col('activity_timestamp')).over(time_window))
.withColumn('time_gap', F.col('activity_timestamp').cast("long") - F.col('prev_time').cast("long"))
)

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    从一个窗口内的两个后续行之间的时间差开始,就像问题中的time_gap一样。

    接下来的步骤是:

    • 创建一个新列,其值取决于此差异:如果差异小于 5 分钟,则新列获取值 0,否则为 1
    • 对窗口中的新列求和。属于同一时期的所有行都将获得相同的编号:只有时期中的第一行包含1,所有其他行都包含0。在下面的代码中,此列称为id
    • 将数据帧按user_idid 分组,并将activity_timestamp 的最小值和最大值作为周期的开始和结束
    from pyspark.sql import functions as F
    
    df.selectExpr("*", """
    sum(
          case
                when lag(activity_timestamp) over (PARTITION BY user_id ORDER BY activity_timestamp) is null then 1
                when cast(activity_timestamp as long)-cast(lag(activity_timestamp) over (PARTITION BY user_id ORDER BY activity_timestamp) as long) > 60 * 5 then 1
                else 0 
          end
    ) over (PARTITION BY user_id ORDER BY activity_timestamp) as id
    """) \
    .groupBy("user_id", "id") \
           .agg(F.min("activity_timestamp").alias("activity_start"),
                F.max("activity_timestamp").alias("activity_end")) \
    .drop("id") \
    .show()
    

    输出:

    +-------+-------------------+-------------------+
    |user_id|     activity_start|       activity_end|
    +-------+-------------------+-------------------+
    |      1|2021-06-01 19:00:00|2021-06-01 19:02:00|
    |      1|2021-06-01 19:10:00|2021-06-01 19:11:00|
    |      2|2021-06-01 19:01:00|2021-06-01 19:02:00|
    +-------+-------------------+-------------------+
    

    【讨论】:

      【解决方案2】:

      先将时间换算成分钟,再获取同一行的相邻时间。从这里我们可以计算时间差,然后进行转换,如果时间差小于 5 分钟,则添加一列,如果时间差小于 5 分钟,则添加一列。只需对这个新列进行求和,重用该窗口即可获得所需的结果。

      from pyspark.sql.window import Window
      from pyspark.sql import functions as F
      from pyspark.sql import types as T
      
      w = Window.partitionBy('user_id').orderBy('unixtime')
      
      df.withColumn('unixtime', F.unix_timestamp('activity_timestamp') / 60)\
         .withColumn('lag', F.lag('unixtime').over(w))\
         .withColumn('diff', F.col('lag') - F.col('unixtime'))\
         .withColumn('group', F.sum(F.when(F.col('diff') > -5., 0).otherwise(1)).over(w))\
         .groupBy('user_id', 'group').agg(F.first('activity_timestamp').alias('activity_start'), 
                                          F.last('activity_timestamp').alias('activity_end'))\
         .drop('group').show()
      

      输出

      +-------+-------------------+-------------------+
      |user_id|     activity_start|       activity_end|
      +-------+-------------------+-------------------+
      |      1|2021-06-01 19:00:00|2021-06-01 19:02:00|
      |      1|2021-06-01 19:10:00|2021-06-01 19:11:00|
      |      2|2021-06-01 19:01:00|2021-06-01 19:02:00|
      +-------+-------------------+-------------------+
      

      【讨论】:

        猜你喜欢
        • 2011-03-27
        • 2018-06-22
        • 2014-10-28
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-09-04
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多