【发布时间】:2021-07-12 08:44:34
【问题描述】:
我有一些我想并行计算的对象,因此我认为我可以求助于 pyspark。
考虑这个例子,一个类的对象确实有一个数字i,它可以与square()平方:
class MyMathObject():
def __init__(self, i):
self.i = i
def square(self):
return self.i ** 2
print(MyMathObject(3).square()) # Test one instance with regular python - works
此外,我设置了 pyspark(在 jupyter 笔记本中),现在我想在我的对象上并行计算 0 到 4 的平方:
import findspark
findspark.init()
from pyspark import SparkContext
sc = SparkContext("local[2]")
rdd = sc.parallelize([MyMathObject(i) for i in range(5)])
rdd.map(lambda obj: obj.square()).collect() # This fails
这不起作用 - 它会导致很长且对我来说大多无用的错误消息。 我觉得有点有趣的唯一一行是:
AttributeError: Can't get attribute 'MyMathObject' on
所以,这似乎与属性square() 的调用方式有关。我在最后复制完整的消息。
Pyspark 本身似乎可以工作;例如,在普通 python 列表上执行以下操作会按预期返回平方数。
rdd = sc.parallelize([i for i in range(5)])
rdd.map(lambda i: i**2).collect()
因此,我创建或操作对象的方式似乎存在缺陷,但我无法追查错误。
完整的错误信息:
Py4JJavaError Traceback(最近一次调用最后一次) 在 1 rdd = sc.parallelize([MyMathObject(i) for i in range(5)]) ----> 2 rdd.map(lambda obj: obj.square()).collect()
/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/pyspark/rdd.py in collect(self) 第887章 888 与 SCCallSiteSync(self.context) 作为 css: --> 889 sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 890 返回列表(_load_from_socket(sock_info,self._jrdd_deserializer)) 891
/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py 在调用强>(自我,*args) 1302 第1303章 -> 1304 返回值 = 获取返回值( 1305 答案,self.gateway_client,self.target_id,self.name) 第1306章
/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id,名称) 324 值 = OUTPUT_CONVERTER[类型](答案[2:],网关客户端) 325 如果答案 [1] == REFERENCE_TYPE: --> 326 引发 Py4JJavaError( 327 “调用 {0}{1}{2} 时出错。\n”。 328 格式(target_id, ".", 名称), 值)
Py4JJavaError:调用 z:org.apache.spark.api.python.PythonRDD.collectAndServe 时出错。 :org.apache.spark.SparkException:作业因阶段失败而中止:阶段 1.0 中的任务 0 失败 1 次,最近一次失败:阶段 1.0 中丢失任务 0.0(TID 2,192.168.2.108,执行程序驱动程序):org.apache .spark.api.python.PythonException:回溯(最近一次调用最后一次): 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第 605 行,主要 过程() 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第 597 行,正在处理中 serializer.dump_stream(out_iter, outfile) 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 271 行,在 dump_stream vs = 列表(itertools.islice(迭代器,批处理)) load_stream 中的文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 147 行 产生 self._read_with_length(stream) 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 172 行,在 _read_with_length 返回 self.loads(obj) 加载中的文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 458 行 返回pickle.loads(obj,编码=编码) AttributeError:无法从 '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark 获取
在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator.foreach(Iterator.scala:941) 在 scala.collection.Iterator.foreach$(Iterator.scala:941) 在 org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 在 scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) 在 scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) 在 scala.collection.TraversableOnce.to(TraversableOnce.scala:315) 在 scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) 在 org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) 在 scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) 在 scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) 在 org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) 在 scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) 在 scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) 在 org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) 在 org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004) 在 org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:127) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 在 java.base/java.lang.Thread.run(Thread.java:834)
驱动程序堆栈跟踪: 在 org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007) 在 scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 在 scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 在 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973) 在 org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973) 在 scala.Option.foreach(Option.scala:407) 在 org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188) 在 org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177) 在 org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 在 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2114) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2135) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2154) 在 org.apache.spark.SparkContext.runJob(SparkContext.scala:2179) 在 org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 在 org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 在 org.apache.spark.rdd.RDD.withScope(RDD.scala:388) 在 org.apache.spark.rdd.RDD.collect(RDD.scala:1003) 在 org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168) 在 org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.base/java.lang.reflect.Method.invoke(Method.java:566) 在 py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 在 py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 在 py4j.Gateway.invoke(Gateway.java:282) 在 py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 在 py4j.commands.CallCommand.execute(CallCommand.java:79) 在 py4j.GatewayConnection.run(GatewayConnection.java:238) 在 java.base/java.lang.Thread.run(Thread.java:834) 引起:org.apache.spark.api.python.PythonException:回溯(最近一次调用最后): 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第 605 行,主要 过程() 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py”,第 597 行,正在处理中 serializer.dump_stream(out_iter, outfile) 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 271 行,在 dump_stream vs = 列表(itertools.islice(迭代器,批处理)) load_stream 中的文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 147 行 产生 self._read_with_length(stream) 文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 172 行,在 _read_with_length 返回 self.loads(obj) 加载中的文件“/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/serializers.py”,第 458 行 返回pickle.loads(obj,编码=编码) AttributeError:无法从 '/opt/apache-spark-3/spark-3.0.2-bin-hadoop3.2/python/lib/pyspark.zip/pyspark 获取
在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:503) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:638) 在 org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621) 在 org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456) 在 org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) 在 scala.collection.Iterator.foreach(Iterator.scala:941) 在 scala.collection.Iterator.foreach$(Iterator.scala:941) 在 org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) 在 scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) 在 scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105) 在 scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49) 在 scala.collection.TraversableOnce.to(TraversableOnce.scala:315) 在 scala.collection.TraversableOnce.to$(TraversableOnce.scala:313) 在 org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28) 在 scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307) 在 scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307) 在 org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28) 在 scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294) 在 scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288) 在 org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28) 在 org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1004) 在 org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2154) 在 org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 在 org.apache.spark.scheduler.Task.run(Task.scala:127) 在 org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462) 在 org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377) 在 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ... 1 更多
【问题讨论】:
标签: python class apache-spark pyspark rdd