【Question Title】: Spark window function per time period
【Posted】: 2018-12-16 15:04:04
【Question Description】:

I have a dataframe with the following structure:

|ID|Page     |User             |Timestamp          |
|1|Page 1   |Ericd            |2002-09-07 19:39:55|
|1|Page 1   |Liir             |2002-10-12 03:01:42|
|1|Page 1   |Tubby            |2002-10-12 03:02:23|
|1|Page 1   |Mojo             |2002-10-12 03:18:24|
|1|Page 1   |Kirf             |2002-10-12 03:19:03|
|2|Page 2   |The Epopt        |2001-11-28 22:27:37|
|2|Page 2   |Conversion script|2002-02-03 01:49:16|
|2|Page 2   |Bryan Derksen    |2002-02-25 16:51:15|
|2|Page 2   |Gear             |2002-10-04 12:46:06|
|2|Page 2   |Tim Starling     |2002-10-06 08:13:42|
|2|Page 2   |Tim Starling     |2002-10-07 03:00:54|
|2|Page 2   |Salsa Shark      |2003-03-18 01:45:32|

I want to find the number of users who visited these pages within a given time period (for example, each month). For instance, for the 10th month of 2002 the result would be:

|1|Page 1   |Liir             |2002-10-12 03:01:42| 
|1|Page 1   |Tubby            |2002-10-12 03:02:23|
|1|Page 1   |Mojo             |2002-10-12 03:18:24|
|1|Page 1   |Kirf             |2002-10-12 03:19:03|
|2|Page 2   |Gear             |2002-10-04 12:46:06|
|2|Page 2   |Tim Starling     |2002-10-06 08:13:42|
|2|Page 2   |Tim Starling     |2002-10-07 03:00:54|

and the counts per page:

              numberOfUsers (in October 2002)
|1|Page 1   |      4
|2|Page 2   |      3 

The question is also how to apply this logic to every month of every year. For example, I have figured out how to find events that occurred within the last n days:

from pyspark.sql import Window
from pyspark.sql.functions import col
import pyspark.sql.functions as func

days = lambda i: i * 86400
window = (Window().partitionBy(col("page"))
          .orderBy(col("timestamp").cast("timestamp").cast("long"))
          .rangeBetween(-days(30), 0))

df = df.withColumn("monthly_occurrences", func.count("user").over(window))
df.show()

Any suggestions would be much appreciated.

【Comments】:

  • Hi metron, has your problem been solved? If so, please consider accepting one of the answers to mark your question as resolved.

Tags: python apache-spark pyspark apache-spark-sql


【Solution 1】:

You can first create a column containing the year-month combination and then group by that column. A working example:

import pyspark.sql.functions as F

df = sc.parallelize([
    ('2018-06-02T00:00:00.000Z','tim', 'page 1' ),
    ('2018-07-20T00:00:00.000Z','tim', 'page 1' ),
    ('2018-07-20T00:00:00.000Z','john', 'page 2' ),
    ('2018-07-20T00:00:00.000Z','john', 'page 2' ),
    ('2018-08-20T00:00:00.000Z','john', 'page 2' )
]).toDF(("datetime","user","page" ))

df = df.withColumn('yearmonth',F.concat(F.year('datetime'),F.lit('-'),F.month('datetime')))    
df_agg = df.groupBy('yearmonth','page').count()
df_agg.show()

Output:

+---------+------+-----+
|yearmonth|  page|count|
+---------+------+-----+
|   2018-7|page 2|    2|
|   2018-6|page 1|    1|
|   2018-7|page 1|    1|
|   2018-8|page 2|    1|
+---------+------+-----+

Hope this helps!
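As an illustration (not part of the answer above, and needing no Spark), the same group-by-year-month counting can be sketched in plain Python on the question's sample rows; the string slice ts[:7] plays the role of the yearmonth column:

```python
from collections import Counter

# Sample rows from the question: (id, page, user, timestamp string)
rows = [
    (1, "Page 1", "Ericd",             "2002-09-07 19:39:55"),
    (1, "Page 1", "Liir",              "2002-10-12 03:01:42"),
    (1, "Page 1", "Tubby",             "2002-10-12 03:02:23"),
    (1, "Page 1", "Mojo",              "2002-10-12 03:18:24"),
    (1, "Page 1", "Kirf",              "2002-10-12 03:19:03"),
    (2, "Page 2", "The Epopt",         "2001-11-28 22:27:37"),
    (2, "Page 2", "Conversion script", "2002-02-03 01:49:16"),
    (2, "Page 2", "Bryan Derksen",     "2002-02-25 16:51:15"),
    (2, "Page 2", "Gear",              "2002-10-04 12:46:06"),
    (2, "Page 2", "Tim Starling",      "2002-10-06 08:13:42"),
    (2, "Page 2", "Tim Starling",      "2002-10-07 03:00:54"),
    (2, "Page 2", "Salsa Shark",       "2003-03-18 01:45:32"),
]

# Group by (year-month, page), just like the Spark groupBy above;
# ts[:7] slices "2002-10" out of the timestamp string
counts = Counter((ts[:7], page) for _, page, _, ts in rows)

print(counts[("2002-10", "Page 1")])  # 4 visits in October 2002
print(counts[("2002-10", "Page 2")])  # 3 visits (Tim Starling counted twice)
```

This matches the counts the question expects (4 and 3); if distinct visitors were wanted instead, a set of users per key (or F.countDistinct('user') in Spark) would give 2 for Page 2. Note also that F.concat(F.year(...), F.lit('-'), F.month(...)) yields unpadded keys like '2018-7'; if chronological string sorting matters, F.date_format('datetime', 'yyyy-MM') produces '2018-07' instead.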

【Comments】:

【Solution 2】:

If you are looking for dynamic periods, first convert the date to a Unix timestamp, subtract each row's timestamp from today's, and integer-divide by the length (in seconds) of the interval you want to group by. The code below groups rows into 5-day intervals.

import pyspark.sql.functions as F
from datetime import datetime

# today's Unix timestamp
Today = datetime.today().timestamp()
# number of seconds in one day
DAY_TIMESTAMPS = 24 * 60 * 60

df = sc.parallelize([
    ('2017-06-02 00:00:00','tim', 'page 1' ),
    ('2017-07-20 00:00:00','tim', 'page 1' ),
    ('2017-07-21 00:00:00','john', 'page 2' ),
    ('2017-07-22 00:00:00','john', 'page 2' ),
    ('2017-08-23 00:00:00','john', 'page 2' )
]).toDF(("datetime","user","page" ))

# group into five-day intervals
timeInterval = 5 * DAY_TIMESTAMPS

df \
    .withColumn('timestamp', F.unix_timestamp(F.to_date('datetime', 'yyyy-MM-dd HH:mm:ss'))) \
    .withColumn('timeIntervalBefore', ((Today - F.col('timestamp')) / timeInterval).cast('integer')) \
    .groupBy('timeIntervalBefore', 'page') \
    .agg(F.count('user').alias('number of users')).show()

Result:

+------------------+------+---------------+
|timeIntervalBefore|  page|number of users|
+------------------+------+---------------+
|                70|page 2|              2|
|                80|page 1|              1|
|                70|page 1|              1|
|                64|page 2|              1|
+------------------+------+---------------+
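To see why 2017-07-21 and 2017-07-22 land in the same group while 2017-08-23 does not: the bucket index is just (today − timestamp) integer-divided by the interval length. A plain-Python sketch of that arithmetic, using whole days and a fixed reference date (an assumption chosen so the output is reproducible; the answer itself uses datetime.today(), so its bucket numbers shift daily):

```python
from datetime import datetime

ref = datetime(2017, 9, 1)  # fixed "today" (assumed, for reproducibility)

def bucket(date_string, interval_days=5):
    # Whole days between the reference date and the row, integer-divided
    # by the interval length -- the same idea as timeIntervalBefore above
    d = datetime.strptime(date_string, "%Y-%m-%d %H:%M:%S")
    return (ref - d).days // interval_days

print(bucket("2017-07-21 00:00:00"))  # 8
print(bucket("2017-07-22 00:00:00"))  # 8  (same 5-day bucket)
print(bucket("2017-08-23 00:00:00"))  # 1  (a different bucket)
```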
    

If you need to estimate the dates covered by each period:

df \
    .withColumn('timestamp', F.unix_timestamp(F.to_date('datetime', 'yyyy-MM-dd HH:mm:ss'))) \
    .withColumn('timeIntervalBefore', ((Today - F.col('timestamp')) / timeInterval).cast('integer')) \
    .groupBy('timeIntervalBefore', 'page') \
    .agg(
        F.count('user').alias('number_of_users'),
        F.min('timestamp').alias('firstDay'),
        F.max('timestamp').alias('lastDay')) \
    .select(
        'page',
        'number_of_users',
        F.from_unixtime('firstDay').alias('firstDay'),
        F.from_unixtime('lastDay').alias('lastDay')).show()
    

Result:

+------+---------------+-------------------+-------------------+
|  page|number_of_users|           firstDay|            lastDay|
+------+---------------+-------------------+-------------------+
|page 2|              2|2017-07-21 00:00:00|2017-07-22 00:00:00|
|page 1|              1|2017-06-02 00:00:00|2017-06-02 00:00:00|
|page 1|              1|2017-07-20 00:00:00|2017-07-20 00:00:00|
|page 2|              1|2017-08-23 00:00:00|2017-08-23 00:00:00|
+------+---------------+-------------------+-------------------+
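The firstDay/lastDay columns are simply the min and max timestamps within each bucket. Sketched in plain Python on the answer's sample rows, again with a fixed reference date (an assumption for reproducibility):

```python
from datetime import datetime
from collections import defaultdict

ref = datetime(2017, 9, 1)  # fixed reference date (assumed, for reproducibility)

rows = [
    ("2017-06-02 00:00:00", "tim",  "page 1"),
    ("2017-07-20 00:00:00", "tim",  "page 1"),
    ("2017-07-21 00:00:00", "john", "page 2"),
    ("2017-07-22 00:00:00", "john", "page 2"),
    ("2017-08-23 00:00:00", "john", "page 2"),
]

# Collect the dates falling into each (5-day bucket, page) group
groups = defaultdict(list)
for ts, user, page in rows:
    d = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    bucket = (ref - d).days // 5
    groups[(bucket, page)].append(d)

# Per group: user count, earliest date, latest date
for (bucket, page), dates in sorted(groups.items()):
    print(page, len(dates), min(dates).date(), max(dates).date())
```

For page 2's shared bucket this prints a count of 2 with firstDay 2017-07-21 and lastDay 2017-07-22, i.e. the two dates differ whenever a bucket spans several rows.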
    

【Comments】:
