【发布时间】:2019-03-03 22:12:45
【问题描述】:
我可以使用 spark sql 计算贴现的未来累积总和吗?下面是一个使用窗口函数计算未贴现 cum future sum 的示例,我硬编码了我所说的贴现 cum sum 的含义:
from pyspark.sql.window import Window
def undiscountedCummulativeFutureReward(df):
windowSpec = Window \
.partitionBy('user') \
.orderBy('time') \
.rangeBetween(0, Window.unboundedFollowing)
tot_reward = F.sum('reward').over(windowSpec)
df_tot_reward = df.withColumn('undiscounted', tot_reward)
return df_tot_reward
def makeData(spark, gamma=0.5):
data = [{'user': 'bob', 'time': 3, 'reward': 10, 'discounted_cum': 10 + (gamma * 9) + ((gamma ** 2) * 11)},
{'user': 'bob', 'time': 4, 'reward': 9, 'discounted_cum': 9 + gamma * 11},
{'user': 'bob', 'time': 5, 'reward': 11, 'discounted_cum': 11.0},
{'user': 'jo', 'time': 4, 'reward': 6, 'discounted_cum': 6 + gamma * 7},
{'user': 'jo', 'time': 5, 'reward': 7, 'discounted_cum': 7.0},
]
schema = T.StructType([T.StructField('user', T.StringType(), False),
T.StructField('time', T.IntegerType(), False),
T.StructField('reward', T.IntegerType(), False),
T.StructField('discounted_cum', T.FloatType(), False)])
return spark.createDataFrame(data=data, schema=schema)
def main(spark):
df = makeData(spark)
df = undiscountedCummulativeFutureReward(df)
df.orderBy('user', 'time').show()
return df
当你运行它时,你会得到:
+----+----+------+--------------+------------+
|user|time|reward|discounted_cum|undiscounted|
+----+----+------+--------------+------------+
| bob| 3| 10| 17.25| 30|
| bob| 4| 9| 14.5| 20|
| bob| 5| 11| 11.0| 11|
| jo| 4| 6| 9.5| 13|
| jo| 5| 7| 7.0| 7|
+----+----+------+--------------+------------+
打折的是sum \gamma^k r_k for k=0 to \infinity
我想知道我是否可以使用 Window 函数计算折扣列,例如引入带有等级的列、带有 gamma 的文字、将事物相乘 - 但仍然不太清楚 - 我想我可以用某种方法来做的UDF,但我认为我必须首先collect_as_list所有用户,返回一个带有暨折扣总和的新列表,然后分解列表。
【问题讨论】:
标签: sql apache-spark pyspark