【问题标题】:Multiprocessing python not running in parallel多处理python没有并行运行
【发布时间】:2015-04-28 22:10:42
【问题描述】:

我一直在尝试使用 python 中的多处理模块来实现计算成本高昂的任务的并行性。

我能够执行我的代码,但它不能并行运行。我一直在阅读多处理的手册页和论坛,以了解它为什么不起作用,但我还没有弄清楚。

我认为这个问题可能与执行我创建和导入的其他模块的某种锁定有关。

这是我的代码:

main.py:

##import my modules
import prepare_data
import filter_part
import wrapper_part
import utils
from myClasses import ML_set
from myClasses import data_instance

n_proc = 5

def main():
    if __name__ == '__main__':
        ##only main process should run this
        data = prepare_data.import_data() ##read data from file  
        data = prepare_data.remove_and_correct_outliers(data)
        data = prepare_data.normalize_data_range(data)
        features = filter_part.filter_features(data)

        start_t = time.time()
        ##parallelism will be used on this part
        best_subset = wrapper_part.wrapper(n_proc, data, features)

        print time.time() - start_t


main()

wrapper_part.py:

##my modules
from myClasses import ML_set
from myClasses import data_instance
import utils

def wrapper(n_proc, data, features):

    p_work_list = utils.divide_features(n_proc-1, features)
    n_train, n_test = utils.divide_data(data)

    workers = []

    for i in range(0,n_proc-1):
        print "sending process:", i
        p = mp.Process(target=worker_classification, args=(i, p_work_list[i], data, features, n_train, n_test))
        workers.append(p)
        p.start()

    for worker in workers:
        print "waiting for join from worker"
        worker.join()


    return


def worker_classification(id, work_list, data, features, n_train, n_test):
    print "Worker ", id, " starting..."
    best_acc = 0
    best_subset = []
    while (work_list != []):
        test_subset = work_list[0]
        del(work_list[0])
        train_set, test_set = utils.cut_dataset(n_train, n_test, data, test_subset)
        _, acc = classification_decision_tree(train_set, test_set)
        if acc > best_acc:
            best_acc = acc
            best_subset = test_subset
    print id, " found best subset ->  ", best_subset, " with accuracy: ", best_acc

所有其他模块不使用多处理模块并且工作正常。 在这个阶段,我只是在测试并行性,甚至没有试图取回结果,因此进程之间没有任何通信,也没有共享内存变量。 每个进程都会使用一些变量,但是据我所知,它们是在生成进程之前定义的,我相信每个进程都有自己的变量副本。

作为 5 个进程的输出,我得到了这个:

importing data from file...
sending process: 0
sending process: 1
Worker  0  starting...
0  found best subset ->   [2313]  with accuracy:  60.41
sending process: 2
Worker  1  starting...
1  found best subset ->   [3055]  with accuracy:  60.75
sending process: 3
Worker  2  starting...
2  found best subset ->   [3977]  with accuracy:  62.8
waiting for join from worker
waiting for join from worker
waiting for join from worker
waiting for join from worker
Worker  3  starting...
3  found best subset ->   [5770]  with accuracy:  60.07
55.4430000782

4 个进程执行并行部分大约需要 55 秒。仅使用 1 个进程进行测试,执行时间为 16 秒:

importing data from file...
sending process: 0
waiting for join from worker
Worker  0  starting...
0  found best subset ->   [5870]  with accuracy:  63.32
16.4409999847

我在 python 2.7 和 windows 8 上运行它

编辑

我在 ubuntu 上测试了我的代码,它工作正常,我猜它是 windows 8 和 python 的问题。 这是 ubuntu 上的输出:

importing data from file...
size trainset:  792  size testset:  302
sending process: 0
sending process: 1
Worker  0  starting...
sending process: 2
Worker  1  starting...
sending process: 3
Worker  2  starting...
waiting for join from worker
Worker  3  starting...
2  found best subset ->   [5199]  with accuracy:  60.93
1  found best subset ->   [3198]  with accuracy:  60.93
0  found best subset ->   [1657]  with accuracy:  61.26
waiting for join from worker
waiting for join from worker
waiting for join from worker
3  found best subset ->   [5985]  with accuracy:  62.25
6.1428809166

我将从现在开始使用ubuntu进行测试,但是我想知道为什么代码在windows上不起作用。

