【问题标题】:PySpark Lag functionPySpark 滞后函数
【发布时间】:2021-07-31 11:02:52
【问题描述】:

设置如下。

from pyspark.sql import Row, functions as F
from pyspark.sql.window import Window
import pandas as pd

data = {'A': [2,2,2,2], 'B': [0.5, 0.5,1,1.5]}
df = pd.DataFrame(data)
ddf = spark.createDataFrame(df)

我需要一个C列。计算C列的逻辑如下。

  • 第一行 C = A - B。
  • 所有其他行 C = lag(C) - B.

我发现很难满足第二种情况。因为上下文中不存在 lag(C)。

(
    ddf  
    .withColumn
    (
        'C',
         F.when(F.lag(F.col('B')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))).isNull(), (F.col('A') - F.col('B')))
        .otherwise
        (               
           F.lag(F.col('C')).over(Window.partitionBy(F.col('A')).orderBy(F.col('A'))) - F.col('B') 
            # cannot resolve '`C`' given input columns: [A, B]
        )                
    )  
   .show()
)

正确的结果应该如下。

+---+---+----+
|  A|  B| C  |
+---+---+----+
|  2|0.5|1.5 |
|  2|0.5|1.0 |
|  2|1.0|0.0 |
|  2|1.5|-1.5|
+---+---+----+

希望问题很清楚。如何解决这种情况。

【问题讨论】:

    标签: pyspark


    【解决方案1】:

    您希望从 A 中减去 B 列的累积总和。试试下面的方法。

    请注意,我将顺序作为单调递增的ID,如果您有任何分区列,您可以将其替换为您要保留和分区的排序列。

    w = Window.orderBy(F.monotonically_increasing_id())\
              .rangeBetween(Window.unboundedPreceding,0)
    
    ddf.withColumn("C",F.col("A")-F.sum("B").over(w)).show()
    
    +---+---+----+
    |  A|  B|   C|
    +---+---+----+
    |  2|0.5| 1.5|
    |  2|0.5| 1.0|
    |  2|1.0| 0.0|
    |  2|1.5|-1.5|
    +---+---+----+
    

    【讨论】:

      猜你喜欢
      • 2018-02-08
      • 1970-01-01
      • 2021-12-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多