【问题标题】:Shifting a PySpark dataframe column by a variable value in another column将 PySpark 数据框列移动另一列中的变量值
【发布时间】:2021-11-16 18:13:15
【问题描述】:

我有一个看起来像这样的 PySpark 数据框

Date Value Shift_Index
2021/02/11 50.12 0
2021/02/12 72.30 4
2021/02/15 81.87 1
2021/02/16 90.12 2
2021/02/17 91.31 1
2021/02/18 81.23 2
2021/02/19 73.45 1
2021/02/22 87.17 0

我想引导我必须传递的偏移量(基于此处 Shift_Index 列中的值)取决于整数类型的特定列。 我们能否以某种方式使用取决于 spark SQL 中领先/滞后函数中的列值的偏移值? 我想要有点像这样,它在 SQL server 中运行良好,但不幸的是在 Spark SQL 中抛出异常。

Create table test_table(ID int identity(1,1), Value float, shift_col int, New_Value float)

SELECT Value, shift_col,
ISNULL(LEAD(Value, shift_col) OVER(ORDER BY ID ASC), Value) AS New_Value
FROM test_table

我需要的最终结果类似于:

Date Value Shift_Index New_Value
2021/02/11 50.12 0 50.12
2021/02/12 72.30 4 81.23
2021/02/15 81.87 1 90.12
2021/02/16 90.12 2 81.23
2021/02/17 91.31 1 81.23
2021/02/18 81.23 2 87.17
2021/02/19 73.45 1 87.17
2021/02/22 87.17 0 87.17

遇到以下异常

  • Py4JJavaError: 调用 o77.sql 时出错。
  • :org.apache.spark.sql.AnalysisException:由于数据类型不匹配,无法解析“lead(sample_data_temp.shift_col, NULL)”:偏移表达式“shift_col#2835”必须是文字

任何帮助将不胜感激。 提前致谢。

【问题讨论】:

  • 不是火花用例。
  • 可以做到,但需要跳出框框思考。
  • 你的数据框是否总是按Date排序?
  • @snithish :是的,数据帧将始终按Date排序

标签: apache-spark pyspark apache-spark-sql window-functions


【解决方案1】:

你可以用窗户和铅做到这一点。如果 Shift_index 的值非常分散,则可以执行 select distinct 来确定需要哪些班次,而不是直到最大班次。

理想情况下,你有一些东西来划分你的窗口,否则这对于大型数据集来说可能非常繁重。 Spark 提供警告:

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

编辑:没有连接的解决方案,仍然没有分区意味着这不能很好地并行化。

from pyspark.sql import functions as f
from pyspark.sql.window import Window

w = Window().orderBy(f.col('Date'))

max_shift = df.agg(f.max('Shift_index')).collect()[0][0]

for shift in range(1, max_shift+1):
    df = df.withColumn('Value' + str(shift), f.lead(f.col('Value'), shift).over(w))

case_shift = 'CASE Shift_index WHEN 0 THEN Value ' + ' '.join([f'WHEN {i} THEN Value{i}' for i in range(1, max_shift + 1)]) + ' ELSE NULL END'

df = df.select(
    f.col('Date'),
    f.col('Shift_index'),
    f.col('Value'),
    f.expr(case_shift).alias('New_Value')
)

df.show()

+----------+-----+-----------+---------+                                        
|      Date|Value|Shift_index|New_Value|
+----------+-----+-----------+---------+
|2021/02/11|50.12|          0|    50.12|
|2021/02/12| 72.3|          4|    81.23|
|2021/02/15|81.87|          1|    90.12|
|2021/02/16|90.12|          2|    81.23|
|2021/02/17|91.31|          1|    81.23|
|2021/02/18|81.23|          2|    87.17|
|2021/02/19|73.45|          1|    87.17|
|2021/02/22|87.17|          0|    87.17|
+----------+-----+-----------+---------+

【讨论】:

  • 在规模上这将如何执行?
  • 恐怕不太好,问题中的问题不能很好地并行化。但是很难判断用例是否未知。 Spark 有用地提供了一个警告,我将在答案中添加它以使其更清楚。
  • 确实,所以你需要做不同的事情。
  • 您可以随意添加答案
  • 但是有兴趣看看你能不能...
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多