【问题讨论】:

  • 当您使用mp.Process 生成每个进程时,作为args 传递的数据有多大?不确定这是否重要。此外,是否每个工作人员都必须以某种方式对数据进行分区(utils.cut_dataset)?如果是这样,那是工作中计算成本高昂的部分吗?如果是这样,也许您可​​以在将工作委派给他们之前进行拆分(根据我的经验,这将是一种更常见的模式)。最后,del(work_list[0]) 引起了我的注意。这份清单很大吗?如果是这样,从前端删除东西可能会很昂贵。另一种方法是先反转它,然后从末端弹出。
  • 数据可以被描绘成一个相当大的矩阵,比如 1100 * N 个浮点数。其中 N 是 300~400 左右的数字。每个流程都需要所有数据。在这个例子中,只是为了测试并行性,我只是执行问题的第一步。但是,当我让一切正常工作时,我的想法是每个工人在 while (work_list != []) 中找到的每个有用的集合都将与所有其他可能性结合起来。这是计算成本高的部分。感谢您提供有关我如何从 work_list 中删除工作的提示。

标签: python parallel-processing multiprocessing python-multiprocessing


【解决方案1】:

请务必阅读 multiprocessing 手册中的 Windows 指南:https://docs.python.org/2/library/multiprocessing.html#windows

特别是“安全导入主模块”:

相反,应该通过使用来保护程序的“入口点” if __name__ == '__main__':如下:

您在上面显示的第一个代码 sn-p 中违反了此规则,所以我没有进一步查看。希望您观察到的问题的解决方案与包含此保护一样简单。

这很重要的原因是:在类 Unix 系统上,子进程是通过分叉创建的。在这种情况下,操作系统会创建创建分叉的进程的精确副本。也就是说,所有状态都是由孩子从父母那里继承的。例如,这意味着定义了所有函数和类。

在 Windows 上,没有这样的系统调用。 Python 需要执行相当繁重的任务,即在子进程中创建新的 Python 解释器会话,并重新创建(逐步)父进程的状态。例如,所有函数和类都需要重新定义。这就是为什么沉重的import 机器在 Windows 上的 Python 多处理子引擎下运行。当子模块导入主模块时,该机制启动。在您的情况下,这意味着在孩子中调用main()!当然,你不希望这样。

您可能会觉得这很乏味。我觉得令人印象深刻的是multiprocessing 模块设法为两个非常不同的平台提供相同功能的接口。确实,就进程处理而言,符合 POSIX 的操作系统和 Windows 是如此不同,因此很难想出适用于两者的抽象。

【讨论】:

  • 感谢您的回复。我正在使用 if name == 'main': 它在主要部分中定义。我试图在进程产生之前在包装函数内使用它。但是,如果有保护,那么所有进程都会开始执行 main.py 中调用的函数。我知道 Windows 上不存在分叉,但是我认为它不需要重新创建所有步骤。由于在 Windows 上创建其他进程非常昂贵,我从现在开始使用 unix 系统。
  • 我不太明白:保护对main() 的调用是否解决了您在Windows 上的问题?您应该注意,虽然流程创建确实可能是一件复杂的事情,但在许多情况下它当然是值得的。该过程执行的工作必须比创建过程复杂得多。然后它得到了回报。另一方面,如果任务微不足道,您不希望将任务外包给另一个进程。
  • 在 main() 上调用保护并没有解决问题。我的代码仍然不能在 Windows 上并行运行。然而,使用那里的保护使得 main() 上的代码仅由一个进程执行成为可能。我的代码的主要部分需要大约 30 秒才能执行,因为数据量非常大。如果在 Windows 中,每个创建的进程都需要重新创建这 30 秒才能执行,那么这将比使用 unix fork 花费更多时间。
  • 如果您的子进程需要 30 秒 工作函数开始工作,那么您的架构设计不当。要么有太多的数据通过管道从父级传输到子级,要么你仍然有太多由纯导入触发的工作。进口可能很昂贵,是的。但在常见情况下,我们在这里谈论的是毫秒。我观察到的最极端的情况是通过 NFS 导入 scipy/numpy/matplotlib 堆栈,这涉及到数百或数千个文件读取。即使在慢速网络上,这也可以在 10 秒内完成。
猜你喜欢
  • 2022-01-25
  • 1970-01-01
  • 2022-09-23
  • 2021-09-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-09-19
  • 1970-01-01
相关资源
最近更新 更多