【问题标题】:Distributed Python On Multi Machine Cluster多机集群上的分布式 Python
【发布时间】:2019-05-19 19:21:24
【问题描述】:

以下是要求-:

class MultiMachineDoWork:

    def Function1(self, A, B):  
        return A+B

    def Function2(self, A, B):  
        return A*B 

    def Function3(self, A, B):  
        return A**B  

    def Function4():  
        X = MultiMachineDoWork.Function1(5,10)
        Y = MultiMachineDoWork.Function2(5,10)
        Z = MultiMachineDoWork.Function3(5,10)
        return X+Y+Z

假设 Function1、Function2 和 Function3 每个都需要很长时间,最好分别在机器 L、M 和 N 上并行运行它们在分布式模型上。 函数4可以运行在机器P上,机器P可以收集结果并合并。

MapReduce 工作在某种类似的概念上,但在数据的不同部分运行相同的功能... Dask / Ray / Celery 在这个案例研究中是否有用...

如果必须构建自定义解决方案,解决方案应该如何进行以及如何进行......

使用 Dask 本地集群的 Pydoop/Spark?


真实案例研究 - 用于 ML 分类的集成模型。一个用于 RandomForest 的函数,一个用于支持向量,一个用于 XGBoost。所有运行在同一个数据集上...

【问题讨论】:

  • 真的很惊喜!!这个问题投了反对票..我想我是世界上唯一一个不知道这种微不足道的建筑问题的人....
  • 正如您提到的 Pydoop,这里已经对 NN 训练进行了一定程度的实验:github.com/crs4/pydoop-examples/tree/master/examples/pydeep。这目前处于非活动状态,可能无法在下一个 Pydoop 版本中正常工作,但它应该提供一些有用的指针。

标签: python machine-learning distributed-computing


【解决方案1】:

可以使用 python 中的各种框架来完成跨多个机器/节点分配任务/功能/计算。最常见和最广泛使用的是 Ray、Dask 和 PySpark,使用其中的哪一种实际上取决于用例。

对于简单的函数/任务分发,您可以使用 Ray 库(@ray.remote)进行分发,然后使用 get 方法将结果集成/计算回来。同样可以通过 dask 完成。

https://rise.cs.berkeley.edu/blog/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray/

我更喜欢 Spark/Pyspark,当您处理大型数据集并且想要执行某种 ETL 操作以将庞大的数据集分布到多个节点然后执行一些转换或操作时在上面。注意 Spark 或 mapreduce 概念假设您将计算带到数据上,它将在不同的数据子集上执行相同/相似的任务,并最终执行一些聚合(涉及洗牌)。

Spark/Pyspark 通过其内置的随机森林或梯度提升树算法支持集成。但是 spark(开箱即用)目前不支持在单独的节点/执行器上训练单独的模型(随机森林、梯度树、逻辑回归等)。尽管可以通过自定义的 spark 代码来实现,就像他们在内部为随机森林所做的那样(训练多个决策树)。

集成的真实场景可以使用 dask 和 sklearn 轻松完成。 Dask 与 scikit-learn xgboost 等很好地集成,可以使用 joblib 上下文管理器跨分布式集群节点/worker 执行并行计算。

现在对于集成场景,您可以使用 scikit-learn 的不同模型/算法(RandomForest、SGD、SVM、Logistic Regression)并使用投票分类器将多个不同的模型(即子估计器)组合成一个模型,这(理想情况下)比任何单独的模型都强(即集成概念的基础)。

使用 Dask 将在集群中的不同机器上训练各个子估计器/模型。

https://docs.dask.org/en/latest/use-cases.html

代码看起来很高级-

classifiers = [
    ('sgd', SGDClassifier(max_iter=1000)),
    ('logisticregression', LogisticRegression()),
    ('xgboost', XGBClassifier()
    ('svc', SVC(gamma='auto')),
]
clf = VotingClassifier(classifiers) 

with joblib.parallel_backend("dask"):
    clf.fit(X, y)

** 以上也可以通过Ray/Spark.etc等其他分布式框架实现,但需要更多的自定义编码。

希望这些信息对您有所帮助!

【讨论】:

  • 感谢 Kuntal 验证了我的思考过程!!
  • 在上面的例子中,唯一的问题是它很难隔离功能......我们只是调用 dask 然后有点像假设并行性以案例场景要求的方式工作......我是乐观地认为某些库将使我们能够绝对控制执行的粒度......
猜你喜欢
  • 2011-03-21
  • 1970-01-01
  • 2015-01-08
  • 2021-01-08
  • 1970-01-01
  • 1970-01-01
  • 2018-09-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多