【Question Title】: Error when calling UDF using broadcasted objects in PySpark
【Posted】: 2017-11-14 10:12:31
【Question】:

I am trying to call a UDF that uses a broadcast object in PySpark.

Here is a minimal example that reproduces the situation and the error:

import pyspark.sql.functions as sf
from pyspark.sql.types import LongType


class SquareClass:
    def compute(self, n):
        return n ** 2


square = SquareClass()
square_sc = sc.broadcast(square)

def f(n):
    return square_sc.value.compute(n)  

numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
f_udf = sf.udf(f, LongType())  

numbers.select(f_udf(numbers.id)).show(10)

The stack trace and error message this snippet produces:

Traceback (most recent call last)
<ipython-input-75-6e38c014e4b2> in <module>()
     13 f_udf = sf.udf(f, LongType())
     14 
---> 15 numbers.select(f_udf(numbers.id)).show(10)

/usr/hdp/current/spark-client/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    255         +---+-----+
    256         """
--> 257         print(self._jdf.showString(n, truncate))
    258 
    259     def __repr__(self):

/usr/local/lib/python3.5/dist-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, 

<snip>

An error occurred while calling o938.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 49.0 failed 1 times, most recent failure: Lost task 1.0 in stage 49.0 (TID 587, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

【Comments】:

  • You haven't included the full trace; it looks like an AttributeError

Tags: pyspark apache-spark-sql spark-dataframe user-defined-functions


【Solution 1】:

When you access an attribute of square_sc, unpickling the broadcast value requires the module that defines SquareClass, which is not present on the workers.

If you want to use a Python package, class, or function in a UDF, the workers must be able to access it. You can achieve this by putting the code in a Python script and deploying it with --py-files when you run spark-submit or pyspark.
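The mechanism behind the error can be shown with plain pickle and no Spark at all: pickle stores an instance as a *reference* to its class (module path plus name) along with the instance state, so whoever unpickles it must be able to import that class. A minimal local sketch, where deleting the class stands in for a worker that cannot import the defining module:

```python
import pickle
import sys


class SquareClass:
    def compute(self, n):
        return n ** 2


# The pickle stream stores a reference to the class, not its code.
blob = pickle.dumps(SquareClass())
assert b"SquareClass" in blob

# Simulate a worker whose module namespace lacks the class definition.
del sys.modules[SquareClass.__module__].SquareClass

failed = False
try:
    pickle.loads(blob)
except AttributeError as exc:
    # The same kind of AttributeError the executors raise.
    failed = True
    print("unpickling failed:", exc)

assert failed
```

This is why the comment above guessed an AttributeError: the executor's Python process fails the class lookup while deserializing the broadcast value.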

【Comments】:

【Solution 2】:

    One thing you can do is keep the class in a separate module and add that module to the SparkContext.

    class_module.py
    
    class SquareClass:
        def compute(self, n):
            return n ** 2
    
    pyspark-shell 
    
        import pyspark.sql.functions as sf
        from pyspark.sql.types import LongType
        from class_module import SquareClass
    
        sc.addFile('class_module.py')
    
        square = SquareClass()
        square_sc = sc.broadcast(square) 
        def f(n):
            return square_sc.value.compute(n)
    
        f_udf = sf.udf(f, LongType())
        numbers = sc.parallelize([{'id': i} for i in range(10)]).toDF()
        numbers.select(f_udf(numbers.id)).show(10)
        +-----+
        |f(id)|
        +-----+
        |    0|
        |    1|
        |    4|
        |    9|
        |   16|
        |   25|
        |   36|
        |   49|
        |   64|
        |   81|
        +-----+
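    Why sc.addFile makes the broadcast work can also be simulated locally with only the standard library: writing class_module.py to a temporary directory and putting it on sys.path stands in for Spark shipping the file to each worker (a sketch, hypothetical paths only; no cluster involved):

    ```python
    import os
    import pickle
    import sys
    import tempfile

    # Stand-in for class_module.py being distributed to a worker's path.
    tmp = tempfile.mkdtemp()
    with open(os.path.join(tmp, "class_module.py"), "w") as fh:
        fh.write(
            "class SquareClass:\n"
            "    def compute(self, n):\n"
            "        return n ** 2\n"
        )
    sys.path.insert(0, tmp)

    from class_module import SquareClass

    # The pickle now references class_module.SquareClass; any process with
    # class_module.py on its path (as sc.addFile arranges) can unpickle it.
    blob = pickle.dumps(SquareClass())
    restored = pickle.loads(blob)
    assert restored.compute(9) == 81
    ```

    Because the class lives in an importable module rather than the driver's interactive namespace, deserialization on the workers succeeds.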
    

    【Comments】:
