【发布时间】:2021-03-06 12:11:41
【问题描述】:
我在df 中有一些每日数据,这些数据可以追溯到 2020 年 1 月 1 日。它看起来与下面类似,但每天都有很多 id1s。
| yyyy_mm_dd | id1 | id2 | cost |
|------------|-----|------|-------|
| 2020-01-01 | 23 | 7253 | 5003 |
| 2020-01-01 | 23 | 7743 | 30340 |
| 2020-01-02 | 23 | 7253 | 450 |
| 2020-01-02 | 23 | 7743 | 4500 |
| ... | ... | ... | ... |
| 2021-01-01 | 23 | 7253 | 5675 |
| 2021-01-01 | 23 | 134 | 1030 |
| 2021-01-01 | 23 | 3445 | 564 |
| 2021-01-01 | 23 | 4534 | 345 |
| ... | ... | ... | ... |
我想计算 (1) 按季度和id1 分组的总成本,(2) 与去年同期相比的增长率。p>
我已经这样分组并计算了总成本:
grouped_quarterly = (
df
.withColumn('year_quarter', (F.year(sf.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
.groupby('id1', 'year_quarter')
.agg(
F.sum('cost').alias('cost')
)
)
但我不确定如何获得与上一年相比的增长。基于上述示例的预期输出:
| year_quarter | id1 | cost | cost_growth |
|--------------|-----|------|-------------|
| 202101 | 23 | 7614 | -81 |
如果 id1 在上一季度没有任何行,也可以将 cost_growth 设置为 0。
编辑:下面是进行比较的尝试,但我收到一个错误,即没有属性prev_value:
grouped_quarterly = (
df
.withColumn('year_quarter', (F.year(sf.col('yyyy_mm_dd')) * 100 + F.quarter(F.col('yyyy_mm_dd'))
.groupby('id1', 'year_quarter')
.agg(
F.sum('cost').alias('cost')
)
)
w = Window.partitionBy('id1').orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', sf.lag(grouped_quarterly.cost).over(w))
.withColumn('diff', sf.when(sf.isnull(grouped_quarterly.cost - grouped_quarterly.prev_value), 0).otherwise(grouped_quarterly.cost - grouped_quarterly.cost))
)
编辑#2:窗口函数似乎取自上一季度,与年份无关。这意味着我的prev_value 列是上一季度,而不是上一年的同一季度:
grouped_quarterly.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost |
|-----|--------------|------|
| 222 | 202001 | 73 |
| 222 | 202002 | 246 |
| 222 | 202003 | 525 |
| 222 | 202004 | -27 |
| 222 | 202101 | 380 |
w = Window.partitionBy('id1').orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', sf.lag(sf.col('cost')).over(w))
.withColumn('diff', sf.when(sf.isnull(sf.col('cost') - sf.col('prev_value')), 0).otherwise(sf.col('cost') - sf.col('prev_value')))
)
growth.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost | prev_value | diff |
|-----|--------------|------|------------|------|
| 222 | 202001 | 73 | null | 0 |
| 222 | 202002 | 246 | 73 | 173 |
| 222 | 202003 | 525 | 246 | 279 |
| 222 | 202004 | -27 | 525 | -522 |
| 222 | 202101 | 380 | -27 | 407 |
编辑#3:在分区中使用季度会导致所有行的 prev_value 为空:
grouped_quarterly.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost |
|-----|--------------|------|
| 222 | 202001 | 73 |
| 222 | 202002 | 246 |
| 222 | 202003 | 525 |
| 222 | 202004 | -27 |
| 222 | 202101 | 380 |
w = Window.partitionBy(sf.col('id1'), sf.expr('substring(string(year_quarter), 2)')).orderBy('year_quarter')
growth = (
grouped_quarterly
.withColumn('prev_value', sf.lag(sf.col('cost')).over(w))
.withColumn('diff', sf.when(sf.isnull(sf.col('cost') - sf.col('prev_value')), 0).otherwise(sf.col('cost') - sf.col('prev_value')))
)
growth.where(sf.col('id1') == 222).sort('year_quarter').show(10,False)
| id1 | year_quarter | cost | prev_value | diff |
|-----|--------------|------|------------|-------|
| 222 | 202001 | 73 | null | 0 |
| 222 | 202002 | 246 | null | 0 |
| 222 | 202003 | 525 | null | 0 |
| 222 | 202004 | -27 | null | 0 |
| 222 | 202101 | 380 | null | 0 |
【问题讨论】:
-
谢谢,我尝试调整解决方案但遇到语法错误。我编辑了我的原始 Q 以包含我使用 Window 函数的尝试
-
使用
sf.col指定列名 -
解决了这个问题!但是输出不如预期,我现在添加了第二个编辑。
标签: python apache-spark pyspark apache-spark-sql