【问题标题】:How to add columns in pyspark dataframe dynamically如何在pyspark数据框中动态添加列
【发布时间】:2018-06-16 11:57:26
【问题描述】:

我正在尝试根据输入变量 vIssueCols 添加几列

from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
vIssueCols=['jobid','locid']
vQuery1 = 'vSrcData2= vSrcData'
vWindow1 = Window.partitionBy("vKey").orderBy("vOrderBy")
for x in vIssueCols:
Query1=vQuery1+'.withColumn("'+x+'_prev",F.lag(vSrcData.'+x+').over(vWindow1))'

exec(vQuery1)

现在上面的查询将生成如下所示的 vQuery1,它正在工作,但是

vSrcData2= vSrcData.withColumn("jobid_prev",F.lag(vSrcData.jobid).over(vWindow1)).withColumn("locid_prev",F.lag(vSrcData.locid).over(vWindow1))

我不能写一个类似的查询

vSrcData2= vSrcData.withColumn(x+"_prev",F.lag(vSrcData.x).over(vWindow1))for x in vIssueCols

并使用循环语句生成列。一些博客建议添加一个 udf 并调用它,但是使用 udf 我将使用上面的执行字符串方法。

【问题讨论】:

    标签: pyspark window-functions


    【解决方案1】:

    您可以使用reduce 构建您的查询。

    from pyspark.sql.functions import lag
    from pyspark.sql.window import Window
    from functools import reduce
    
    #sample data
    df = sc.parallelize([[1, 200, '1234', 'asdf'],
                         [1, 50, '2345', 'qwerty'],
                         [1, 100, '4567', 'xyz'],
                         [2, 300, '123', 'prem'],
                         [2, 10, '000', 'ankur']]).\
        toDF(["vKey","vOrderBy","jobid","locid"])
    df.show()
    
    vWindow1 = Window.partitionBy("vKey").orderBy("vOrderBy")
    
    #your existing processing
    df1= df.\
        withColumn("jobid_prev",lag(df.jobid).over(vWindow1)).\
        withColumn("locid_prev",lag(df.locid).over(vWindow1))
    df1.show()
    
    #to-be processing
    vIssueCols=['jobid','locid']
    df2 = (reduce(
        lambda r_df, col_name: r_df.withColumn(col_name+"_prev", lag(r_df[col_name]).over(vWindow1)),
        vIssueCols,
        df
    ))
    df2.show()
    

    样本数据:

    +----+--------+-----+------+
    |vKey|vOrderBy|jobid| locid|
    +----+--------+-----+------+
    |   1|     200| 1234|  asdf|
    |   1|      50| 2345|qwerty|
    |   1|     100| 4567|   xyz|
    |   2|     300|  123|  prem|
    |   2|      10|  000| ankur|
    +----+--------+-----+------+
    

    输出:

    +----+--------+-----+------+----------+----------+
    |vKey|vOrderBy|jobid| locid|jobid_prev|locid_prev|
    +----+--------+-----+------+----------+----------+
    |   1|      50| 2345|qwerty|      null|      null|
    |   1|     100| 4567|   xyz|      2345|    qwerty|
    |   1|     200| 1234|  asdf|      4567|       xyz|
    |   2|      10|  000| ankur|      null|      null|
    |   2|     300|  123|  prem|       000|     ankur|
    +----+--------+-----+------+----------+----------+
    

    希望这会有所帮助!

    【讨论】:

    • 完美。动态追加列的好方法。谢谢
    • 很高兴它有帮助!
    • 如果我想从列中采样一些值,我们可以使用相同的技术来代替滞后函数吗?
    • @RajarshiBhadra 我猜是的。但是,如果您希望我们查看您的要求,那么也许您可以发布一个单独的问题并在此处分享链接。
    猜你喜欢
    • 2021-07-09
    • 1970-01-01
    • 1970-01-01
    • 2022-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-25
    • 1970-01-01
    相关资源
    最近更新 更多