【问题标题】:Running Spark driver program in Docker container - no connection back from executor to the driver?在 Docker 容器中运行 Spark 驱动程序 - 没有从执行程序返回到驱动程序的连接?
【发布时间】:2018-01-11 08:32:53
【问题描述】:

更新:问题已解决。 Docker 镜像在这里:docker-spark-submit

我使用 Docker 容器内的胖 jar 运行 spark-submit。我的独立 Spark 集群在 3 台虚拟机上运行——一台主机和两台工作机。从工作机器上的执行程序日志中,我看到执行程序具有以下驱动程序 URL:

"--driver-url" "spark://CoarseGrainedScheduler@172.17.0.2:5001"

172.17.0.2实际上是带有驱动程序的容器的地址,而不是容器运行的宿主机。工作机器无法访问此 IP,因此工作人员无法与驱动程序通信。 正如我从 StandaloneSchedulerBackend 的源代码中看到的,它使用 spark.driver.host 设置构建 driverUrl:

val driverUrl = RpcEndpointAddress(
  sc.conf.get("spark.driver.host"),
  sc.conf.get("spark.driver.port").toInt,
  CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

它没有考虑 SPARK_PUBLIC_DNS 环境变量 - 这是正确的吗?在容器中,除了容器“内部”IP 地址(本例中为 172.17.0.2)之外,我无法将 spark.driver.host 设置为其他任何内容。尝试将 spark.driver.host 设置为主机的 IP 地址时,出现如下错误:

警告实用程序:服务“sparkDriver”无法绑定到端口 5001。 正在尝试端口 5002。

我尝试将 spark.driver.bindAddress 设置为主机的 IP 地址,但得到了同样的错误。 那么,如何配置 Spark 使用宿主机 IP 地址而不是 Docker 容器地址与驱动程序通信?

UPD:来自执行程序的堆栈跟踪:

ERROR RpcOutboxMessage: Ask timeout before connecting successfully
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1713)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:188)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:284)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
    at scala.util.Try$.apply(Try.scala:192)
    at scala.util.Failure.recover(Try.scala:216)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:326)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at org.spark_project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:237)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
    at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:601)
    at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:599)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
    at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
    at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
    at org.apache.spark.rpc.netty.NettyRpcEnv.org$apache$spark$rpc$netty$NettyRpcEnv$$onFailure$1(NettyRpcEnv.scala:205)
    at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:239)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds
    ... 8 more

【问题讨论】:

标签: docker apache-spark mesos apache-spark-standalone


【解决方案1】:

所以工作配置是:

  • 将 spark.driver.host 设置为主机的 IP 地址
  • 将 spark.driver.bindAddress 设置为容器的 IP 地址

工作的 Docker 镜像在这里:docker-spark-submit

【讨论】:

  • 使用 SPARK_DRIVER_HOST 时不起作用显然,这里 docker0 有三个 IP 地址:172.17.0.1(主机的 docker 以太网)、作业服务器 docker IP(172.17.0.2)和主机本地分配的 IP 地址。我们尝试了所有 3 个作为 SPARK_DRIVER_HOST 但我们得到连接超时!
  • @Somum 上述解决方案适用于 Apache Spark-2.1.0+。请检查您的 Spark 版本。
  • bindAddress 应该设置为0.0.0.0
【解决方案2】:

我注意到其他答案正在使用 Spark Standalone(在 VM 上,如 OP 或 127.0.0.1 所述作为其他答案)。

我想展示对远程 AWS Mesos 集群运行 jupyter/pyspark-notebook 的变体并在本地 Mac 上的 Docker 中运行容器的方法似乎对我有用。

在这种情况下,these instuctions apply,但是,--net=host 在 Linux 主机上不起作用。
这里的重要一步 - 在 Mesos 从属的操作系统上创建笔记本用户,如链接中所述。

