【问题标题】:how to set date interval for counting id in pyspark?如何在pyspark中设置计数id的日期间隔?
【发布时间】:2021-03-02 16:38:46
【问题描述】:

我有一个 pyspark 数据框,其列 parsed_date (dtype: date) 和 id (dtype: bigint) 如下所示:

+-------+-----------+
|     id|parsed_date|
+-------+-----------+
|1471783| 2017-12-18|
|1471885| 2017-12-18|
|1472928| 2017-12-19|
|1476917| 2017-12-19|
|1477469| 2017-12-21|
|1478190| 2017-12-21|
|1478570| 2017-12-19|
|1481415| 2017-12-21|
|1472592| 2017-12-20|
|1474023| 2017-12-22|
|1474029| 2017-12-22|
+-------+-----------+

我有一个如下所示的函数。目的是传递一个日期(天)和 t(天数)。在 df1 中,id 计入范围(day-t,day)中,在 df2 中,id 计入范围(day,day+t)中。

from pyspark.sql import functions as F, Window

def hypo_1(df, day, t):

    df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}'")
             .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    df2 = (df.filter(f"parsed_date between '{day}' and '{day}' + interval {t} days")
             .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
             .orderBy('parsed_date')
          )
    return [df1, df2]

使用此代码,函数返回两个数据帧:

示例:hypo_1(df, '2017-12-20', 2)

df1

+-----------+-------+------------+
|parsed_date|     id|count_before|
+-----------+-------+------------+
| 2017-12-20|1471783|           1|
+-----------+-------+------------+

df2

+-----------+-------+-----------+
|parsed_date|     id|count_after| 
+-----------+-------+-----------+
| 2017-12-20|1472592|          1|
| 2017-12-21|1477469|          3|
| 2017-12-22|1474029|          2|
+-----------+-------+-----------+

问题:

  1. df1 的日期间隔看起来不正确。

  2. 不应该计算我通过的日期 (2017-12-20) 的 id,这在 df1 和 df2 中都发生 ->

     +-----------+-------+-----------+
     |parsed_date|     id|count_after| 
     +-----------+-------+-----------+
     | 2017-12-20|1472592|          1|
    

预期输出:

示例:hypo_1(df, '2017-12-20', 2)

df1:

+-------+-----------+------------+
|     id|parsed_date|count_before|
+-------+-----------+------------+
|1471783| 2017-12-18|           2|
|1478570| 2017-12-19|           3|
+-------+-----------+------------+

df2:

+-------+-----------+------------+
|     id|parsed_date| count_after|
+-------+-----------+------------+
|1477469| 2017-12-21|           3|
|1474023| 2017-12-22|           2|
+-------+-----------+------------+

请帮忙。

【问题讨论】:

    标签: python apache-spark date pyspark apache-spark-sql


    【解决方案1】:

    只需稍微更改您的过滤条件(添加- interval 1 day+ interval 1 day):

    from pyspark.sql import functions as F, Window
    
    def hypo_1(df, day, t):
        df1 = (df.filter(f"parsed_date between '{day}' - interval {t} days and '{day}' - interval 1 day")
                 .withColumn('count_before', F.count('id').over(Window.partitionBy('parsed_date')))
                 .orderBy('parsed_date')
              )
        df2 = (df.filter(f"parsed_date between '{day}' + interval 1 day and '{day}' + interval {t} days")
                 .withColumn('count_after', F.count('id').over(Window.partitionBy('parsed_date')))
                 .orderBy('parsed_date')
              )
        return [df1, df2]
    
    df1, df2 = hypo_1(df, '2017-12-20', 2)
    df1.show()
    +-------+-----------+------------+
    |     id|parsed_date|count_before|
    +-------+-----------+------------+
    |1471783| 2017-12-18|           2|
    |1471885| 2017-12-18|           2|
    |1472928| 2017-12-19|           3|
    |1476917| 2017-12-19|           3|
    |1478570| 2017-12-19|           3|
    +-------+-----------+------------+
    
    df2.show()
    +-------+-----------+-----------+
    |     id|parsed_date|count_after|
    +-------+-----------+-----------+
    |1481415| 2017-12-21|          3|
    |1478190| 2017-12-21|          3|
    |1477469| 2017-12-21|          3|
    |1474023| 2017-12-22|          2|
    |1474029| 2017-12-22|          2|
    +-------+-----------+-----------+
    

    如果你想得到你想要的输出,你可以删除重复,例如

    df1 = df1.dropDuplicates(['parsed_date', 'count_after'])
    

    【讨论】:

    • 感谢您的调查。它工作得很好。我想知道如果范围内缺少日期,此代码将如何反应?假设 2017-12-22 没有记录?是否有可能立即记录在案的日期?我的意思是如果 2017-12-22 和 2017-12-21 之后的下一个日期是 2017-12-24 那么有可能以某种方式接受吗?
    • 应该是可以的。您可以分配一个按 parsed_date 排序的dense_rank,并使用dense_rank <= 2 过滤行。随意打开另一个问题并提供一些示例数据框进行测试:)
    • 谢谢@mck,question 已创建。
    • 对于 df1,我将间隔 f"parsed_date between '{day}' - interval {t+1} days and '{day}' - interval 1 day" 更改为 f"parsed_date between '{day}' - interval {t} days and '{day}' - interval 1 day",因为它需要 t+1 天而不是 t
    • 是的,没错,我不知道为什么我把 t+1 放在那里......这是一个错误。
    猜你喜欢
    • 1970-01-01
    • 2021-01-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多