【问题标题】:Transforming Python Lambda function without return value to Pyspark将没有返回值的 Python Lambda 函数转换为 Pyspark
【发布时间】:2019-11-28 12:24:35
【问题描述】:

我在 Python 中有一个有效的 lambda 函数,它计算 dataset1 中的每个字符串与 dataset2 中的字符串之间的最高相似度。在迭代期间,它将字符串、最佳匹配和相似度以及其他一些信息写入 bigquery。没有返回值,因为该函数的目的是将一行插入到 bigquery 数据集中。这个过程需要相当长的时间,这就是为什么我想使用 Pyspark 和 Dataproc 来加速这个过程。

将 pandas 数据帧转换为 spark 很容易。我无法注册我的 udf,因为它没有返回值,而 pyspark 需要一个。此外,我不明白如何将 python 中的“应用”函数映射到 pyspark 变体。所以基本上我的问题是如何转换下面的 python 代码以在 spark 数据帧上工作。

以下代码可在常规 Python 环境中运行:

def embargomatch(name, code, embargo_names):
     find best match 
     insert best match and additional information to bigquery

customer_names.apply(lambda x: embargoMatch(x['name'], x['customer_code'],embargo_names),axis=1)

因为 pyspark 需要返回类型,所以我在 udf 中添加了 'return 1' 并尝试了以下操作:


customer_names = spark.createDataFrame(customer_names)

from pyspark.sql.types import IntegerType
embargo_match_udf = udf(lambda x: embargoMatch(x['name'], x['customer_code'],embargo_names), IntegerType())

现在我一直在尝试应用 select 函数,因为我不知道要给出什么参数。

【问题讨论】:

    标签: python google-cloud-platform pyspark user-defined-functions google-cloud-dataproc


    【解决方案1】:

    我怀疑您对如何将多列传递给 udf 感到困惑——这是该问题的一个很好的答案:Pyspark: Pass multiple columns in UDF

    与其基于包装函数的 lambda 创建 udf,不如考虑通过直接基于 embargomatch 创建 udf 来简化。

    embargo_names = ...
    
    # The parameters here are the columns passed into the udf
    def embargomatch(name, customer_code):
        pass
    embargo_match_udf = udf(embargomatch, IntegerType())
    customer_names.select(embargo_match_udf(array('name', 'customer_code')).alias('column_name'))
    

    话虽如此,怀疑您的 udf 没有返回任何内容——我通常认为 udfs 是一种向数据框添加列的方式,但不会产生副作用。如果您想将记录插入 bigquery,请考虑执行以下操作:

    customer_names.select('column_name').write.parquet('gs://some/path')
    os.system("bq load --source_format=PARQUET [DATASET].[TABLE] gs://some/path")
    

    【讨论】:

    • 我想在我的数据框中添加列。问题是我想添加 3 列(1 个整数和 2 个字符串)。我不能在单独的 lambda 函数中执行此操作,因为该函数会迭代数千条记录以找到最佳匹配。我需要最佳匹配的名称和要添加的相似度分数。这可能吗?
    • 是的,您可以:stackoverflow.com/questions/48979440/…。 tl;dr 返回一个复杂类型,然后使用另一个 select 语句将其分成单独的列。
    • 谢谢!使用 pyspark 的 MapType 为我解决了这个问题!
    猜你喜欢
    • 2016-03-05
    • 2019-06-16
    • 2017-01-19
    • 1970-01-01
    • 1970-01-01
    • 2021-06-25
    • 1970-01-01
    • 1970-01-01
    • 2016-03-02
    相关资源
    最近更新 更多