【问题标题】:How to maintain sort order in PySpark collect_list and collect multiple lists如何在 PySpark collect_list 中维护排序顺序并收集多个列表
【发布时间】:2019-04-11 12:06:27
【问题描述】:

我想维护日期排序顺序,对多个列使用 collect_list,所有列都具有相同的日期顺序。我将需要它们在同一个数据框中,以便我可以用来创建时间序列模型输入。以下是“train_data”的示例:

我正在使用带有 PartitionBy 的窗口来确保每个 Syscode_Stn 的 Tuning_evnt_start_dt 的排序顺序。我可以用这段代码创建一列:

from pyspark.sql import functions as F
from pyspark.sql import Window

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data
.withColumn('spp_imp_daily', F.collect_list('spp_imp_daily').over(w)
           )\
.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'))

但是如何在同一个新数据框中创建两列?

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data
.withColumn('spp_imp_daily',F.collect_list('spp_imp_daily').over(w))
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))
.groupBy('Syscode_Stn')
.agg(F.max('spp_imp_daily').alias('spp_imp_daily')))

请注意,MarchMadInd 未显示在屏幕截图中,但包含在 train_data 中。解释我是如何到达我现在的位置的:https://stackoverflow.com/a/49255498/8691976

【问题讨论】:

  • 类似的代码对我来说效果很好。因此,在窗口上创建 2 个带有 withColumn 的新列似乎不是问题。我刚刚看到你的最后一行有 3 个右括号。这可能是语法问题。

标签: pyspark pyspark-sql


【解决方案1】:

是的,正确的方法是添加连续的 .withColumn 语句,然后添加一个 .agg 语句来删除每个数组的重复项。

w = Window.partitionBy('Syscode_Stn').orderBy('tuning_evnt_start_dt')

sorted_list_df = train_data.withColumn('spp_imp_daily', 
F.collect_list('spp_imp_daily').over(w)
                                  )\
.withColumn('MarchMadInd', F.collect_list('MarchMadInd').over(w))\

.groupBy('Syscode_Stn')\
.agg(F.max('spp_imp_daily').alias('spp_imp_daily'), 
 F.max('MarchMadInd').alias('MarchMadInd')
)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-07-20
    • 2020-07-31
    • 1970-01-01
    • 1970-01-01
    • 2015-06-27
    • 2022-01-17
    • 1970-01-01
    • 2017-06-10
    相关资源
    最近更新 更多