【Question Title】: Implement lambda function from Python to PySpark
【Posted】: 2021-09-05 06:32:56
【Question Description】:

Python:

I have a dataframe, and I'm applying a lambda function to check a condition based on the values of several columns.

In Pandas it looks like this (example):

new_df = df1.merge(df2, how='left', left_on='lkey', right_on='rkey')

  lkey value_x rkey value_y  col1  col2  col3  col4  col5
0  foo     one  foo    five     0     1     3     0     5
1  foo     one  foo     NaN     1     0     2     4     0
2  bar     two  bar     six     2     6     3     0     0
3  foo    five  foo    five     7     2     0     0     0
4  foo    five  foo     NaN     2     0     0     0     0
5  bbb    four  bar     two     0     0     0     0     0

def get_final_au(row):
    # Return the first non-zero value, checking col5 down to col2
    if row['col5'] == 0:
        if row['col4'] == 0:
            if row['col3'] == 0:
                if row['col2'] == 0:
                    return 'NOT FOUND'
                else:
                    return row['col2']
            else:
                return row['col3']
        else:
            return row['col4']
    else:
        return row['col5']


new_df['col6'] = new_df.apply(lambda row: get_final_au(row), axis=1)


Expected Output:

  lkey value_x rkey value_y  col1  col2  col3  col4  col5       col6
0  foo     one  foo    five     0     1     3     0     5          5
1  foo     one  foo     NaN     1     0     2     4     0          4
2  bar     two  bar     six     2     6     3     0     0          3
3  foo    five  foo    five     7     2     0     0     0          2
4  foo    five  foo     NaN     2     0     0     0     0  NOT FOUND
5  bbb    four  bar     two     0     0     0     0     0  NOT FOUND

PySpark:

How can I do something similar in PySpark?

new_df = new_df.withColumn('col6', ?)

I have tried the following, but it throws an error. Please advise:


from pyspark.sql.functions import udf
def get_final_au(row):
    if row['col5'] != 0:
        return row['col5']
    elif row['col4'] != 0:
        return row['col4']
    elif row['col3'] != 0:
        return row['col3']
    elif row['col2'] != 0:
        return row['col2']
    else:
        return 'NOT FOUND'
UDF_NAME = udf(lambda row: get_final_au(row), StringType())
new_df.withColumn('col6', UDF_NAME('col5','col4','col3','col2')).show(2,False)

【Question Comments】:

    Tags: python pyspark apache-spark-sql user-defined-functions


    【Solution 1】:

    I think you can use either a UDF or a when clause. The when clause would be easier.

    The syntax for a UDF is as follows:
    from pyspark.sql.functions import udf
    
    def function_name(arg):
        # Logic
        # Return value
    
    # Register the UDF; the second argument is the function's
    # return type (e.g. StringType())
    UDF_NAME = udf(function_name, ArgType())
    
    df.select(UDF_NAME('col').alias('new_col'))
    
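    Applied to this question, a minimal sketch (my own adaptation, not taken from the question's code) could rework get_final_au to take the columns as separate arguments:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def get_final_au(col2, col3, col4, col5):
        # Return the first non-zero value, scanning col5 down to col2;
        # the None guard covers nulls produced by the left join
        for value in (col5, col4, col3, col2):
            if value is not None and value != 0:
                return str(value)
        return 'NOT FOUND'

    # Second argument of udf() is the return type
    get_final_au_udf = udf(get_final_au, StringType())

    new_df = new_df.withColumn(
        'col6', get_final_au_udf('col2', 'col3', 'col4', 'col5'))

    The str() calls matter: a UDF declared with StringType() silently yields null when the Python function returns an int.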

    For the when clause:

        df.withColumn("new_column", when(condition1, value).when(condition2, value).otherwise(value))
    
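    For the question's logic, the chained when version might look like this (a sketch assuming the merged new_df from the question; col5 is checked first to match the expected output):

        from pyspark.sql.functions import when, col

        new_df = new_df.withColumn(
            'col6',
            when(col('col5') != 0, col('col5').cast('string'))
            .when(col('col4') != 0, col('col4').cast('string'))
            .when(col('col3') != 0, col('col3').cast('string'))
            .when(col('col2') != 0, col('col2').cast('string'))
            .otherwise('NOT FOUND'))

    The .cast('string') keeps every branch the same type as the 'NOT FOUND' literal, and avoiding a UDF lets Spark optimize the whole expression natively.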

    【Comments】:

      【Solution 2】:

      Possible duplicate: Apply a function to a single column of a csv in Spark

      Suggestion:

      Modify get_final_au to this:

      def get_final_au(row):
          # Flattened elif chain; check col5 first so the precedence
          # matches the expected output in the question
          if row['col5'] != 0:
              return row['col5']
          elif row['col4'] != 0:
              return row['col4']
          elif row['col3'] != 0:
              return row['col3']
          elif row['col2'] != 0:
              return row['col2']
          else:
              return 'NOT FOUND'
      
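      Since this function still indexes into a row, one way to call it (my sketch, not part of the original answer) is to pass the columns bundled in a struct, which arrives in the UDF as a Row supporting ['col'] lookups:

      from pyspark.sql.functions import struct, udf
      from pyspark.sql.types import StringType

      # str() guards the return value: a StringType UDF yields null
      # when the Python function returns an int
      get_final_au_udf = udf(lambda row: str(get_final_au(row)), StringType())

      new_df = new_df.withColumn(
          'col6',
          get_final_au_udf(struct('col2', 'col3', 'col4', 'col5')))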

      【Comments】:

      • OK, thank you. Could you advise how I should call the modified function above on the PySpark dataframe? It gives an error: new_df = new_df.withColumn('col6', udf(lambda row: get_final_au(row)))