[Question Title]: How to split a pyspark dataframe and create new columns
[Posted]: 2020-08-04 17:11:59
[Question Description]:

I have a sample input dataframe like the one below, but the number of value columns (the columns whose names start with m) can be any n. I am also using customer_id as the primary key (although, depending on the input data, there may be more than one primary key column).

customer_id|month_id|m1    |m2 |m3 ....to....m_n
1001      |  01    |10     |20    
1002      |  01    |20     |30    
1003      |  01    |30     |40
1001      |  02    |40     |50    
1002      |  02    |50     |60    
1003      |  02    |60     |70
1001      |  03    |70     |80    
1002      |  03    |80     |90    
1003      |  03    |90     |100

Now, based on the input value columns, I have to compute new columns as either a cumulative sum or a cumulative average. Consider this example:

cumulative sum on [m1, ......, m10] and 
cumulative avg on [m11, ......., m20] columns 

Based on this, I have to compute the new columns. I have tried it with window functions and I am able to compute them. However, because of the size of the data, my problem is that I compute the new columns one after another, each step working on the dataframe updated with the previous column.

My attempt:

a = ["m1", ......, "m10"]
b = ["m11", ......, "m20"]
rnum = (Window.partitionBy("customer_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
for item in a:
    var = "n"
    df = df.withColumn(var + item[1:], F.sum(item).over(rnum))
for item in b:
    var = "n"
    df = df.withColumn(var + item[1:], F.avg(item).over(rnum))

Output data:

customer_id|month_id|m1     |m2    |m11     |m12   |n1   |n2  |n11  |n12
1001       |  01    |10     |20    |10      |20    |10   |20  |10   |20
1002       |  01    |20     |30    |10      |20    |20   |30  |10   |20
1003       |  01    |30     |40    |10      |20    |30   |40  |10   |20
1001       |  02    |40     |50    |10      |20    |50   |35  |10   |20
1002       |  02    |50     |60    |10      |20    |70   |55  |10   |20
1003       |  02    |60     |70    |10      |20    |90   |75  |10   |20
1001       |  03    |70     |80    |10      |20    |120  |75  |10   |20
1002       |  03    |80     |90    |10      |20    |150  |105 |10   |20
1003       |  03    |90     |100   |10      |20    |180  |135 |10   |20

However, could we do the same thing by splitting the dataframe into two parts, one holding the cumulative-sum columns and the other holding the cumulative-average columns (each together with the primary key columns), performing the computations separately, and then joining the computed dataframes back together?

[Question Comments]:

    Tags: python dataframe pyspark hive pyspark-dataframes


    [Solution 1]:

    Optimized logical plan for the split (DF + DF1) approach, where the two frames are joined back together:

    == Optimized Logical Plan ==
    GlobalLimit 21
    +- LocalLimit 21
       +- Project [m1#15, m2#16, sum1#27, sum2#38, customer_id#5334, month_id#5335, m3#5338, m4#5339, avg3#465, avg4#474]
          +- Join Inner, ((customer_id#13 = customer_id#5334) && (month_id#14 = month_id#5335))
             :- Project [customer_id#13, month_id#14, m1#15, m2#16, sum1#27, sum2#38]
             :  +- Filter isnotnull(month_id#14)
             :     +- Window [sum(_w0#39) windowspecdefinition(customer_id#13, month_id#14 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum2#38], [customer_id#13], [month_id#14 ASC NULLS FIRST]
             :        +- Project [customer_id#13, month_id#14, m1#15, m2#16, sum1#27, cast(m2#16 as double) AS _w0#39]
             :           +- Window [sum(_w0#28) windowspecdefinition(customer_id#13, month_id#14 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum1#27], [customer_id#13], [month_id#14 ASC NULLS FIRST]
             :              +- Project [customer_id#13, month_id#14, m1#15, m2#16, cast(m1#15 as double) AS _w0#28]
             :                 +- Filter isnotnull(customer_id#13)
             :                    +- LogicalRDD [customer_id#13, month_id#14, m1#15, m2#16, m3#17, m4#18]
             +- Project [customer_id#5334, month_id#5335, m3#5338, m4#5339, avg3#465, avg4#474]
                +- Filter isnotnull(month_id#5335)
                   +- Window [avg(_w0#475) windowspecdefinition(customer_id#5334, month_id#5335 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg4#474], [customer_id#5334], [month_id#5335 ASC NULLS FIRST]
                      +- Project [customer_id#5334, month_id#5335, m3#5338, m4#5339, avg3#465, cast(m4#5339 as double) AS _w0#475]
                         +- Window [avg(_w0#466) windowspecdefinition(customer_id#5334, month_id#5335 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg3#465], [customer_id#5334], [month_id#5335 ASC NULLS FIRST]
                            +- Project [customer_id#5334, month_id#5335, m3#5338, m4#5339, cast(m3#5338 as double) AS _w0#466]
                               +- Filter isnotnull(customer_id#5334)
                                  +- LogicalRDD [customer_id#5334, month_id#5335, m1#5336, m2#5337, m3#5338, m4#5339]
    
    

    Optimized logical plan for the single-DataFrame (DF) approach, where all columns are computed on one frame:

    == Optimized Logical Plan ==
    GlobalLimit 21
    +- LocalLimit 21
       +- Project [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5, sum1#14, sum2#25, avg3#447, avg4#460]
          +- Window [avg(_w0#461) windowspecdefinition(customer_id#0, month_id#1 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg4#460], [customer_id#0], [month_id#1 ASC NULLS FIRST]
             +- Project [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5, sum1#14, sum2#25, avg3#447, cast(m4#5 as double) AS _w0#461]
                +- Window [avg(_w0#448) windowspecdefinition(customer_id#0, month_id#1 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg3#447], [customer_id#0], [month_id#1 ASC NULLS FIRST]
                   +- Project [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5, sum1#14, sum2#25, cast(m3#4 as double) AS _w0#448]
                      +- Window [sum(_w0#26) windowspecdefinition(customer_id#0, month_id#1 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum2#25], [customer_id#0], [month_id#1 ASC NULLS FIRST]
                         +- Project [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5, sum1#14, cast(m2#3 as double) AS _w0#26]
                            +- Window [sum(_w0#15) windowspecdefinition(customer_id#0, month_id#1 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum1#14], [customer_id#0], [month_id#1 ASC NULLS FIRST]
                               +- Project [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5, cast(m1#2 as double) AS _w0#15]
                                  +- LogicalRDD [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5]
    

    If you look at the optimized logical plan of the DF approach above, the AVG computation is planned on top of the SUM computation, which may be inefficient; a single-projection alternative is sketched after the plan excerpt below.

    +- Window [sum(_w0#26) windowspecdefinition(customer_id#0, month_id#1 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum2#25], [customer_id#0], [month_id#1 ASC NULLS FIRST]
                         +- Project [customer_id#0, month_id#1, m1#2, m2#3, m3#4, m4#5, sum1#14, cast(m2#3 as double) AS _w0#26]
                            +- Window [sum(_w0#15) windowspecdefinition(customer_id#0, month_id#1 ASC NULLS FIRST, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS sum1#14],  
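    A possible alternative (my own sketch, not taken from the original answer, and reusing the m1..m4 example dataframe df built further below) is to compute every derived column in a single select() so that all window expressions sit in one projection instead of being chained one withColumn() at a time:

    # Sketch: all cumulative columns in one projection
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    rnum = (Window.partitionBy("customer_id")
                  .orderBy("month_id")
                  .rangeBetween(Window.unboundedPreceding, 0))
    sum_cols = ["m1", "m2"]   # cumulative-sum columns
    avg_cols = ["m3", "m4"]   # cumulative-average columns

    df_all = df.select(
        "*",
        *[F.sum(c).over(rnum).alias("sum" + c[1:]) for c in sum_cols],
        *[F.avg(c).over(rnum).alias("avg" + c[1:]) for c in avg_cols],
    )
    df_all.show()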
    

    You can shrink the dataframe as much as possible and keep the computation on the smaller frames. Note, however, that the optimized logical plan of the DF1 approach adds a join between the two datasets, and joins are often slow, so it is also worth tuning the Spark execution environment (a rough sketch follows this list):

    • Code: repartition & cache
    • Configs: executors, driver, memoryOverhead, number of cores
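    A rough sketch of those two levers (the partition count and the resource numbers below are placeholders, not tuned values):

    # Sketch: repartition by the window key so whole customer groups land in one
    # partition, and cache the frame if both branches reuse it.
    df = df.repartition(200, "customer_id").cache()
    df.count()   # materialize the cache before running the window computations

    # Example resource settings at submit time (placeholders):
    # spark-submit --num-executors 10 --executor-cores 4 \
    #   --executor-memory 8G --driver-memory 4G \
    #   --conf spark.executor.memoryOverhead=2G your_job.py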

    I tried the code below with the columns m1, m2, m3, m4.

    # pyspark --driver-memory 1G --executor-memory 2G --executor-cores 1 --num-executors 1
    from pyspark.sql import Row
    import pyspark.sql.functions as F
    from pyspark.sql.window import Window
    
    drow = Row("customer_id","month_id","m1","m2","m3","m4")
    
    data=[drow("1001","01","10","20","10","20"),drow("1002","01","20","30","20","30"),drow("1003","01","30","40","30","40"),drow("1001","02","40","50","40","50"),drow("1002","02","50","60","50","60"),drow("1003","02","60","70","60","70"),drow("1001","03","70","80","70","80"),drow("1002","03","80","90","80","90"),drow("1003","03","90","100","90","100")]
    
    df = spark.createDataFrame(data)
    
    # df1 keeps only the key columns plus the columns that need a cumulative average
    df1 = df.select("customer_id","month_id","m3","m4")
    
    a = ["m1","m2"]   # cumulative-sum columns
    b = ["m3","m4"]   # cumulative-average columns
    # running window per customer, ordered by month
    rnum = (Window.partitionBy("customer_id").orderBy("month_id").rangeBetween(Window.unboundedPreceding, 0))
    for item in a:
        var = "sum"
        df = df.withColumn(var + item[1:], F.sum(item).over(rnum))
    df.show()
    '''
    +-----------+--------+---+---+---+---+-----+-----+
    |customer_id|month_id| m1| m2| m3| m4| sum1| sum2|
    +-----------+--------+---+---+---+---+-----+-----+
    |       1003|      01| 30| 40| 30| 40| 30.0| 40.0|
    |       1003|      02| 60| 70| 60| 70| 90.0|110.0|
    |       1003|      03| 90|100| 90|100|180.0|210.0|
    |       1002|      01| 20| 30| 20| 30| 20.0| 30.0|
    |       1002|      02| 50| 60| 50| 60| 70.0| 90.0|
    |       1002|      03| 80| 90| 80| 90|150.0|180.0|
    |       1001|      01| 10| 20| 10| 20| 10.0| 20.0|
    |       1001|      02| 40| 50| 40| 50| 50.0| 70.0|
    |       1001|      03| 70| 80| 70| 80|120.0|150.0|
    +-----------+--------+---+---+---+---+-----+-----+
    '''
    for item in b:
        var = "avg"
        df = df.withColumn(var + item[1:], F.avg(item).over(rnum))
    df.show()
    
    '''
    +-----------+--------+---+---+---+---+-----+-----+----+----+
    |customer_id|month_id| m1| m2| m3| m4| sum1| sum2|avg3|avg4|
    +-----------+--------+---+---+---+---+-----+-----+----+----+
    |       1003|      01| 30| 40| 30| 40| 30.0| 40.0|30.0|40.0|
    |       1003|      02| 60| 70| 60| 70| 90.0|110.0|45.0|55.0|
    |       1003|      03| 90|100| 90|100|180.0|210.0|60.0|70.0|
    |       1002|      01| 20| 30| 20| 30| 20.0| 30.0|20.0|30.0|
    |       1002|      02| 50| 60| 50| 60| 70.0| 90.0|35.0|45.0|
    |       1002|      03| 80| 90| 80| 90|150.0|180.0|50.0|60.0|
    |       1001|      01| 10| 20| 10| 20| 10.0| 20.0|10.0|20.0|
    |       1001|      02| 40| 50| 40| 50| 50.0| 70.0|25.0|35.0|
    |       1001|      03| 70| 80| 70| 80|120.0|150.0|40.0|50.0|
    +-----------+--------+---+---+---+---+-----+-----+----+----+
    '''
    
    for item in b:
        var = "avg"
        df1 = df1.withColumn(var + item[1:], F.avg(item).over(rnum))
    
    '''
    +-----------+--------+---+---+----+----+
    |customer_id|month_id| m3| m4|avg3|avg4|
    +-----------+--------+---+---+----+----+
    |       1003|      01| 30| 40|30.0|40.0|
    |       1003|      02| 60| 70|45.0|55.0|
    |       1003|      03| 90|100|60.0|70.0|
    |       1002|      01| 20| 30|20.0|30.0|
    |       1002|      02| 50| 60|35.0|45.0|
    |       1002|      03| 80| 90|50.0|60.0|
    |       1001|      01| 10| 20|10.0|20.0|
    |       1001|      02| 40| 50|25.0|35.0|
    |       1001|      03| 70| 80|40.0|50.0|
    +-----------+--------+---+---+----+----+
    '''
    # join the two DataFrames after the avg (df1) and sum (df) calculations
    
    df2 = df.join(df1, (df1.customer_id == df.customer_id) & (df1.month_id == df.month_id)) \
            .drop(df.m3).drop(df.m4).drop(df1.month_id).drop(df1.customer_id)
    
    '''
    df2.show()
    +---+---+-----+-----+-----------+--------+---+---+----+----+
    | m1| m2| sum1| sum2|customer_id|month_id| m3| m4|avg3|avg4|
    +---+---+-----+-----+-----------+--------+---+---+----+----+
    | 10| 20| 10.0| 20.0|       1001|      01| 10| 20|10.0|20.0|
    | 70| 80|120.0|150.0|       1001|      03| 70| 80|40.0|50.0|
    | 40| 50| 50.0| 70.0|       1001|      02| 40| 50|25.0|35.0|
    | 80| 90|150.0|180.0|       1002|      03| 80| 90|50.0|60.0|
    | 50| 60| 70.0| 90.0|       1002|      02| 50| 60|35.0|45.0|
    | 20| 30| 20.0| 30.0|       1002|      01| 20| 30|20.0|30.0|
    | 30| 40| 30.0| 40.0|       1003|      01| 30| 40|30.0|40.0|
    | 90|100|180.0|210.0|       1003|      03| 90|100|60.0|70.0|
    | 60| 70| 90.0|110.0|       1003|      02| 60| 70|45.0|55.0|
    +---+---+-----+-----+-----------+--------+---+---+----+----+
    '''
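    # Not part of the original answer: to reproduce the optimized logical plans
    # shown at the top of this answer, print the plans of both variants.
    df2.explain(True)   # split (DF + DF1) approach, ends with a join
    df.explain(True)    # single-DataFrame approach with chained windows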
    
    

    [Comments]:

    • Thanks for the reply. One more point: could we build an expression that computes all the new columns and pass that expr to selectExpr() so that all the new columns are calculated in one go? (A sketch of this idea follows these comments.)
    • FYI - Crt one: stackoverflow.com/questions/63265792/…
    • Let me take a look at it later today.
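
    On the selectExpr() question in the first comment above, a minimal sketch (my own, not from the answer; it builds standard Spark SQL window expressions as strings and evaluates them in a single call):

    # Sketch: one selectExpr() call computing every new column
    over = ("OVER (PARTITION BY customer_id ORDER BY month_id "
            "RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)")
    sum_exprs = ["SUM({c}) {o} AS sum{s}".format(c=c, o=over, s=c[1:]) for c in ["m1", "m2"]]
    avg_exprs = ["AVG({c}) {o} AS avg{s}".format(c=c, o=over, s=c[1:]) for c in ["m3", "m4"]]
    df.selectExpr("*", *(sum_exprs + avg_exprs)).show()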
    [Solution 2]:

    Based on your question, my understanding is that you are trying to split the operation so the tasks run in parallel and save time.

    You do not need to parallelize the execution yourself: Spark automatically parallelizes the work when you run any action (for example collect(), show(), count(), or a write) on the dataframe you have built. This is a consequence of Spark's lazy execution.

    If you still want to split the operations for some other reason, you can use threads. The article below gives more background on threading in pyspark: https://medium.com/@everisUS/threads-in-pyspark-a6e8005f6017
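
    A minimal sketch of that idea (the output paths are placeholders; df and df1 are the split frames from the first answer):

    # Sketch: run the two independent write actions from separate Python threads
    # so their Spark jobs can be scheduled concurrently.
    from concurrent.futures import ThreadPoolExecutor

    def write_sums():
        df.write.mode("overwrite").parquet("/tmp/cumulative_sums")    # placeholder path

    def write_avgs():
        df1.write.mode("overwrite").parquet("/tmp/cumulative_avgs")   # placeholder path

    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(write_sums), pool.submit(write_avgs)]
        for f in futures:
            f.result()   # surfaces any exception raised in the worker threads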

    [Comments]:
