【Question Title】: Spark cluster on EC2 is using only one node
【Posted】: 2018-06-12 21:16:06
【Question Description】:

I launch a Spark cluster with 8+1 nodes on Amazon EC2 using flintrock:

> flintrock --config config.yaml launch cluster-8nodes

Then I log into the cluster, also via flintrock:

> flintrock --config config.yaml login cluster-8nodes

The job I am running is essentially this simple bigram-counting code over a large text file:

from contextlib import contextmanager

from pyspark import SparkConf, SparkContext

@contextmanager
def use_spark_context(appName):
    # Only the app name is set here; the master URL is left to spark-submit.
    conf = SparkConf().setAppName(appName)
    spark_context = SparkContext(conf=conf)

    try:
        print("starting ", appName)
        yield spark_context
    finally:
        spark_context.stop()
        print("stopping ", appName)

with use_spark_context("AppName") as spark:
    # text_path and name_bigrams are defined elsewhere in the script.
    text_file = spark.textFile(text_path)
    bigrams = text_file.flatMap(lambda line: line.split(".")) \
                       .map(lambda line: line.strip().split(" ")) \
                       .flatMap(lambda xs: zip(xs, xs[1:]))
    # After reduceByKey each element is a ((word1, word2), count) pair,
    # so the filter has to inspect the bigram part of the pair.
    counts = bigrams.map(lambda bigram: (bigram, 1)) \
                    .reduceByKey(lambda x, y: x + y) \
                    .filter(lambda pair: pair[0] in name_bigrams) \
                    .collect()

This is saved as a .py file and, after logging in via flintrock, submitted as follows:

> PYSPARK_PYTHON=python3 spark-submit --num-executors 8 my_job.py --input data/bigtext.txt

The program appears to run fine and produces the output below. However, all nodes except one stay idle. Shouldn't this setup distribute the job across the cluster's 8 worker nodes?

18/06/08 09:50:48 INFO Executor: Finished task 10.0 in stage 0.0 (TID 10). 1998 bytes result sent to driver
18/06/08 09:50:48 INFO TaskSetManager: Starting task 12.0 in stage 0.0 (TID 12, localhost, executor driver, partition 12, PROCESS_LOCAL, 4851 bytes)
18/06/08 09:50:48 INFO Executor: Running task 12.0 in stage 0.0 (TID 12)
18/06/08 09:50:48 INFO TaskSetManager: Finished task 10.0 in stage 0.0 (TID 10) in 30285 ms on localhost (executor driver) (11/382)
18/06/08 09:50:48 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:402653184+33554432
18/06/08 09:50:53 INFO PythonRunner: Times: total = 32160, boot = -586, init = 588, finish = 32158
18/06/08 09:50:54 INFO Executor: Finished task 11.0 in stage 0.0 (TID 11). 1998 bytes result sent to driver
18/06/08 09:50:54 INFO TaskSetManager: Starting task 13.0 in stage 0.0 (TID 13, localhost, executor driver, partition 13, PROCESS_LOCAL, 4851 bytes)
18/06/08 09:50:54 INFO TaskSetManager: Finished task 11.0 in stage 0.0 (TID 11) in 32785 ms on localhost (executor driver) (12/382)
18/06/08 09:50:54 INFO Executor: Running task 13.0 in stage 0.0 (TID 13)
18/06/08 09:50:54 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:436207616+33554432
18/06/08 09:51:19 INFO PythonRunner: Times: total = 30232, boot = -571, init = 578, finish = 30225
18/06/08 09:51:19 INFO Executor: Finished task 12.0 in stage 0.0 (TID 12). 1998 bytes result sent to driver
18/06/08 09:51:19 INFO TaskSetManager: Starting task 14.0 in stage 0.0 (TID 14, localhost, executor driver, partition 14, PROCESS_LOCAL, 4851 bytes)
18/06/08 09:51:19 INFO Executor: Running task 14.0 in stage 0.0 (TID 14)
18/06/08 09:51:19 INFO TaskSetManager: Finished task 12.0 in stage 0.0 (TID 12) in 30794 ms on localhost (executor driver) (13/382)
18/06/08 09:51:19 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:469762048+33554432
18/06/08 09:51:25 INFO PythonRunner: Times: total = 31385, boot = -608, init = 611, finish = 31382
18/06/08 09:51:26 INFO Executor: Finished task 13.0 in stage 0.0 (TID 13). 1998 bytes result sent to driver
18/06/08 09:51:26 INFO TaskSetManager: Starting task 15.0 in stage 0.0 (TID 15, localhost, executor driver, partition 15, PROCESS_LOCAL, 4851 bytes)
18/06/08 09:51:26 INFO TaskSetManager: Finished task 13.0 in stage 0.0 (TID 13) in 32061 ms on localhost (executor driver) (14/382)
18/06/08 09:51:26 INFO Executor: Running task 15.0 in stage 0.0 (TID 15)
18/06/08 09:51:26 INFO HadoopRDD: Input split: file:/home/ec2-user/data/enwiki-extract.txt:503316480+33554432

Edit: If I pass the master URL printed by flintrock launch to spark-submit --master, the job starts but fails because the input file, which is stored locally on the login node, cannot be found:

py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 30, 172.31.28.28, executor 5): java.io.FileNotFoundException: File file:/home/ec2-user/data/enwiki-extract.txt does not exist

Isn't the login node also the master node? My assumption was that the master reads the file and hands its partitions out to the worker nodes.

【Question Comments】:

  • Nothing in the question shows that you are actually connecting to the cluster. The code you have (hard to say without seeing use_spark_context) suggests you are using local mode.
  • @user8371915 Added the code for use_spark_context. Do I need to edit the SparkConf to get out of local mode?
  • That was just a guess. Can you check sc.master (see the sketch after these comments)? I'm fairly sure Flintrock is supposed to write the master URL into the configuration.
  • My understanding is that Flintrock should handle this configuration without any further intervention. But if I remember correctly, this (linked from the official git repo) shows a similar problem. Maybe Nicholas Chammas or someone more familiar with Flintrock can shed some light on it. Perhaps you could check SPARK_HOME/conf/spark-defaults.conf or SPARK_CONF_DIR/spark-defaults.conf.
  • @user8371915 Check it for what? spark-defaults.conf only contains spark.jars.packages org.apache.hadoop:hadoop-aws:2.7.3
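
A minimal sketch of the sc.master check suggested above, reusing the use_spark_context helper from the question (the app name is arbitrary):

with use_spark_context("MasterCheck") as spark:
    # Prints the master URL this SparkContext actually connected to;
    # 'local[*]' here would mean the job never left the login node.
    print("effective master:", spark.master)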

Tags: python apache-spark amazon-ec2 pyspark


【Solution 1】:

By default, spark-submit launches Spark in local mode. What worked was specifying the master explicitly via --master spark://<masterURL>:7077 and setting --num-executors to at least the number of worker nodes, depending on the cluster configuration.
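
The adjusted submit command then looks roughly like this (a sketch; the <masterURL> placeholder is whatever hostname flintrock launch reports for the master):

> PYSPARK_PYTHON=python3 spark-submit --master spark://<masterURL>:7077 --num-executors 8 my_job.py --input data/bigtext.txt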

Also, in this setup every node of the cluster needs a complete local copy of the input file. That surprised me at first, because I had assumed Spark would automatically ship the file's partitions to the workers over the network.
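
Since spark-defaults.conf on the cluster already pulls in org.apache.hadoop:hadoop-aws:2.7.3 (see the comments above), one way around keeping local copies is to put the input on a shared store such as S3. A sketch, assuming the instances have S3 credentials and using a hypothetical bucket name:

with use_spark_context("BigramsFromS3") as spark:
    # Each executor reads its own splits straight from the object store,
    # so no node needs a local copy of the file.
    text_file = spark.textFile("s3a://<your-bucket>/enwiki-extract.txt")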

【Discussion】:
