【Question Title】: Compare a column value with 3 array lists and replace the column value with the array name
【Posted】: 2020-05-27 12:03:34
【Question】:
from pyspark.sql.functions import when,col
from pyspark.sql.functions import udf
#Your code here to create a new variable df_kmeans_new with a new column Position_Group,..
from pyspark.sql.types import *

#Your code to complete
DEF = ["LB","LWB","RB","LCB","RCB","CB","RWB"]
FWD = ["RF","LF","LW","RS","RW","LS","CF","ST"]
MID = ["LCM","LM","RDM","CAM","RAM","RCM","CM","CDM","RM","LAM","LDM"]

df = spark.createDataFrame(
    [(1, "LB", "4"),
     (2, "LM", "0"),
     (3, "LCB", "4"),
     (4, "RS", "4")],
    ("id", "Position", "Position_x"))

def check_in_def(cell_val):
    if cell_val in DEF:
        return "DEF"
    elif cell_val in FWD:
        return "FWD"
    elif cell_val in MID:
        return "MID"
    else:
        return "NA"

df = df.withColumn("Position_Group",when(check_in_def(df.Position)=="DEF","DEF").when(check_in_def(df.Position)=="FWD","FWD").otherwise(0)).show()

I want to create a new column in df that contains one of the 3 array names, DEF, FWD, or MID, depending on which array the Position column value is found in.

But the code does not work. Could someone please help?

【Comments】:

    Tags: python apache-spark pyspark data-science


    【Solution 1】:

    You can also create a dictionary, invert it so each position maps to its group name, and then use create_map to map the dictionary onto a new column:

    from itertools import chain
    import pyspark.sql.functions as F

    # Invert {group: [positions]} into {position: group}
    d = {"DEF": DEF, "FWD": FWD, "MID": MID}
    d1 = {i: k for k, v in d.items() for i in v}

    # create_map takes alternating key/value literals, so flatten the pairs
    mapping = F.create_map([F.lit(x) for x in chain(*d1.items())])
    df.withColumn("Position_Group", mapping[df['Position']]).show()
    
    +---+--------+----------+--------------+
    | id|Position|Position_x|Position_Group|
    +---+--------+----------+--------------+
    |  1|      LB|         4|           DEF|
    |  2|      LM|         0|           MID|
    |  3|     LCB|         4|           DEF|
    |  4|      RS|         4|           FWD|
    +---+--------+----------+--------------+
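
    Note that the map lookup returns null for any position that is not in the dictionary, whereas the question's function returned "NA". A minimal sketch to restore that default, reusing the mapping column built above:

    df.withColumn(
        "Position_Group",
        F.coalesce(mapping[df['Position']], F.lit("NA"))
    ).show()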
    

    【Discussion】:

      【Solution 2】:

      Your function does not work with withColumn because it is passed the whole Column rather than a single value. Vectorize the code instead of writing a bunch of if statements:

      from pyspark.sql.functions import col

      # Group name -> list of positions
      groups = {
          'DEF': ["LB","LWB","RB","LCB","RCB","CB","RWB"],
          'FWD': ["RF","LF","LW","RS","RW","LS","CF","ST"],
          'MID': ["LCM","LM","RDM","CAM","RAM","RCM","CM","CDM","RM","LAM","LDM"],
      }

      # Flatten into (group, position) pairs and build a lookup DataFrame
      df_map_list = []
      for key, value in groups.items():
          for v in value:
              df_map_list.append((key, v))

      df_map = spark.createDataFrame(df_map_list, ('key', 'Position'))
      
      df = spark.createDataFrame(
          [(1, "LB", "4"),
           (2, "LM", "0"),
           (3, "LCB", "4"),
           (4, "RS", "4")],
          ("id", "Position", "Position_x"))

      # Left join on Position to attach the matching group name to each row
      df = (df.alias('a')
              .join(df_map.alias('b'), col('a.Position') == col('b.Position'), 'left')
              .select(['a.*'] + [col('b.key').alias('Position_Group')]))
      
      df.show() 
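
      Alternatively, if you want to keep the row-wise Python function from the question, it must be wrapped as a UDF so Spark applies it to each value individually (a minimal sketch; plain Python UDFs are usually slower than the join or create_map approaches):

      from pyspark.sql.functions import udf
      from pyspark.sql.types import StringType

      # Wrap the question's check_in_def so it runs per value, not per Column
      check_in_def_udf = udf(check_in_def, StringType())
      df = df.withColumn("Position_Group", check_in_def_udf(df.Position))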
      

      【Discussion】:
