【问题标题】:Can Spark parallelize nested?Spark 可以并行化嵌套吗?
【发布时间】:2016-12-27 21:01:12
【问题描述】:

我在一个 60 代码 EC2 实例上运行以下代码

from pyspark import SparkContext
import time, md5
workers_count = 10
sc = SparkContext("local[%s]" % workers_count, "App Name")
max_num = 50000000
start_time = time.time()
first_item = sc.parallelize(xrange(max_num)).map(lambda n: (n, md5.md5(str(n)).hexdigest())).reduce(lambda a,b: a if a[1] > b[1] else b)
end_time = time.time()
print("sorting took took %s seconds with %s workers" % (end_time-start_time, workers_count))

使用 1 名工人需要 52 秒。 有 2 名工人需要 26 秒。 有 4 名工人需要 13 秒 有 8 名工人需要 6 秒 16 名或更多工人需要 4 秒(或多或少)

上面的代码是inner-part,需要运行几百万次

从上面我了解到并行化提高性能的程度是有限度的,这没关系,但由于我使用的是 60 核机器,我希望它能够充分利用内核,我希望每个循环使用 8 个核心,同时运行 7 个循环。

是否可以为每个函数定义它将使用多少个内核?

【问题讨论】:

    标签: python apache-spark amazon-ec2 pyspark


    【解决方案1】:

    Spark 可以并行化嵌套吗?

    它不能。 Spark 并行执行必须是平坦的。

    您可以使用单独的线程提交多个并发作业。例如使用带有线程的joblibnumSlices

    import hashlib
    from joblib import Parallel, delayed
    
    def run(sc, numSlices=8):
        return sc.range(0, max_num, numSlices=numSlices) \
            .map(lambda n: (n, hashlib.md5(str(n)).hexdigest())) \
            .reduce(lambda a,b: a if a[1] > b[1] else b) 
    
    Parallel(n_jobs=7, backend="threading")(delayed(run)(sc) for _ in range(7))
    

    【讨论】:

    • 我同意这将解决本地单机的问题,但 spark 的想法是它也可以与多节点集群一起使用,并且此解决方案不会扩展 spark-way
    • 只要集群中有足够的资源,它就会按预期工作。
    • 问题是关于spark,而不是关于如何使用其他方法来完成这项工作,而且我还认为如果我添加另一个级别的嵌套,事情可能会变得复杂和丑陋
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-03-16
    • 2016-09-11
    • 1970-01-01
    • 1970-01-01
    • 2017-11-05
    • 2011-11-30
    相关资源
    最近更新 更多