【问题标题】:How to get the number of workers(executors) in PySpark?如何获取 PySpark 中的工人(执行者)数量?
【发布时间】:2016-12-04 07:03:54
【问题描述】:

我需要使用这个参数,那么如何获取worker的数量呢? 就像在 Scala 中一样,我可以调用 sc.getExecutorMemoryStatus 来获取可用的工人数量。但是在 PySpark 中,似乎没有公开 API 来获取这个数字。

【问题讨论】:

  • 我不认为这个问题是另一个问题的重复。我想知道在 Mesos 上运行时,甚至在创建任何 rdds 之前,有多少执行程序可供驱动程序使用。很烦人,但我最终解析了 ui: import pandas as pd df = pd.read_html("localhost:4040/executors")[1] len(df[df['Executor ID'] != 'driver'])
  • 快速回答,获取核心数:sc._jsc.sc().getExecutorMemoryStatus().size()
  • 投票重新开放,因为最初的问题是指 EMR 节点,而这指的是 Spark 执行器。虽然这个问题回答了前者,但前者不太通用。附言这里接受的答案很简单,而且是错误的——无论是结果还是假设。

标签: scala apache-spark pyspark


【解决方案1】:

在 scala 中,getExecutorStorageStatusgetExecutorMemoryStatus 都返回包括驱动程序在内的执行器数量。 像下面的例子 sn-p

/** Method that just returns the current active/registered executors
        * excluding the driver.
        * @param sc The spark context to retrieve registered executors.
        * @return a list of executors each in the form of host:port.
        */
       def currentActiveExecutors(sc: SparkContext): Seq[String] = {
         val allExecutors = sc.getExecutorMemoryStatus.map(_._1)
         val driverHost: String = sc.getConf.get("spark.driver.host")
         allExecutors.filter(! _.split(":")(0).equals(driverHost)).toList
       }

But In python api it was not implemented

@DanielDarabos answer 也证实了这一点。

相当于python中的this...

sc.getConf().get("spark.executor.instances")

编辑(python):

%python
sc = spark._jsc.sc() 
n_workers =  len([executor.host() for executor in sc.statusTracker().getExecutorInfos() ]) -1

print(n_workers)

正如 Danny 在评论中提到的,如果您想交叉验证它们,您可以使用以下语句。

%python

sc = spark._jsc.sc() 

result1 = sc.getExecutorMemoryStatus().keys() # will print all the executors + driver available

result2 = len([executor.host() for executor in sc.statusTracker().getExecutorInfos() ]) -1

print(result1, end ='\n')
print(result2)

示例结果:

Set(10.172.249.9:46467)
0

【讨论】:

  • 抱歉回复晚了,但在我的 Pyspark 中,它显示“SparkContext 对象没有属性 getConf”
  • python 语法是sc._conf.get('spark.executor.instances')。它返回一个字符串。
  • 配置!=现实:p
  • 既然这个问题已经结束,这个答案是错误的,我就在这里回答:sc = spark_session._jsc.sc()result1 = sc.getExecutorMemoryStatus().keys()result2 = [executor.host() for executor in sc.statusTracker().getExecutorInfos()]
  • @Chiel 和 Danny 我编辑/更正了答案。谢谢
【解决方案2】:

您也可以通过 Spark REST API 获取执行器的数量:https://spark.apache.org/docs/latest/monitoring.html#rest-api

您可以检查/applications/[app-id]/executors,它返回给定应用程序的所有活动执行器的列表


PS: 当spark.dynamicAllocation.enabledtrue 时,spark.executor.instances 可能不等于当前可用的执行器,但此 API 始终返回正确的值。

【讨论】:

    猜你喜欢
    • 2022-12-21
    • 1970-01-01
    • 2016-01-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多