【问题标题】:Spark SQL Partition By, Window, Order By, CountSpark SQL 分区依据、窗口、排序依据、计数
【发布时间】:2018-12-11 18:16:28
【问题描述】:

假设我有一个包含杂志订阅信息的数据框:

subscription_id    user_id       created_at       expiration_date
 12384               1           2018-08-10        2018-12-10
 83294               1           2018-06-03        2018-10-03
 98234               1           2018-04-08        2018-08-08
 24903               2           2018-05-08        2018-07-08
 32843               2           2018-03-25        2018-05-25
 09283               2           2018-04-07        2018-06-07

现在我想添加一个列,说明在当前订阅开始之前用户有多少之前的订阅已过期。换句话说,与给定用户关联的到期日期在此订阅的开始日期之前有多少。这是所需的完整输出:

subscription_id    user_id       created_at       expiration_date   previous_expired
 12384               1           2018-08-10        2018-12-10          1
 83294               1           2018-06-03        2018-10-03          0
 98234               1           2018-04-08        2018-08-08          0
 24903               2           2018-05-08        2018-07-08          2
 32843               2           2018-03-25        2018-05-03          1
 09283               2           2018-01-25        2018-02-25          0

尝试:

编辑:使用 Python 尝试了各种滞后/领先/等,我现在认为这是一个 SQL 问题

df = df.withColumn('shiftlag', func.lag(df.expires_at).over(Window.partitionBy('user_id').orderBy('created_at')))

我想我用尽了滞后/领先/移位方法,发现它不起作用。我现在认为最好使用 Spark SQL 来执行此操作,也许使用 case when 来生成新列,并结合 having count,按 ID 分组?

【问题讨论】:

  • 看起来你的一位同学已经发布了这个问题。 Pyspark - GroupBy and Count combined with a WHERE 的可能重复项。 编辑:我发现这略有不同,我撤回了我的近距离投票。
  • 哇哇。那是以前的问题,这个问题在与不同的列进行比较时有点困难。我想我需要使用滞后/领先

标签: python mysql sql sql-server python-2.7


【解决方案1】:

使用 PySpark 解决了这个问题:

我首先创建了另一个列,其中包含每个用户的所有到期日期:

joined_array = df.groupBy('user_id').agg(collect_set('expiration_date'))

然后将该数组连接回原始数据框:

joined_array = joined_array.toDF('user_idDROP', 'expiration_date_array')
df = df.join(joined_array, df.user_id == joined_array.user_idDROP, how = 'left').drop('user_idDROP')

然后创建一个函数来遍历数组,如果创建日期大于到期日期,则计数加 1:

def check_expiration_count(created_at, expiration_array):
  if not expiration_array:
    return 0
  else:
   count = 0
    for i in expiration_array:
  if created_at > i:
    count += 1
return count

check_expiration_count = udf(check_expiration_count, IntegerType())

然后应用该函数创建一个具有正确计数的新列:

df = df.withColumn('count_of_subs_ending_before_creation', check_expiration_count(df.created_at, df.expiration_array))

哇啦。完毕。谢谢大家(没有人帮忙,但还是谢谢)。希望有人在 2022 年发现这很有用

【讨论】:

  • 该死的已经是 2022 年了..
猜你喜欢
  • 2021-10-23
  • 2023-03-13
  • 1970-01-01
  • 1970-01-01
  • 2010-09-06
  • 1970-01-01
  • 2011-09-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多