[Posted]: 2021-09-05 06:32:56
[Problem description]:
Python:
I have a dataframe, and I am applying a lambda function that checks a condition against the values of several columns.
In Pandas it looks like this (example):
new_df = df1.merge(df2, how='left', left_on='lkey', right_on='rkey')
   lkey value_x rkey value_y  col1  col2  col3  col4  col5
0   foo     one  foo    five     0     1     3     0     5
1   foo     one  foo     NaN     1     0     2     4     0
2   bar     two  bar     six     2     6     3     0     0
3   foo    five  foo    five     7     2     0     0     0
4   foo    five  foo     NaN     2     0     0     0     0
5   bbb    four  bar     two     0     0     0     0     0
def get_final_au(row):
    # Walk col5 -> col4 -> col3 -> col2 and return the first non-zero value.
    if row['col5'] == 0:
        if row['col4'] == 0:
            if row['col3'] == 0:
                if row['col2'] == 0:
                    return 'NOT FOUND'
                else:
                    return row['col2']
            else:
                return row['col3']
        else:
            return row['col4']
    else:
        return row['col5']

new_df['col6'] = new_df.apply(get_final_au, axis=1)
Expected Output:
   lkey value_x rkey value_y  col1  col2  col3  col4  col5       col6
0   foo     one  foo    five     0     1     3     0     5          5
1   foo     one  foo     NaN     1     0     2     4     0          4
2   bar     two  bar     six     2     6     3     0     0          3
3   foo    five  foo    five     7     2     0     0     0          2
4   foo    five  foo     NaN     2     0     0     0     0  NOT FOUND
5   bbb    four  bar     two     0     0     0     0     0  NOT FOUND
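As a side note on the Pandas version: the row-wise apply above can be replaced with a vectorized np.select, which expresses the same priority order (col5 first, then col4, col3, col2) and scales better on large frames. A minimal sketch, using only the numeric columns from the example above (casting the choices to str is an illustrative choice so the result has one dtype alongside the string default):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    'col2': [1, 0, 6, 2, 0, 0],
    'col3': [3, 2, 3, 0, 0, 0],
    'col4': [0, 4, 0, 0, 0, 0],
    'col5': [5, 0, 0, 0, 0, 0],
})

# Conditions are checked in priority order: col5 first, then col4, col3, col2.
order = ['col5', 'col4', 'col3', 'col2']
conditions = [df[c] != 0 for c in order]
# Cast choices to str so every outcome matches the string default's dtype.
choices = [df[c].astype(str) for c in order]

df['col6'] = np.select(conditions, choices, default='NOT FOUND')
print(df['col6'].tolist())  # → ['5', '4', '3', '2', 'NOT FOUND', 'NOT FOUND']
```

This reproduces the col6 column of the expected output (as strings, since 'NOT FOUND' forces a string dtype).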
Pyspark:
How can I do something similar in PySpark?
new_df = new_df.withColumn('col6', ?)
I have tried the following, but it raises an error. Any suggestions?
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType  # this import was missing in the original attempt

# A UDF receives one argument per column passed at the call site, not a whole
# row, so the signature must list the columns explicitly.
def get_final_au(col5, col4, col3, col2):
    if col5 != 0:
        return str(col5)
    elif col4 != 0:
        return str(col4)
    elif col3 != 0:
        return str(col3)
    elif col2 != 0:
        return str(col2)
    else:
        return 'NOT FOUND'

get_final_au_udf = udf(get_final_au, StringType())
new_df.withColumn('col6', get_final_au_udf('col5', 'col4', 'col3', 'col2')).show(2, False)
[Discussion]:
标签: python pyspark apache-spark-sql user-defined-functions