【问题标题】:Change PYSPARK_PYTHON on Spark workers在 Spark 工作人员上更改 PYSPARK_PYTHON
【发布时间】:2020-07-04 23:08:34
【问题描述】:

我们分发使用 Spark 的 Python 应用程序以及 Python 3.7 解释器(python.exe 以及所有必需的库位于 MyApp.exe 附近)。

要设置PYSPARK_PYTHON,我们有一个函数可以确定python.exe的路径:

os.environ['PYSPARK_PYTHON'] = get_python()  

在 Windows 上 PYSPARK_PYTHON 将变为 C:/MyApp/python.exe
在 Ubuntu 上 PYSPARK_PYTHON 将变为 /opt/MyApp/python.exe

我们启动主/驱动节点并在 Windows 上创建SparkSession。然后我们在 Ubuntu 上启动工作节点,但工作节点失败:

Job aborted due to stage failure: Task 1 in stage 11.0 failed 4 times, most recent failure: Lost task 1.3 in stage 11.0 (TID 1614, 10.0.2.15, executor 1): java.io.IOException: Cannot run program "C:/MyApp/python.exe": error=2, No such file or directory

当然,ubuntu 上没有C:/MyApp/python.exe

如果我正确理解此错误,来自驱动程序的PYSPARK_PYTHON 将发送给所有工作人员。

还尝试在spark-env.shspark-defaults.conf 中设置PYSPARK_PYTHON。如何将 Ubuntu 工作人员的 PYSPARK_PYTHON 更改为 /opt/MyApp/python.exe

【问题讨论】:

  • 只有一台Windows机器吗? Windows 机器上的 Spark 主服务器是否也托管执行程序?如果答案分别是yes和no,你也可以尝试在Ubuntu节点上运行master。
  • 在 Windows 机器上我启动 master 和 worker。仅在 Ubuntu 上工作。

标签: python apache-spark ubuntu pyspark environment-variables


【解决方案1】:

浏览源代码,看起来 Python 驱动程序代码在 spark/rdd.py 中创建运行 Python 函数的工作项时,将 Python 可执行路径的值从其 Spark 上下文中放入:

def _wrap_function(sc, func, deserializer, serializer, profiler=None):
    assert deserializer, "deserializer should not be empty"
    assert serializer, "serializer should not be empty"
    command = (func, profiler, deserializer, serializer)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                                                             ^^^^^^^^^^^^^
                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)

Python 运行程序 PythonRunner.scala 然后使用存储在它接收到的第一个工作项中的路径来启动新的解释器实例:

private[spark] abstract class BasePythonRunner[IN, OUT](
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends Logging {
  ...
  protected val pythonExec: String = funcs.head.funcs.head.pythonExec
                                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  ...
  def compute(
      inputIterator: Iterator[IN],
      partitionIndex: Int,
      context: TaskContext): Iterator[OUT] = {
    ...
    val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap)
    ...
  }
  ...
}

基于此,恐怕目前似乎不可能在 master 和 worker 中为 Python 可执行文件进行单独的配置。另请参阅发布SPARK-26404 的第三条评论。也许您应该向 Apache Spark 项目提交 RFE。

虽然我不是 Spark 专家,但可能仍有办法做到这一点,也许通过将 PYSPARK_PYTHON 设置为 "python",然后确保系统 PATH 已相应配置,以便您的 Python 可执行文件先到先得。

【讨论】:

猜你喜欢
  • 2017-04-08
  • 1970-01-01
  • 1970-01-01
  • 2021-02-07
  • 2014-07-04
  • 1970-01-01
  • 2017-02-12
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多