【问题标题】:pyspark lag function (based on column)pyspark 滞后函数(基于列)
【发布时间】:2018-02-08 05:05:44
【问题描述】:

我想实现以下目标

lag(column1,datediff(column2,column3)).over(window)

偏移量是动态的。我也尝试过使用UDF,但它没有用。

对于如何实现上述任何想法?

【问题讨论】:

标签: pyspark


【解决方案1】:

lag 函数的参数count 采用整数而不是列对象:

psf.lag(col, count=1, default=None)

因此它不能是“动态”值。 相反,您可以在列中构建滞后,然后将表与自身连接。

首先让我们创建我们的数据框:

df = spark.createDataFrame(
    sc.parallelize(
        [[1, "2011-01-01"], [1, "2012-01-01"], [2, "2013-01-01"], [1, "2014-01-01"]]
    ), 
    ["int", "date"]
)

我们要枚举行:

from pyspark.sql import Window
import pyspark.sql.functions as psf
df = df.withColumn(
    "id", 
    psf.monotonically_increasing_id()
)
w = Window.orderBy("id")
df = df.withColumn("rn", psf.row_number().over(w))
    +---+----------+-----------+---+
    |int|      date|         id| rn|
    +---+----------+-----------+---+
    |  1|2011-01-01|17179869184|  1|
    |  1|2012-01-01|42949672960|  2|
    |  2|2013-01-01|68719476736|  3|
    |  1|2014-01-01|94489280512|  4|
    +---+----------+-----------+---+

现在构建滞后:

df1 = df.select(
    "int", 
    df.date.alias("date1"), 
    (df.rn - df.int).alias("rn")
)
df2 = df.select(
    df.date.alias("date2"), 
    'rn'
)

最后我们可以加入它们并计算日期差:

df1.join(df2, "rn", "inner").withColumn(
    "date_diff", 
    psf.datediff("date1", "date2")
).drop("rn")

    +---+----------+----------+---------+
    |int|     date1|     date2|date_diff|
    +---+----------+----------+---------+
    |  1|2012-01-01|2011-01-01|      365|
    |  2|2013-01-01|2011-01-01|      731|
    |  1|2014-01-01|2013-01-01|      365|
    +---+----------+----------+---------+

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-06-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-10-02
    • 2020-02-06
    相关资源
    最近更新 更多