【问题标题】:Pandas udf loop over PySpark dataframe rowsPandas udf 循环遍历 PySpark 数据帧行
【发布时间】:2021-05-16 08:30:43
【问题描述】:

我正在尝试使用pandas_udf,因为我的数据位于 PySpark 数据框中,但我想使用 pandas 库。我有很多行,所以我无法将 PySpark 数据帧转换为 Pandas 数据帧。

我使用 textdistance (pip3 install textdistance) 并导入它:import textdistance

test = spark.createDataFrame(
    [('dog cat', 'dog cat'), 
     ('cup dad', 'mug'),],
    ['value1', 'value2']
)

@pandas_udf('float', PandasUDFType.SCALAR)
def textdistance_jaro_winkler(a, b):
    return textdistance.jaro_winkler(a, b)

test = test.withColumn('jaro_winkler', textdistance_jaro_winkler(col('value1'), col('value2')))
test.show()

我收到以下错误:

ValueError:Series 的真值不明确。使用 a.empty、a.bool()、a.item()、a.any() 或 a.all()。

我试图将整个数据帧作为参数传递给函数,并在函数中传递字符串值,但我相信这会让事情变得更糟:

schema = StructType([StructField("value1", StringType(), True)
                     ,StructField("value2", StringType(), True)
                     ,StructField("jaro_winkler", FloatType(), True)
                    ])

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def textdistance_jaro_winkler(df):
    df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'],  x['value2']))
    
    return df

【问题讨论】:

  • 如果您使用udf 会怎样?和@udf(DoubleType())一样,行吗?

标签: pandas dataframe apache-spark pyspark user-defined-functions


【解决方案1】:

一个普通的 Python UDF 就可以完成这项工作:

import pyspark.sql.functions as F
import textdistance

test2 = test.withColumn(
    'jaro_winkler',
    F.udf(textdistance.jaro_winkler)('value1', 'value2').cast('float')
)

test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat|         1.0|
|cup dad|    mug|   0.4920635|
+-------+-------+------------+

但使用 pandas UDF 并非不可能......

import textdistance

def textdistance_jaro_winkler(iterator):
    for df in iterator:
        df['jaro_winkler'] = df.apply(lambda x: textdistance.jaro_winkler(x['value1'], x['value2']), axis=1)
        yield df

test2 = test.mapInPandas(textdistance_jaro_winkler, 'value1 string, value2 string, jaro_winkler float')

test2.show()
+-------+-------+------------+
| value1| value2|jaro_winkler|
+-------+-------+------------+
|dog cat|dog cat|         1.0|
|cup dad|    mug|   0.4920635|
+-------+-------+------------+

【讨论】:

    【解决方案2】:

    您需要重写您的函数以使用 pandas UDF Series to Series

    import pandas as pd
    import textdistance
    from pyspark.sql import functions as F
    
    def textdistance_jaro_winkler(a: pd.Series, b: pd.Series) -> pd.Series:
        return pd.Series([textdistance.jaro_winkler(x, y) for x, y in zip(a, b)])
    
    
    jaro_winkler_udf = F.pandas_udf(textdistance_jaro_winkler, returnType=FloatType())
    
    test = test.withColumn('jaro_winkler', jaro_winkler_udf(col('value1'), col('value2')))
    test.show()
    
    #+-------+-------+------------+
    #| value1| value2|jaro_winkler|
    #+-------+-------+------------+
    #|dog cat|dog cat|         1.0|
    #|cup dad|    mug|   0.4920635|
    #+-------+-------+------------+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-12-16
      • 1970-01-01
      • 2022-01-20
      • 2020-11-21
      • 2021-12-15
      • 2021-12-28
      • 2021-04-27
      • 2021-11-22
      相关资源
      最近更新 更多