【问题标题】:How to run multiple concurrent jobs in Spark using python multiprocessing如何使用 python 多处理在 Spark 中运行多个并发作业
【发布时间】:2015-11-09 17:12:41
【问题描述】:

我在笔记本电脑上设置了一个 Spark on YARN 集群,但使用 python 多处理在 Spark 中运行多个并发作业时遇到问题。我在纱线客户端模式下运行。我尝试了两种方法来实现这一点:

  • 设置单个 SparkContext 并创建多个进程来提交作业。此方法不起作用,程序崩溃。我猜单个 SparkContext 不支持 python 多进程
  • 对于每个进程,设置一个 SparkContext 并提交作业。在这种情况下,作业成功提交到 YARN,但作业是串行运行的,一次只运行一个作业,其余作业在队列中。是否可以同时启动多个作业?

    更新设置

    纱线:

  • yarn.nodemanager.resource.cpu-vcores 8

  • yarn.nodemanager.resource.memory-mb 11264
  • yarn.scheduler.maximum-allocation-vcores 1

    火花:

  • SPARK_EXECUTOR_CORES=1

  • SPARK_EXECUTOR_INSTANCES=2
  • SPARK_DRIVER_MEMORY=1G
  • spark.scheduler.mode = 公平
  • spark.dynamicAllocation.enabled = true
  • spark.shuffle.service.enabled = true

yarn 一次只能运行一项工作,使用 3 个容器、3 个 vcore、3GB 内存。所以有足够的 vcore 和 ram 可用于其他作业,但它们没有运行

【问题讨论】:

    标签: python-2.7 apache-spark hadoop-yarn pyspark


    【解决方案1】:

    您有多少 CPU,每个作业需要多少 CPU? YARN 将调度作业并在集群上分配它可以分配的内容:如果您的作业需要 8 个 CPU,而您的系统只有 8 个 CPU,那么其他作业将排队并串行运行。

    如果您为每个作业请求 4 个,那么您将看到 2 个作业在任何时候并行运行。

    【讨论】:

    • 我如何为每个工作请求 4 芯纱线?我已经在上面发布了我的设置,你可以看看它是否有意义?谢谢!
    • 您可以将 --total-executor-cores 4 传递给提交作业以限制/增加要在集群上使用的核心数
    【解决方案2】:

    我找到了解决方案https://stackoverflow.com/a/33012538/957352

    对于单机集群,

    在文件中

    /etc/hadoop/conf/capacity-scheduler.xml

    改变了属性

    yarn.scheduler.capacity.maximum-am-resource-percent 从 0.1 到 0.5。

    【讨论】:

      【解决方案3】:

      我遇到了和你一样的问题,我通过在 pyspark 中设置 .config("spark.executor.cores", '1') 解决了这个问题。 这是我的代码:

      import os,sys
      import numpy as np
      import pyspark
      from multiprocessing import Pool
      from pyspark.sql import SparkSession
      import time
      def train(db):
      
          print(db)
          spark = SparkSession \
              .builder \
              .appName("scene_"+str(db)) \
              .config("spark.executor.cores", '1') \
              .getOrCreate()
          print(spark.createDataFrame([[1.0],[2.0]],['test_column']).collect())
      
      if __name__ == '__main__':
          p = Pool(10)
          for db in range(10):
              p.apply_async(train,args=(db,))    
          p.close()
          p.join()
          #train(4)
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-10-23
        • 2017-03-29
        • 2016-03-03
        • 1970-01-01
        相关资源
        最近更新 更多