【Question title】: Apache Spark: pyspark crash for large dataset
【Posted】: 2015-01-13 07:04:54
【Question】:

I am new to Spark. My input file contains 4000x1800 training data. When I try to train on this data (the code is written in Python), I get the following errors:

  1. 14/11/15 22:39:13 ERROR PythonRDD: Python worker exited unexpectedly (crashed) java.net.SocketException: Connection reset by peer: socket write error

  2. org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error

I am using Spark 1.1.0. Any suggestions would be a great help.

Code:

    from pyspark.mllib.classification import SVMWithSGD
    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import Vectors
    from pyspark import SparkConf, SparkContext
    from numpy import array


    #Train the model using feature matrix
    # Load and parse the data
    def parsePoint(line):
        values = [float(x) for x in line.split(' ')]
        return LabeledPoint(values[0], values[1:])

    #create spark Context
    conf = (SparkConf()
         .setMaster("local")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf = conf)

    data = sc.textFile("myfile.txt")
    parsedData = data.map(parsePoint)

    #Train SVM model
    model = SVMWithSGD.train(parsedData,100)

I get the following error:

14/11/15 22:38:38 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556
14/11/15 22:38:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB)
>>> parsedData = data.map(parsePoint)
>>> model = SVMWithSGD.train(parsedData,100)
14/11/15 22:39:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/11/15 22:39:12 WARN LoadSnappy: Snappy native library not loaded
14/11/15 22:39:12 INFO FileInputFormat: Total input paths to process : 1
14/11/15 22:39:13 INFO SparkContext: Starting job: runJob at PythonRDD.scala:296
14/11/15 22:39:13 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:296) with 1 output partitions (allowLocal=true)
14/11/15 22:39:13 INFO DAGScheduler: Final stage: Stage 0(runJob at PythonRDD.scala:296)
14/11/15 22:39:13 INFO DAGScheduler: Parents of final stage: List()
14/11/15 22:39:13 INFO DAGScheduler: Missing parents: List()
14/11/15 22:39:13 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents
14/11/15 22:39:13 INFO MemoryStore: ensureFreeSpace(5088) called with curMem=32768, maxMem=278302556
14/11/15 22:39:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.0 KB, free 265.4 MB)
14/11/15 22:39:13 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (PythonRDD[3] at RDD at PythonRDD.scala:43)
14/11/15 22:39:13 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
14/11/15 22:39:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1221 bytes)
14/11/15 22:39:13 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
14/11/15 22:39:13 INFO HadoopRDD: Input split: file:/G:/SparkTest/spark-1.1.0/spark-1.1.0/bin/FeatureMatrix.txt:0+8103732
14/11/15 22:39:13 INFO PythonRDD: Times: total = 264, boot = 233, init = 29, finish = 2
14/11/15 22:39:13 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/15 22:39:13 ERROR PythonRDD: This may have been caused by a prior exception:
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/15 22:39:13 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.net.SocketException: Connection reset by peer: socket write error
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/15 22:39:13 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error
        java.net.SocketOutputStream.socketWrite0(Native Method)
        java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        java.io.DataOutputStream.write(DataOutputStream.java:107)
        java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
        org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
        org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
14/11/15 22:39:13 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
14/11/15 22:39:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/11/15 22:39:13 INFO TaskSchedulerImpl: Cancelling stage 0
14/11/15 22:39:13 INFO DAGScheduler: Failed to run runJob at PythonRDD.scala:296
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\classification.py", line 178, in train
    return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\_common.py", line 430, in _regression_train_wrapper
    initial_weights = _get_initial_weights(initial_weights, data)
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\_common.py", line 415, in _get_initial_weights
    initial_weights = _convert_vector(data.first().features)
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\rdd.py", line 1167, in first
    return self.take(1)[0]
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\rdd.py", line 1153, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\context.py", line 770, in runJob
    it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, lo
host): java.net.SocketException: Connection reset by peer: socket write error
        java.net.SocketOutputStream.socketWrite0(Native Method)
        java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
        java.net.SocketOutputStream.write(SocketOutputStream.java:159)
        java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        java.io.DataOutputStream.write(DataOutputStream.java:107)
        java.io.FilterOutputStream.write(FilterOutputStream.java:97)
        org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
        org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
        org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
        org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
        org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

>>> 14/11/15 23:22:52 INFO BlockManager: Removing broadcast 1
14/11/15 23:22:52 INFO BlockManager: Removing block broadcast_1
14/11/15 23:22:52 INFO MemoryStore: Block broadcast_1 of size 5088 dropped from memory (free 278269788)
14/11/15 23:22:52 INFO ContextCleaner: Cleaned broadcast 1

Regards, Mrutyunjay

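【Comments】:

  • Did you solve this problem? I am facing the same issue and it is really annoying.
  • I am getting the same error too.
  • I also hit the same error when using the UCI bike sharing dataset.

Tags: apache-spark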


【Answer 1】:

Mrutyunjay,

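I do not have a definitive answer, but the issue looks memory-related. I ran into the same problem when trying to read a 5 MB file. After I deleted part of the file and reduced it to less than 1 MB, the code worked.

I also found the same problem reported at the link below.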

http://apache-spark-user-list.1001560.n3.nabble.com/pyspark-Failed-to-run-first-td7691.html

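【Comments】:

【Answer 2】:

I got the same error, and then found a related answer in the question "pyspark process big datasets problems".

The solution is to add some code to python/pyspark/worker.py.

Add the following 2 lines to the end of the process function defined inside the main function: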

    for obj in iterator:
        pass

So the process function now looks like this (at least in Spark 1.5.2):

    def process():
        iterator = deserializer.load_stream(infile)
        serializer.dump_stream(func(split_index, iterator), outfile)
        for obj in iterator:
            pass

This worked for me.
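
One possible reading of why the extra loop helps: in the question, the failing call is data.first(), which stops after one record, while the JVM writer thread is still pushing the rest of the partition to the Python worker. Draining the iterator makes the worker read everything it is sent. The sketch below is plain Python and only imitates the shape of process(); take_one and the counting wrapper are hypothetical stand-ins, not Spark code.

```python
def take_one(iterator):
    """Hypothetical stand-in for func(split_index, iterator): stops after one record."""
    for record in iterator:
        return [record]
    return []


def process(records, drain=True):
    """Imitates the patched process(): run the task, then optionally drain the input."""
    consumed = []

    def counting(source):
        # Records how many items were actually pulled from the input stream.
        for item in source:
            consumed.append(item)
            yield item

    iterator = counting(records)
    result = take_one(iterator)
    if drain:
        # The two lines added in worker.py: read whatever the task left unread.
        for _ in iterator:
            pass
    return result, len(consumed)


first_only = process(range(1000), drain=False)  # the task reads a single record
drained = process(range(1000), drain=True)      # the task result is the same, input fully read
```

Without draining, only 1 of the 1000 records is read before the worker finishes. With draining, all 1000 are read and the task result is unchanged.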

【Comments】:

• What is python/pyspark/worker.py, and how do I find it?
• I tried this in Spark 2.3.2, but it did not work after I made the change in worker.py.
【Answer 3】:
1. One possibility is that parsePoint raises an exception. Wrap its code in a try/except block and print the exception.
2. Check your --driver-memory setting and increase it.
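
For the first suggestion, here is a minimal sketch of what the wrapped parser could look like. It assumes the same space-separated format as the question; parse_values is a hypothetical helper name, and where the message ends up (console or executor log) depends on how Spark is run.

```python
import sys


def parse_values(line):
    """Parse a space-separated line into floats; return None if the line is malformed."""
    try:
        values = [float(x) for x in line.split(' ')]
        if len(values) < 2:
            raise ValueError("expected a label and at least one feature")
        return values
    except ValueError as exc:
        # Report the offending line instead of letting the Python worker die.
        sys.stderr.write("bad line %r: %s\n" % (line[:80], exc))
        return None


# Assumed usage with the question's RDD (not executed here):
# parsedData = (data.map(parse_values)
#                   .filter(lambda v: v is not None)
#                   .map(lambda v: LabeledPoint(v[0], v[1:])))
```

Note that line.split(' ') produces empty strings for trailing or doubled spaces, and float('') raises ValueError, so this is one place where such a file could fail.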

【Comments】:

• How do I make the --driver-memory parameter larger?
【Answer 4】:

I had a similar problem, and I tried something like this:

    numPartitions = 10  # a number, for example 10 or 100
    data = sc.textFile("myfile.txt", numPartitions)

Inspired by "How to repartition evenly in Spark?", or see here: https://databricks.gitbooks.io/databricks-spark-knowledge-base/content/performance_optimization/how_many_partitions_does_an_rdd_have.html

【Comments】:

【Answer 5】:

It is as simple as this.

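    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
    sc = SparkContext(conf=conf)
    # The second argument asks for at least 2000 partitions.
    lines = sc.textFile("file:///SparkCourse/filter_1.csv", 2000)
    print(lines.first())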

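When calling sc.textFile, pass one more argument that sets the number of splits to a larger value. The larger the data, the larger this value should be.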
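
If you would rather not guess the value, one rough way to scale it with the data is to derive it from the file size. This is only a sketch: suggest_min_partitions and the 1 MiB target per partition are assumptions made for illustration, not a Spark recommendation.

```python
import math


def suggest_min_partitions(file_size_bytes, target_partition_bytes=1024 * 1024):
    """Hypothetical heuristic: one partition per ~target_partition_bytes of input."""
    if file_size_bytes <= 0:
        return 1
    return int(math.ceil(file_size_bytes / float(target_partition_bytes)))


# The input split in the question's log is 8103732 bytes.
partitions_for_question_file = suggest_min_partitions(8103732)  # 8

# Assumed usage (not executed here):
# import os
# n = suggest_min_partitions(os.path.getsize("myfile.txt"))
# data = sc.textFile("myfile.txt", n)
```

The second argument of sc.textFile is a minimum, so Spark may still create more partitions than requested.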

【Comments】:
