【问题标题】:How can I use a function in dataframe withColumn function in Pyspark?如何在 Pyspark 中使用数据框中的函数 withColumn 函数?
【发布时间】:2017-10-30 18:26:41
【问题描述】:

我定义了一些字典和函数:

dict_TEMPERATURE = {(0, 70): 'Low', (70.01, 73.99): 'Normal-Low',(74, 76): 'Normal', (76.01, 80): 'Normal-High', (80.01, 300): 'High'}
...
hierarchy_dict = {'TEMP': dict_TEMPERATURE, 'PRESS': dict_PRESSURE, 'SH_SP': dict_SHAFT_SPEED, 'POI': dict_POI, 'TRIG': dict_TRIGGER}



def function_definition(valor, atributo):

    dict_atributo = hierarchy_dict[atributo]
    valor_generalizado = None

    if isinstance(valor, (int, long, float, complex)):

        for key, value in dict_atributo.items():

            if(isinstance(key, tuple)):
                lista = list(key)

                if (valor > key[0] and valor < key[1]):
                    valor_generalizado = value

    else: # if it is not numeric
        valor_generalizado = dict_atributo.get(valor)


    return valor_generalizado

这个函数主要做的是:检查作为参数传递给“function_definition”函数的值,并根据其字典的引用替换它的值。

所以,如果我调用“function_definition(60, 'TEMP')”,它将返回 'LOW'。

另一方面,我有一个具有下一个结构的数据框(这是一个示例):

+----+-----+-----+---+----+
|TEMP|SH_SP|PRESS|POI|TRIG|
+----+-----+-----+---+----+
|   0|    1|    2|  0|   0|
|   0|    2|    3|  1|   1|
|   0|    3|    4|  2|   1|
|   0|    4|    5|  3|   1|
|   0|    5|    6|  4|   1|
|   0|    1|    2|  5|   1|
+----+-----+-----+---+----+

我要做的是根据上面定义的函数替换数据框一列的值,所以我有下一个代码行:

dataframe_new = dataframe.withColumn(atribute_name, function_definition(dataframe[atribute_name], atribute_name))

但我在执行时收到下一条错误消息:

AssertionError: col should be Column

我的代码有什么问题?怎么会这样?

【问题讨论】:

    标签: function apache-spark dataframe replace pyspark


    【解决方案1】:

    您的 function_definition(valor,atributo) 为单个 valor 返回单个字符串 (valor_generalizado)。

    AssertionError: col should be Column 表示您正在向WithColumn(colName,col) 传递一个不是列的参数。 因此,您必须转换您的数据,以便拥有 Column,例如如下所示。

    以数据框为例(与您的结构相同):

    a = [(10.0,1.2),(73.0,4.0)] # like your dataframe, this is only an example
    
    dataframe = spark.createDataFrame(a,["tp", "S"]) # tp and S are random names for these columns
    
    dataframe.show()
    +----+---+
    |  tp|  S|
    +----+---+
    |10.0|1.2|
    |73.0|4.0|
    +----+---+
    

    如你所见here

    udf 创建一个表示用户定义函数 (UDF) 的 Column 表达式。

    解决方案:

    from pyspark.sql.functions import udf
    
    attr = 'TEMP'
    udf_func = udf(lambda x: function_definition(x,attr),returnType=StringType())
    
    dataframe_new = dataframe.withColumn("newCol",udf_func(dataframe.tp))
    dataframe_new.show()
    
    +----+---+----------+
    |  tp|  S|    newCol|
    +----+---+----------+
    |10.0|1.2|       Low|
    |73.0|4.0|Normal-Low|
    +----+---+----------+
    

    【讨论】:

    • 非常感谢!这就是我要找的东西!
    猜你喜欢
    • 1970-01-01
    • 2021-02-26
    • 2020-04-06
    • 2020-07-07
    • 1970-01-01
    • 2020-04-25
    • 2021-10-14
    • 2020-04-21
    • 1970-01-01
    相关资源
    最近更新 更多