【Question Title】: Divide Pyspark Dataframe Column by Column in other Pyspark Dataframe when ID Matches
【Posted】: 2017-04-07 21:38:31
【Question Description】:

I have a PySpark DataFrame, df1, that looks like this:

CustomerID  CustomerValue
12          .17
14          .15
14          .25
17          .50
17          .01
17          .35

I have a second PySpark DataFrame, df2, which is df1 grouped by CustomerID and aggregated with the sum function (see the sketch after the table below). It looks like this:

 CustomerID  CustomerValueSum
 12          .17
 14          .40
 17          .86
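
For reference, a minimal sketch of the aggregation that produces df2 from df1; the question only states that df2 is df1 grouped by CustomerID and summed, so the exact call and alias are assumptions:

import pyspark.sql.functions as F

# Group df1 by CustomerID and sum the values, matching the df2 shown above
df2 = df1.groupBy("CustomerID").agg(F.sum("CustomerValue").alias("CustomerValueSum"))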

I want to add a third column to df1 that is df1['CustomerValue'] divided by df2['CustomerValueSum'] for the matching CustomerID. It would look like this:

CustomerID  CustomerValue  NormalizedCustomerValue
12          .17            1.00
14          .15            .38
14          .25            .62
17          .50            .58
17          .01            .01
17          .35            .41

In other words, I'm trying to translate this Python/Pandas code into PySpark:

normalized_list = []
for idx, row in df1.iterrows():
    # Look up this row's CustomerID in df2 and divide by its sum
    normalized_list.append(
        row.CustomerValue / df2[df2.CustomerID == row.CustomerID].CustomerValueSum
    )
df1['NormalizedCustomerValue'] = [val.values[0] for val in normalized_list]

How can I do this?

【Question Discussion】:

    Tags: python pyspark spark-dataframe


    【Solution 1】:

    Code:

    import pyspark.sql.functions as F

    # Join the per-customer sums onto df1, compute the ratio,
    # then drop the helper column
    df1 = df1\
        .join(df2, "CustomerID")\
        .withColumn("NormalizedCustomerValue", (F.col("CustomerValue") / F.col("CustomerValueSum")))\
        .drop("CustomerValueSum")
    

    Output:

    df1.show()
    
    +----------+-------------+-----------------------+
    |CustomerID|CustomerValue|NormalizedCustomerValue|
    +----------+-------------+-----------------------+
    |        17|          0.5|     0.5813953488372093|
    |        17|         0.01|   0.011627906976744186|
    |        17|         0.35|     0.4069767441860465|
    |        12|         0.17|                    1.0|
    |        14|         0.15|    0.37499999999999994|
    |        14|         0.25|                  0.625|
    +----------+-------------+-----------------------+
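
    As a side note (not part of the original answer): if df2 is small relative to df1, hinting a broadcast join avoids shuffling the large side across the cluster. A minimal sketch under that assumption:

    import pyspark.sql.functions as F

    # Broadcasting the small aggregate table ships it to every executor,
    # turning the shuffle join into a map-side join
    df1 = df1\
        .join(F.broadcast(df2), "CustomerID")\
        .withColumn("NormalizedCustomerValue", F.col("CustomerValue") / F.col("CustomerValueSum"))\
        .drop("CustomerValueSum")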
    

    【Discussion】:

      【Solution 2】:

      This can also be achieved with a Spark window function, so there is no need to create a separate DataFrame holding the aggregated values (df2):

      Create the data for the input DataFrame:

      from pyspark.sql import HiveContext
      # HiveContext was the entry point in Spark 1.x; in Spark 2+ a SparkSession plays the same role
      sqlContext = HiveContext(sc)
      
      data =[(12, 0.17), (14, 0.15), (14, 0.25), (17, 0.5), (17, 0.01), (17, 0.35)]
      df1 = sqlContext.createDataFrame(data, ['CustomerID', 'CustomerValue'])
      df1.show()
      +----------+-------------+
      |CustomerID|CustomerValue|
      +----------+-------------+
      |        12|         0.17|
      |        14|         0.15|
      |        14|         0.25|
      |        17|          0.5|
      |        17|         0.01|
      |        17|         0.35|
      +----------+-------------+
      

      Define a window partitioned by CustomerID:

      from pyspark.sql import Window
      from pyspark.sql.functions import sum
      
      w = Window.partitionBy('CustomerID')
      
      df2 = df1.withColumn(
          'NormalizedCustomerValue',
          df1.CustomerValue / sum(df1.CustomerValue).over(w)
      ).orderBy('CustomerID')
      
      df2.show()
      +----------+-------------+-----------------------+
      |CustomerID|CustomerValue|NormalizedCustomerValue|
      +----------+-------------+-----------------------+
      |        12|         0.17|                    1.0|
      |        14|         0.15|    0.37499999999999994|
      |        14|         0.25|                  0.625|
      |        17|          0.5|     0.5813953488372093|
      |        17|         0.01|   0.011627906976744186|
      |        17|         0.35|     0.4069767441860465|
      +----------+-------------+-----------------------+
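
      One caveat with the import above: `from pyspark.sql.functions import sum` shadows Python's built-in `sum` in this script. An equivalent formulation that avoids the shadowing, reusing the same window `w` (a minimal sketch):

      import pyspark.sql.functions as F

      # The qualified F.sum leaves the built-in sum untouched
      df2 = df1.withColumn(
          'NormalizedCustomerValue',
          F.col('CustomerValue') / F.sum('CustomerValue').over(w)
      ).orderBy('CustomerID')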
      

      【Discussion】:
