【Posted】: 2018-04-25 05:56:37
【Question】:
I have the following two DataFrames, df_whitelist and df_text:
+-------+--------------------+
|keyword| whitelist_terms |
+-------+--------------------+
| LA| LA city|
| LA| US LA in da |
| client|this client has i...|
| client|our client has do...|
+-------+--------------------+
+--------------------+----------+
| Text| Keywords|
+--------------------+----------+
|the client as ada...|client;ada|
|this client has l...| client;LA|
+--------------------+----------+
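In df_whitelist, each keyword corresponds to a set of terms; for example, the keyword LA corresponds to "LA city" and "US LA in da". In df_text, each row holds a text and the keywords found in that text. What I want to do is: for each text, e.g. "the client as ada...", and for each of its keywords ("client" and "ada"), go through all whitelist terms for that keyword and count how many times those terms occur in the text. This is what I tried: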
import pyspark.sql.functions as F
import pyspark.sql.types as T
import re

def whitelisting(text, listOfKeyword, df_whitelist):
    keywords = listOfKeyword.split(";")
    found_whiteterms_count = 0
    for k in keywords:
        if df_whitelist.filter(df_whitelist.keyword == k).count() == 0:
            found_whiteterms_count = found_whiteterms_count + 0
        else:
            df = df_whitelist.filter(df_whitelist.keyword == k).select("whitelist_terms")
            n = df.rdd.map(lambda x: len(re.findall(x["whitelist_terms"], text))).reduce(lambda x, y: x + y)
            found_whiteterms_count = found_whiteterms_count + n
    return found_whiteterms_count

whitelisting_udf = F.udf(lambda text, listOfKeyword: whitelisting(text, listOfKeyword, df_whitelist), T.IntegerType())
text.withColumn("whitelist_counts", whitelisting_udf(text.Text, text.Keywords))
I get this error:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o1153.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.base/java.lang.Thread.run(Thread.java:844)
I have been trying for a while and cannot figure it out. Can anyone help point out the problem and how to fix it? Thanks.
【Comments】:
- Sometimes a wrong PySpark version can also cause this.
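- A likely cause, independent of the PySpark version: the UDF's lambda closes over df_whitelist, and a DataFrame wraps a JVM object through Py4J, so Spark cannot pickle it to ship the function to the executors. DataFrame operations are also not available inside a UDF. One possible workaround, assuming the whitelist is small enough to collect to the driver, is to turn it into a plain Python dict first and let the UDF close over that instead. The sketch below is not the asker's code: the helper names (build_whitelist_map, count_whitelist_terms, make_whitelisting_udf) are hypothetical, and the terms are matched literally via re.escape, whereas the original passed them to re.findall as regular expressions.

```python
import re
from collections import defaultdict


def build_whitelist_map(rows):
    """Group (keyword, whitelist_term) pairs into {keyword: [terms]}."""
    mapping = defaultdict(list)
    for keyword, term in rows:
        mapping[keyword].append(term)
    return dict(mapping)


def count_whitelist_terms(text, keywords, whitelist_map):
    """Count occurrences in `text` of every whitelist term of every keyword."""
    if text is None or keywords is None:
        return 0
    total = 0
    for k in keywords.split(";"):
        # Keywords without whitelist terms contribute nothing.
        for term in whitelist_map.get(k, []):
            # Assumption: terms are literal strings, so escape them.
            total += len(re.findall(re.escape(term), text))
    return total


def make_whitelisting_udf(df_whitelist):
    """Build a UDF that closes over a plain dict, not a DataFrame."""
    # Imported lazily so the helpers above can be used without Spark.
    import pyspark.sql.functions as F
    import pyspark.sql.types as T

    # Assumption: the whitelist fits in driver memory.
    rows = [(r["keyword"], r["whitelist_terms"]) for r in df_whitelist.collect()]
    whitelist_map = build_whitelist_map(rows)
    return F.udf(
        lambda text, kws: count_whitelist_terms(text, kws, whitelist_map),
        T.IntegerType(),
    )
```

With this in place, something like `df_text.withColumn("whitelist_counts", make_whitelisting_udf(df_whitelist)(df_text.Text, df_text.Keywords))` should avoid the PicklingError, since only the dict is serialized. For a large whitelist, a join between the two DataFrames would probably be a better fit than collecting.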
Tags: pyspark pickle user-defined-functions