【问题标题】:Pyspark conditionally replace value in column with value from another columnPyspark 有条件地将列中的值替换为另一列中的值
【发布时间】:2021-07-02 23:05:56
【问题描述】:

我正在处理一些缺少某些值的天气数据(通过值代码表示)。例如,如果 SLP 数据丢失,则分配代码 99999。我能够使用窗口函数计算 7 天平均值并将其保存为新列。一个显着减少的单行示例如下所示:

SLP_ORIGIN SLP_ORIGIN_7DAY_AVG
99999 11945.823516044207

我正在尝试编写这样的代码,当SLP_ORIGIN 缺少代码时,它会使用SLP_ORIGIN_7DAY_AVG 值替换。但是,大多数代码都解释了如何根据条件将列值替换为常量值,而不是列值。我尝试使用以下内容:

train_impute = train.withColumn("SLP_ORIGIN", \
              when(train["SLP_ORIGIN"] == 99999, train["SLP_ORIGIN_7DAY_AVG"]).otherwise(train["SLP_ORIGIN"]))

其中数据帧被称为train

当我使用train.where("SLP_ORIGIN = 99999").count()SLP_ORIGIN 列执行计数时,我得到的计数与尝试替换该列中的值之前相同。我已经检查过了,我的 SLP_ORIGIN_7DAY_AVG 没有任何值与丢失的代码匹配。

那么我该如何将SLP_ORIGIN 列中的99999 值实际替换为关联的SLP_ORIGIN_7DAY_AVG 值?

甚至更好,有没有办法在不创建 7 天平均列的情况下进行此替换和窗口计算(我还有其他变量需要做同样的事情,所以我希望有是一种更有效的方法)。

【问题讨论】:

  • 你不应该指望train_impute,而不是train吗? train_impute.where("SLP_ORIGIN = 99999").count()
  • ....我确实选择了dumdum这个名字...谢谢。我想我会编辑询问第二部分(仅在需要时计算,而不是计算整个数据集的滚动平均值)
  • 我想出了第二部分。只需将窗口函数代码代替第二个位置的列值
  • @dumdum 如果解决了这个问题,你应该考虑回答你是如何解决的:)

标签: python-3.x pyspark databricks imputation


【解决方案1】:

确保仔细检查您正在验证的数据框。

我应该使用train_impute.where("SLP_ORIGIN = 99999").count(),而我使用的是train.where("SLP_ORIGIN = 99999").count()

此外,与其创建一个全新的列来存储估算的 7 天平均值,不如仅在存在缺失值代码时计算平均值:

train = train.withColumn("SLP_ORIGIN", when(train["SLP_ORIGIN"] == 99999, f.avg('SLP_ORIGIN').over(w)).otherwise(train["SLP_ORIGIN"]))\

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2020-05-20
    • 2012-11-06
    • 2019-12-04
    • 2018-10-18
    • 1970-01-01
    • 2023-03-15
    • 2019-03-28
    相关资源
    最近更新 更多