【发布时间】:2021-07-31 11:02:52
【问题描述】:
设置如下。
from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
import pandas as pd
data = {'A': [2,2,2,2], 'B': [0.5, 0.5,1,1.5]}
df = pd.DataFrame(data)
ddf = spark.createDataFrame(df)
我需要一个C列。计算C列的逻辑如下。
- 第一行 C = A - B。
- 所有其他行 C = lag(C) - B.
我发现很难满足第二种情况。因为上下文中不存在 lag(C)。
(
ddf
.withColumn
(
'C',
F.when(F.lag(F.col('B')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))).isNull(), (F.col('A') - F.col('B')))
.otherwise
(
F.lag(F.col('C')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))) - F.col('B')
# cannot resolve '`C`' given input columns: [A, B]
)
)
.show()
)
正确的结果应该如下。
+---+---+----+
| A| B| C |
+---+---+----+
| 2|0.5|1.5 |
| 2|0.5|1.0 |
| 2|1.0|0.0 |
| 2|1.5|-1.5|
+---+---+----+
希望问题很清楚。如何解决这种情况。
【问题讨论】:
标签: pyspark