【问题标题】:Pyspark: PicklingError: Could not serialize object:Pyspark:PicklingError:无法序列化对象:
【发布时间】:2018-04-25 05:56:37
【问题描述】:

我有以下两个数据框:df_whitelist 和 df_text

+-------+--------------------+
|keyword|    whitelist_terms |
+-------+--------------------+
|     LA|             LA city|
|     LA|        US LA in da |
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
|                Text|  Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+

在 df_whitelist 中,每个关键字对应一组术语,例如关键词LA对应“LA city”和“US LA in da”。 在 df_text 中,我在此文本中找到了文本和一些关键字。 我想要做的是,对于每个文本,例如“客户有 ada..”,对于每个关键字“客户”和“ada”,检查该关键字的所有白名单条款,看看如何这个词多次出现在文本中。 我尝试过的如下:

import pyspark.sql.functions as F
import pyspark.sql.types as T
import re
def whitelisting(text,listOfKeyword,df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x:len(re.findall(x["whitelist_terms"],text))).reduce(lambda x, y: x+y)
            found_whiteterms_count = found_whiteterms_count + n    
    return found_whiteterms_count     
whitelisting_udf = F.udf(lambda text,listOfKeyword: whitelisting(text,listOfKeyword,df_whitelist),T.IntegerType())
text.withColumn("whitelist_counts", whitelisting_udf(text.Text,text.Keywords))

我得到了错误:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)

尝试了一段时间后我无法弄清楚。任何人都可以帮助指出问题以及如何解决它。谢谢。

【问题讨论】:

  • 有时 PySpark 版本错误也可能导致这种情况

标签: pyspark pickle user-defined-functions


【解决方案1】:

您正在将 pyspark 数据帧 df_whitelist 传递给 UDF,无法腌制 pyspark 数据帧。您还在 UDF 内的数据帧上进行计算,这是不可接受的(不可能)。请记住,您的函数将被调用的次数与数据框中的行数一样多,因此您应该保持计算简单。并且只有在 pyspark sql 函数无法完成时才这样做。

您需要做的是加入keyword 上的两个数据框。 让我们从您提供的两个示例数据框开始:

df_whitelist = spark.createDataFrame(
    [["LA", "LA city"], ["LA", "US LA in da"], ["client", "this client has i"], ["client", "our client"]], 
    ["keyword", "whitelist_terms"])
df_text = spark.createDataFrame(
    [["the client as ada", "client;ada"], ["this client has l", "client;LA"]], 
    ["Text", "Keywords"])

df_text 中的Keywords 列需要处理,我们必须将字符串转换为数组,然后将其分解,这样我们每行只有一项:

import pyspark.sql.functions as F
df_text = df_text.select("Text", F.explode(F.split("Keywords", ";")).alias("keyword"))

    +-----------------+-------+
    |             Text|keyword|
    +-----------------+-------+
    |the client as ada| client|
    |the client as ada|    ada|
    |this client has l| client|
    |this client has l|     LA|
    +-----------------+-------+

现在我们可以连接keyword上的两个数据框了:

df = df_text.join(df_whitelist, "keyword", "leftouter")

    +-------+-----------------+-----------------+
    |keyword|             Text|  whitelist_terms|
    +-------+-----------------+-----------------+
    |     LA|this client has l|          LA city|
    |     LA|this client has l|      US LA in da|
    |    ada|the client as ada|             null|
    | client|the client as ada|this client has i|
    | client|the client as ada|       our client|
    | client|this client has l|this client has i|
    | client|this client has l|       our client|
    +-------+-----------------+-----------------+
  • 您在UDF 中调用的第一个条件可以翻译如下:如果df_text 中的keyword 不存在于df_whitelist 中,则为0。相当于说出@987654335 的值@ 列将在 left join 中为 NULL,因为它们仅出现在左侧数据框中

  • 第二个条件:你统计whitelist_terms出现在Text的次数:Text.count(whitelist_terms)

我们将写一个UDF 来执行此操作:

from pyspark.sql.types import IntegerType
count_terms = F.udf(lambda Text, term: Text.count(term) if term is not None else 0, IntegerType())
df = df.select(
    "Text", 
    "keyword", 
    F.when(F.isnull("whitelist_terms"), 0).otherwise(count_terms("Text", "whitelist_terms")).alias("whitelist_counts"))

    +-----------------+-------+----------------+
    |             Text|keyword|whitelist_counts|
    +-----------------+-------+----------------+
    |this client has l|     LA|               0|
    |this client has l|     LA|               0|
    |the client as ada|    ada|               0|
    |the client as ada| client|               0|
    |the client as ada| client|               0|
    |this client has l| client|               0|
    |this client has l| client|               0|
    +-----------------+-------+----------------+

最后我们可以聚合以返回一个只有不同 Text 的数据框:

res = df.groupBy("Text").agg(
    F.collect_set("keyword").alias("Keywords"),
    F.sum("whitelist_counts").alias("whitelist_counts"))
res.show()

    +-----------------+-------------+----------------+
    |             Text|     Keywords|whitelist_counts|
    +-----------------+-------------+----------------+
    |this client has l| [client, LA]|               0|
    |the client as ada|[ada, client]|               0|
    +-----------------+-------------+----------------+

【讨论】:

  • 非常感谢 MaFF,非常感谢带有清晰解释的简洁解决方案。考虑到有数以百万计的文本项、50 个关键字,并且每个关键字可能包含一千个字词,只有一点。笛卡尔加入会是最有效的解决方案吗?还是在 spark 中还有一些替代方案可以提高效率?
  • 我不确定您的确切意思。笛卡尔积将比键连接慢,因为您将大大增加结果数据框中的行数。如果您的数据框之一是“小”,您应该考虑在join 表达式中使用broadcasting(pyspark.sql.functions.broadcast),它将提高效率
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2014-08-04
  • 2015-05-29
  • 2016-03-04
  • 1970-01-01
相关资源
最近更新 更多