【问题标题】:Applying a Window function to calculate differences in pySpark应用 Window 函数计算 pySpark 中的差异
【发布时间】:2016-08-12 01:01:48
【问题描述】:

我正在使用pySpark,并设置了我的数据框,其中两列代表每日资产价格,如下所示:

ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])

我申请df.show()

+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+

这很好。我想要另一列包含价格列的日常收益,即类似

(price(day2)-price(day1))/(price(day1))

经过大量研究,我被告知这是通过应用 pyspark.sql.window 函数最有效地完成的,但我不知道如何。

【问题讨论】:

  • 我假设 sqlCtx 等同于使用 sc = SparkContext('local') spark = SparkSession(sc) 获得的 'spark' 对象

标签: pyspark spark-dataframe window-functions pyspark-sql


【解决方案1】:

您可以使用lag 函数来获取前一天的列,并添加额外的列以从这两列中实际返回,但您可能必须告诉 spark 如何分区您的数据和/或命令它做滞后,像这样:

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

dfu = df.withColumn('user', lit('tmoore'))

df_lag = dfu.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                 .over(Window.partitionBy("user")))

result = df_lag.withColumn('daily_return', 
          (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'] )

>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+---+-----+-------+--------------+--------------------+

这里是Window functions in Spark的详细介绍。

【讨论】:

  • 嗨。谢谢!这是非常有用的。顺便问一下,“点亮”功能是做什么的?
  • lit - 创建一个文字值列 - spark.apache.org/docs/latest/api/python/…
  • 小注。对滞后适用的列进行排序也是一种好习惯,例如Window.partitionBy("user").orderBy("day", ascending=True)
  • 评估 df_lag,我得到一个错误:Window function lag(price#66, 1, null) requires window to beorder, dfu.withColumn('prev_day_price',func.lag(dfu['price ']).over(Window.orderBy("user"))) 解决了这个问题
  • 如何使用 spark 结构化流实现这一点?
【解决方案2】:

滞后功能可以帮助您解决您的用例。

from pyspark.sql.window import Window
import pyspark.sql.functions as func

### Defining the window 
Windowspec=Window.orderBy("day")

### Calculating lag of price at each day level
prev_day_price= df.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                .over(Windowspec))

### Calculating the average                                  
result = prev_day_price.withColumn('daily_return', 
          (prev_day_price['price'] - prev_day_price['prev_day_price']) / 
prev_day_price['price'] )

【讨论】:

    猜你喜欢
    • 2020-04-03
    • 2017-10-16
    • 1970-01-01
    • 2018-03-27
    • 1970-01-01
    • 2019-04-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多