【问题标题】:PySpark distributing module importsPySpark 分发模块导入
【发布时间】:2016-12-14 18:36:19
【问题描述】:

在过去的几天里,我一直在努力了解 Spark 执行器如何知道如何在导入时使用给定名称的模块。我正在研究 AWS EMR。情况: 我通过键入在 EMR 上初始化 pyspark

pyspark --master 纱线

然后,在 pyspark 中,

import numpy as np ## notice the naming

def myfun(x):
    n = np.random.rand(1)
    return x*n

rdd = sc.parallelize([1,2,3,4], 2)
rdd.map(lambda x: myfun(x)).collect() ## works!

我的理解是,当我导入numpy as np时,主节点是唯一一个通过np导入识别numpy的节点。但是,对于 EMR 集群(2 个工作节点),如果我在 rdd 上运行 map 函数,驱动程序会将该函数发送到工作节点以执行列表中每个项目(每个分区)的函数,并且返回成功结果。

我的问题是:工人如何知道 numpy 应该作为 np 导入?每个worker都已经安装了numpy,但是我没有明确定义每个节点导入模块as np的方法。

有关依赖关系的更多详细信息,请参阅 Cloudera 的以下帖子: http://blog.cloudera.com/blog/2015/09/how-to-prepare-your-apache-hadoop-cluster-for-pyspark-jobs/

复杂依赖下,他们有一个示例(代码),其中在每个节点上显式导入了 pandas 模块。

我听说的一个理论是驱动程序分发在 pyspark 交互式 shell 中传递的所有代码。我对此表示怀疑。我提出来反驳这个想法的例子是,如果我在主节点上输入:

print "hello"

每个工作节点是否也打印“hello”?我不这么认为。但也许我错了。

【问题讨论】:

    标签: python apache-spark pyspark


    【解决方案1】:

    当函数被序列化时,会有一个number of objects is being saved:

    • 代码
    • 全局
    • 默认值
    • closure
    • 字典

    以后可以用来恢复给定功能所需的完整环境。

    由于函数引用了np,因此可以从其代码中提取:

    from pyspark.cloudpickle import CloudPickler
    
    CloudPickler.extract_code_globals(myfun.__code__)
    ## {'np'}
    

    并且绑定可以从它的globals中提取:

    myfun.__globals__['np']
    ## <module 'numpy' from ...
    

    所以序列化的闭包(广义上)捕获了恢复环境所需的所有信息。当然,在闭包中访问的所有模块都必须在每台工作机器上都是可导入的。

    其他一切都只是读写机器。

    附带说明,主节点不应执行任何 Python 代码。它负责不运行应用程序代码的资源分配。

    【讨论】:

    • 太好了,感谢您的意见。话虽如此,这是否意味着像print "hello" 这样的代码片段会在每个工作人员处执行?还是忽略,只执行函数运行所必需的代码?
    • 只有闭包捕获的代码才会在worker上实际执行。其他所有内容都被忽略。
    猜你喜欢
    • 2021-11-30
    • 2017-07-04
    • 2019-05-12
    • 2020-10-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-09
    相关资源
    最近更新 更多