【问题标题】:Add new column in Pyspark dataframe based on where condition on other column根据其他列上的条件在 Pyspark 数据框中添加新列
【发布时间】:2019-01-31 03:07:49
【问题描述】:

我有一个 Pyspark 数据框如下:

+------------+-------------+--------------------+
|package_id  | location    | package_scan_code  | 
+------------+-------------+--------------------+
|123         | Denver      |05                  |  
|123         | LosAngeles  |03                  |  
|123         | Dallas      |09                  |  
|123         | Vail        |02                  | 
|456         | Jacksonville|05                  |  
|456         | Nashville   |09                  |
|456         | Memphis     |03                  |

"package_scan_code" 03 代表包裹的来源。

我想在这个数据帧中添加一列“origin”,这样对于每个包(由“package_id”标识),新添加的 origin 列中的值将与对应于“package_scan_code”03 的位置相同。

在上面的例子中,有两个唯一的包 123 和 456,它们的来源分别是 LosAngeles 和 Memphis(对应 package_scan_code 03)。

所以我希望我的输出如下:

+------------+-------------+--------------------+------------+
| package_id |location     | package_scan_code  |origin      |
+------------+-------------+--------------------+------------+
|123         | Denver      |05                  | LosAngeles |
|123         | LosAngeles  |03                  | LosAngeles |
|123         | Dallas      |09                  | LosAngeles |
|123         | Vail        |02                  | LosAngeles |
|456         | Jacksonville|05                  |  Memphis   |
|456         | Nashville   |09                  |  Memphis   |
|456         | Memphis     |03                  |  Memphis   |

如何在 Pyspark 中实现这一点?我尝试了.withColumn 方法,但我无法获得正确的条件。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    通过package_scan_code == '03'过滤数据框,然后加入原始数据框:

    (df.filter(df.package_scan_code == '03')
       .selectExpr('package_id', 'location as origin')
       .join(df, ['package_id'], how='right')
       .show())
    +----------+----------+------------+-----------------+
    |package_id|    origin|    location|package_scan_code|
    +----------+----------+------------+-----------------+
    |       123|LosAngeles|      Denver|               05|
    |       123|LosAngeles|  LosAngeles|               03|
    |       123|LosAngeles|      Dallas|               09|
    |       123|LosAngeles|        Vail|               02|
    |       456|   Memphis|Jacksonville|               05|
    |       456|   Memphis|   Nashville|               09|
    |       456|   Memphis|     Memphis|               03|
    +----------+----------+------------+-----------------+
    

    注意:这里假设每个package_id 最多有一个package_scan_code 等于03,否则逻辑将不正确,您需要重新考虑如何定义origin

    【讨论】:

    • 你以最简单的方式接近它。不错。
    • @Psidom 我收到一条错误消息:AnalysisException: u'Cannot resolve column name "package_id" among (_col0, _col2,....);' 我尝试打印出架构并且列名正确显示,我还尝试使用修剪功能删除空格,但它们都不能解决问题
    • 所以你连df.select('package_id')都做不到?
    • 我可以选择。这是连接语句失败。看起来很奇怪。
    • 我解决了这个问题。 Spark 或 Pyspark 中似乎存在某种错误,我不得不使用 alias 重命名列名。所以声明变成了(df.filter(df.package_scan_code == '03') .select(col('package_id').alias('package_id'), col('location').alias('origin')) .join(df, ['package_id'], how='right') .show())。这有效!
    【解决方案2】:

    无论数据框中的每个package_id 出现多少次package_scan_code=03,此代码都应该有效。我又添加了一个 (123,'LosAngeles','03') 来证明 -

    第一步:创建DataFrame

    values = [(123,'Denver','05'),(123,'LosAngeles','03'),(123,'Dallas','09'),(123,'Vail','02'),(123,'LosAngeles','03'),
              (456,'Jacksonville','05'),(456,'Nashville','09'),(456,'Memphis','03')]
    df = sqlContext.createDataFrame(values,['package_id','location','package_scan_code'])
    

    第 2 步:创建package_idlocation 的字典。

    df_count = df.where(col('package_scan_code')=='03').groupby('package_id','location').count()
    dict_location_scan_code = dict(df_count.rdd.map(lambda x: (x['package_id'], x['location'])).collect())
    print(dict_location_scan_code)
        {456: 'Memphis', 123: 'LosAngeles'}
    

    第 3 步:创建列,映射字典。

    from pyspark.sql.functions import col, create_map, lit
    from itertools import chain
    mapping_expr = create_map([lit(x) for x in chain(*dict_location_scan_code.items())])
    df = df.withColumn('origin', mapping_expr.getItem(col('package_id')))
    df.show()
    +----------+------------+-----------------+----------+
    |package_id|    location|package_scan_code|    origin|
    +----------+------------+-----------------+----------+
    |       123|      Denver|               05|LosAngeles|
    |       123|  LosAngeles|               03|LosAngeles|
    |       123|      Dallas|               09|LosAngeles|
    |       123|        Vail|               02|LosAngeles|
    |       123|  LosAngeles|               03|LosAngeles|
    |       456|Jacksonville|               05|   Memphis|
    |       456|   Nashville|               09|   Memphis|
    |       456|     Memphis|               03|   Memphis|
    +----------+------------+-----------------+----------+
    

    【讨论】:

    • 步骤“第 2 步:创建 package_id 和位置的字典”。将数据移动到驱动程序,这对于大型数据集非常危险并且可能需要相当长的时间。你为什么不做一个简单的连接(给定你的withColumnmapping_expr)?
    • 尽管您希望教育和帮助 Jacek,但感谢您投反对票。深表感谢。
    • 感谢您的解决方案@cph_sto。正如我评论@Psidom 的解决方案时,有一个错误要求我使用alias 重命名列。重命名后,您的解决方案也会奏效。他只是一种更简单的方法。
    • PreethiS 当然,继续进行您认为最适合您需求的方法。诚挚的,
    • @cph_sto,我有一个问题stackoverflow.com/questions/62318004/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-04-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-08-25
    • 1970-01-01
    相关资源
    最近更新 更多