【发布时间】:2015-04-28 22:10:42
【问题描述】:
我一直在尝试使用 python 中的多处理模块来实现计算成本高昂的任务的并行性。
我能够执行我的代码,但它不能并行运行。我一直在阅读多处理的手册页和论坛,以了解它为什么不起作用,但我还没有弄清楚。
我认为这个问题可能与执行我创建和导入的其他模块的某种锁定有关。
这是我的代码:
main.py:
##import my modules
import prepare_data
import filter_part
import wrapper_part
import utils
from myClasses import ML_set
from myClasses import data_instance
n_proc = 5
def main():
if __name__ == '__main__':
##only main process should run this
data = prepare_data.import_data() ##read data from file
data = prepare_data.remove_and_correct_outliers(data)
data = prepare_data.normalize_data_range(data)
features = filter_part.filter_features(data)
start_t = time.time()
##parallelism will be used on this part
best_subset = wrapper_part.wrapper(n_proc, data, features)
print time.time() - start_t
main()
wrapper_part.py:
##my modules
from myClasses import ML_set
from myClasses import data_instance
import utils
def wrapper(n_proc, data, features):
p_work_list = utils.divide_features(n_proc-1, features)
n_train, n_test = utils.divide_data(data)
workers = []
for i in range(0,n_proc-1):
print "sending process:", i
p = mp.Process(target=worker_classification, args=(i, p_work_list[i], data, features, n_train, n_test))
workers.append(p)
p.start()
for worker in workers:
print "waiting for join from worker"
worker.join()
return
def worker_classification(id, work_list, data, features, n_train, n_test):
print "Worker ", id, " starting..."
best_acc = 0
best_subset = []
while (work_list != []):
test_subset = work_list[0]
del(work_list[0])
train_set, test_set = utils.cut_dataset(n_train, n_test, data, test_subset)
_, acc = classification_decision_tree(train_set, test_set)
if acc > best_acc:
best_acc = acc
best_subset = test_subset
print id, " found best subset -> ", best_subset, " with accuracy: ", best_acc
所有其他模块不使用多处理模块并且工作正常。 在这个阶段,我只是在测试并行性,甚至没有试图取回结果,因此进程之间没有任何通信,也没有共享内存变量。 每个进程都会使用一些变量,但是据我所知,它们是在生成进程之前定义的,我相信每个进程都有自己的变量副本。
作为 5 个进程的输出,我得到了这个:
importing data from file...
sending process: 0
sending process: 1
Worker 0 starting...
0 found best subset -> [2313] with accuracy: 60.41
sending process: 2
Worker 1 starting...
1 found best subset -> [3055] with accuracy: 60.75
sending process: 3
Worker 2 starting...
2 found best subset -> [3977] with accuracy: 62.8
waiting for join from worker
waiting for join from worker
waiting for join from worker
waiting for join from worker
Worker 3 starting...
3 found best subset -> [5770] with accuracy: 60.07
55.4430000782
4 个进程执行并行部分大约需要 55 秒。仅使用 1 个进程进行测试,执行时间为 16 秒:
importing data from file...
sending process: 0
waiting for join from worker
Worker 0 starting...
0 found best subset -> [5870] with accuracy: 63.32
16.4409999847
我在 python 2.7 和 windows 8 上运行它
编辑
我在 ubuntu 上测试了我的代码,它工作正常,我猜它是 windows 8 和 python 的问题。 这是 ubuntu 上的输出:
importing data from file...
size trainset: 792 size testset: 302
sending process: 0
sending process: 1
Worker 0 starting...
sending process: 2
Worker 1 starting...
sending process: 3
Worker 2 starting...
waiting for join from worker
Worker 3 starting...
2 found best subset -> [5199] with accuracy: 60.93
1 found best subset -> [3198] with accuracy: 60.93
0 found best subset -> [1657] with accuracy: 61.26
waiting for join from worker
waiting for join from worker
waiting for join from worker
3 found best subset -> [5985] with accuracy: 62.25
6.1428809166
我将从现在开始使用ubuntu进行测试,但是我想知道为什么代码在windows上不起作用。
【问题讨论】:
-
当您使用
mp.Process生成每个进程时,作为args传递的数据有多大?不确定这是否重要。此外,是否每个工作人员都必须以某种方式对数据进行分区(utils.cut_dataset)?如果是这样,那是工作中计算成本高昂的部分吗?如果是这样,也许您可以在将工作委派给他们之前进行拆分(根据我的经验,这将是一种更常见的模式)。最后,del(work_list[0])引起了我的注意。这份清单很大吗?如果是这样,从前端删除东西可能会很昂贵。另一种方法是先反转它,然后从末端弹出。 -
数据可以被描绘成一个相当大的矩阵,比如 1100 * N 个浮点数。其中 N 是 300~400 左右的数字。每个流程都需要所有数据。在这个例子中,只是为了测试并行性,我只是执行问题的第一步。但是,当我让一切正常工作时,我的想法是每个工人在 while (work_list != []) 中找到的每个有用的集合都将与所有其他可能性结合起来。这是计算成本高的部分。感谢您提供有关我如何从 work_list 中删除工作的提示。
标签: python parallel-processing multiprocessing python-multiprocessing