This diagram 对调试网络很有帮助,但它没有提到 spark.driver.blockManager.port,这实际上是使它工作的最后一个参数,我在 Spark 文档中错过了它。否则,Mesos 从属服务器上的执行器也会尝试绑定该块管理器端口,而 Mesos 拒绝分配它。

公开这些端口,以便您可以在本地访问 Jupyter 和 Spark UI

  • Jupyter 用户界面 (8888)
  • Spark 用户界面 (4040)

这些端口使 Mesos 可以返回到驱动程序:重要提示:必须允许 Mesos 主设备、从设备和 Zookepeeper 进行双向通信......

  • “libprocess”地址+端口似乎通过LIBPROCESS_PORT变量(随机:37899)在Zookeeper中存储/广播。参考:Mesos documentation
  • Spark 驱动程序端口(随机:33139)+ 16 用于spark.port.maxRetries
  • Spark 块管理器端口(随机:45029)+ 16 用于spark.port.maxRetries

不是很相关,但我使用的是 Jupyter Lab 界面

export EXT_IP=<your external IP>

docker run \
  -p 8888:8888 -p 4040:4040 \
  -p 37899:37899 \
  -p 33139-33155:33139-33155 \
  -p 45029-45045:45029-45045 \
  -e JUPYTER_ENABLE_LAB=y \
  -e EXT_IP \
  -e LIBPROCESS_ADVERTISE_IP=${EXT_IP} \
  -e LIBPROCESS_PORT=37899 \
  jupyter/pyspark-notebook

一旦启动,我会转到 Jupyter 的localhost:8888 地址,然后打开一个终端进行简单的spark-shell 操作。我还可以为实际打包的代码添加卷挂载,但这是下一步。

我没有编辑spark-env.shspark-default.conf,所以我现在将所有相关的配置传递给spark-shell。提醒:这是在容器内

spark-shell --master mesos://zk://quorum.in.aws:2181/mesos \
  --conf spark.executor.uri=https://path.to.http.server/spark-2.4.2-bin-hadoop2.7.tgz \
  --conf spark.cores.max=1 \
  --conf spark.executor.memory=1024m \
  --conf spark.driver.host=$LIBPROCESS_ADVERTISE_IP \
  --conf spark.driver.bindAddress=0.0.0.0 \
  --conf spark.driver.port=33139 \
  --conf spark.driver.blockManager.port=45029

这会加载 Spark REPL,在一些关于查找 Mesos 主机和注册框架的输出之后,然后我使用 NameNode IP 从 HDFS 读取一些文件(尽管我怀疑任何其他可访问的文件系统或数据库应该可以工作)

我得到了预期的输出

Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.2
      /_/

Using Scala version 2.12.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_202)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.read.text("hdfs://some.hdfs.namenode:9000/tmp/README.md").show(10)
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
|rich set of highe...|
|MLlib for machine...|
|and Spark Streami...|
|                    |
|<http://spark.apa...|
+--------------------+
only showing top 10 rows

【讨论】:

  • 我为我的 jupyter 容器使用了相同的图像。打开端口是关键。
【解决方案3】:

我的设置,使用 Docker 和 MacOS:

  • 在同一个 Docker 容器中运行 Spark 1.6.3 master + worker
  • 从 MacOS 运行 Java 应用程序(通过 IDE)

Docker-compose 打开端口:

ports:
- 7077:7077
- 20002:20002
- 6060:6060

Java 配置(用于开发目的):

        esSparkConf.setMaster("spark://127.0.0.1:7077");
        esSparkConf.setAppName("datahub_dev");

        esSparkConf.setIfMissing("spark.driver.port", "20002");
        esSparkConf.setIfMissing("spark.driver.host", "MAC_OS_LAN_IP");
        esSparkConf.setIfMissing("spark.driver.bindAddress", "0.0.0.0");
        esSparkConf.setIfMissing("spark.blockManager.port", "6060");

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-05
    • 2017-05-29
    • 2019-09-02
    • 2017-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多