【问题标题】:Spark 2.0 filter using a UDF after a self-join在自加入后使用 UDF 的 Spark 2.0 过滤器
【发布时间】:2016-08-17 13:18:16
【问题描述】:

我需要使用我自己的用户定义函数过滤 Spark 数据帧。我的数据框是使用 jdbc 连接从数据库中读取的,然后在被过滤之前通过 spark 中的自连接操作。尝试collect过滤后的数据帧时出现错误。

我已经在 spark 1.6 中成功使用了它。但是,昨天升级到 2.0 后,它失败并出现错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o400.collectToPython.
: java.lang.UnsupportedOperationException: Cannot evaluate expression: 
<lambda>(input[0, string, true])

这是一个产生错误的最小示例(在我的环境中):

from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType

spark = SparkSession.builder.master('local').appName('test').getOrCreate()

# this works successfully
df = spark.createDataFrame([('Alice', 1), ('Bob', 2), ('Dan', None)], 
                           ['name', 'age'])
df.filter(udf(lambda x: 'i' in x, BooleanType())(df.name)).collect()
>>> [Row(name=u'Alice', age=1)]

# this produces the error
df_emp = spark.createDataFrame([(1, 'Alice', None), (2, 'Bob', 1), 
                                (3, 'Dan', 2), (4, 'Joe', 2)], 
                               ['id', 'name', 'manager_id'])
df1 = df_emp.alias('df1')
df2 = df_emp.alias('df2')
cols = df1.columns
# the self-join
result = df1.join(df2, col('df1.id') == col('df2.manager_id'), 'left_outer')
result.collect()
>>> [Row(id=1, name=u'Alice', manager_id=None), 
     Row(id=3, name=u'Dan', manager_id=2), Row(id=2, name=u'Bob', manager_id=1), 
     Row(id=2, name=u'Bob', manager_id=1), Row(id=4, name=u'Joe', manager_id=2)]

# simple udf filter
filtered = result.filter(udf(lambda x: 'i' in x, BooleanType())(result.name))
filtered.collect()
# the above error is produced...

在这种情况下我做错了什么吗?这是 2.0 中的错误,还是我应该考虑在两个版本之间进行一些行为变化?

【问题讨论】:

    标签: python apache-spark pyspark spark-dataframe


    【解决方案1】:

    这是 pyspark 中的一个错误。

    我在这里提交了一个错误https://issues.apache.org/jira/browse/SPARK-17100

    这个问题出现在 left_outer、right_outer 和外连接中,但不是内连接。

    一种解决方法是在过滤器之前缓存连接结果。

    例如:

    result = df1.join(df2, col('df1.id') == col('df2.manager_id'), 'left_outer').select(df2.name).cache()

    【讨论】:

    • 我把头撞到了墙上,因为在前一个会话中工作的 udf 失败了。这救了我!谢谢蒂姆!
    猜你喜欢
    • 1970-01-01
    • 2018-06-12
    • 2023-01-09
    • 2020-05-16
    • 2017-12-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-04
    相关资源
    最近更新 更多