【问题标题】:Python version error in Jupyter of Google DataProcGoogle DataProc 的 Jupyter 中的 Python 版本错误
【发布时间】:2019-04-17 00:35:57
【问题描述】:

我使用 Jupyter 初始化创建了一个 DataProc 集群。我使用的映像版本是 1.4。我 ssh 到主节点和工作节点并运行 python --version,并且都显示 Python 3.6.5 :: Anaconda, Inc.

但是,当我尝试从 Google 运行示例时: Reading and writing data from BigQuery 使用 Jupyter(PySpark 内核),它给出以下错误:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-13-1cf15cbebfd5> in <module>
     55 
     56 # Display 10 results.
---> 57 pprint.pprint(word_counts.take(10))
     58 
     59 

/usr/lib/spark/python/pyspark/rdd.py in take(self, num)
   1358 
   1359             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1360             res = self.context.runJob(self, takeUpToNumLeft, p)
   1361 
   1362             items += res

/usr/lib/spark/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1049         # SparkContext#runJob.
   1050         mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1051         sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1052         return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))
   1053 

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 563, test-1-w-0.c.abc.internal, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 262, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1888)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2109)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2058)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2047)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
    at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 262, in main
    ("%d.%d" % sys.version_info[:2], version))
Exception: Python in worker has different version 2.7 than that in driver 3.6, PySpark cannot run with different minor versions.Please check environment variables PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON are correctly set.

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:588)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:571)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

我不明白为什么会发生 master-worker python 版本错误。此外,当我从本地命令行提交此作业时,它可以正常工作。任何帮助或建议表示赞赏。

【问题讨论】:

  • 您有用于创建集群的完整命令吗?包括任何元数据标志和 --initialization-actions

标签: google-cloud-platform pyspark jupyter-notebook google-cloud-dataproc


【解决方案1】:

仅在使用最初为 Dataproc 1.2 或更早版本编写的初始化操作时才会出现所述问题。使用 Dataproc 映像版本 1.3 或更高版本时,您应该使用 Dataproc Optional Components 而不是初始化操作来安装 Juptyer;这种方法会更可靠,也能确保整个集群的所有相关版本设置都是正确的:

gcloud dataproc clusters create cluster-name \
  --optional-components=JUPYTER \
  --image-version=1.4 \
  ... other flags

【讨论】:

  • 可选组件的问题是我无法自定义 Jupyter 设置。例如,Jupyter 在右上角有一个“退出”按钮。我想禁用那个按钮,我可以修改 setup-jupyter-kernel.sh 来禁用它。但是,使用可选组件,我将无法在创建时执行此操作(可以在创建后修改配置,但并不理想)。在创建时这样做有什么建议吗?
  • 您可以编写自己的初始化操作,该操作将在集群启动期间修改由可选组件安装的setup-jupyter-kernel.sh
  • 是否担心在创建后修改它并不理想,因为启动需要更长的时间?在实践中,使用可选组件启动比 init 操作要快得多,因为可选组件不需要从 Internet 下载任何内容,因此很可能即使在 init 操作中重新配置和重新启动 Jupyter 之后,整体部署时间也会更快而不是使用初始化操作。如果您想要非常深入的自定义,您可以考虑使用custom images
  • 您还可以向 dataproc-feedback@google.com 发送有关您想要的可配置选项的建议,并且有可能通过集群属性将一些常用选项作为一流的支持
  • 我的目标是提供一个创建脚本,以便任何人都可以根据需要使用它来创建自己的集群。能够自定义 jupyter.sh 和 setup-jupyter-kernel.sh 可以让用户轻松创建具有预期行为的集群(而不是每次都手动修改配置)。但是问题是如果我使用jupyter.sh来初始化它,就遇到了这个问题的问题。您是否知道 jupyter.sh 的哪个部分导致了问题?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-11-17
  • 2015-12-21
  • 1970-01-01
  • 2019-09-08
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多