【问题标题】:pyspark get latest non-null element of every column in one rowpyspark 获取一行中每一列的最新非空元素
【发布时间】:2022-01-23 14:24:32
【问题描述】:

让我用一个例子来解释我的问题: 我有一个数据框:

pd_1 = pd.DataFrame({'day':[1,2,3,2,1,3], 
                     'code': [10, 10, 20,20,30,30],
                     'A': [44, 55, 66,77,88,99],
                     'B':['a',None,'c',None,'d', None],
                     'C':[None,None,'12',None,None, None]
                    })
df_1 = sc.createDataFrame(pd_1)
df_1.show()

输出:

+---+----+---+----+----+
|day|code|  A|   B|   C|
+---+----+---+----+----+
|  1|  10| 44|   a|null|
|  2|  10| 55|null|null|
|  3|  20| 66|   c|  12|
|  2|  20| 77|null|null|
|  1|  30| 88|   d|null|
|  3|  30| 99|null|null|
+---+----+---+----+----+

我想要实现的是一个新的数据框,每一行对应一个code,并且对于每一列,我想要拥有最新的非空值(最高的day)。

在 pandas 中,我可以简单地做

pd_2 = pd_1.sort_values('day', ascending=True).groupby('code').last()
pd_2.reset_index()

得到

    code    day A   B   C
0   10       2  55  a   None
1   20       3  66  c   12
2   30       3  99  d   None

我的问题是,如何在 pyspark(最好是版本


到目前为止我尝试过的是:

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy('code').orderBy(F.desc('day')).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

## Update: after applying @Steven's idea to remove for loop:
df_1 = df_1 .select([F.collect_list(x).over(w).getItem(0).alias(x) for x in df_.columns])

##for x in df_1.columns:
##    df_1 = df_1.withColumn(x, F.collect_list(x).over(w).getItem(0))

df_1 = df_1.distinct()
df_1.show()

输出

+---+----+---+---+----+
|day|code|  A|  B|   C|
+---+----+---+---+----+
|  2|  10| 55|  a|null|
|  3|  30| 99|  d|null|
|  3|  20| 66|  c|  12|
+---+----+---+---+----+

我不太满意,尤其是因为for loop

【问题讨论】:

  • @Steven 谢谢。但我认为它会有同样的循环遍历所有列的问题:|
  • 如果您的问题只是 for 循环,请将其更改为 select 中的列表理解
  • 改进代码是个好主意:)
  • 问题是最后一行将包含来自不同行的元素(使用相同的代码),而不仅仅是选择一行。我认为它不能使用单个 row_number 函数来完成。但如果我错过了什么,请告诉我或写一个答案:)

标签: dataframe apache-spark pyspark apache-spark-sql


【解决方案1】:

这是另一种使用数组函数和结构排序而不是窗口的方法:

from pyspark.sql import functions as F

other_cols = ["day", "A", "B", "C"]

df_1 = df_1.groupBy("code").agg(
    F.collect_list(F.struct(*other_cols)).alias("values")
).selectExpr(
    "code",
    *[f"array_max(filter(values, x-> x.{c} is not null))['{c}'] as {c}" for c in other_cols]
)

df_1.show()
#+----+---+---+---+----+
#|code|day|  A|  B|   C|
#+----+---+---+---+----+
#|  10|  2| 55|  a|null|
#|  30|  3| 99|  d|null|
#|  20|  3| 66|  c|  12|
#+----+---+---+---+----+

【讨论】:

    【解决方案2】:

    我认为您当前的解决方案非常好。如果您想要其他解决方案,可以尝试使用first/last 窗口函数:

    from pyspark.sql import functions as F, Window
    
    w = Window.partitionBy("code").orderBy(F.col("day").desc())
    
    
    df2 = (
        df.select(
            "day",
            "code",
            F.row_number().over(w).alias("rwnb"),
            *(
                F.first(F.col(col), ignorenulls=True)
                .over(w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
                .alias(col)
                for col in ("A", "B", "C")
            ),
        )
        .where("rwnb = 1")
        .drop("rwnb")
    )
    

    结果:

    df2.show()
    
    +---+----+---+---+----+
    |day|code|  A|  B|   C|
    +---+----+---+---+----+
    |  2|  10| 55|  a|null|
    |  3|  30| 99|  d|null|
    |  3|  20| 66|  c|  12|
    +---+----+---+---+----+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-11-10
      • 2017-10-08
      • 2019-12-26
      • 1970-01-01
      • 1970-01-01
      • 2018-08-17
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多