【发布时间】:2016-06-08 00:34:11
【问题描述】:
我有以下代码,它利用多处理来遍历一个大列表并找到匹配项。一旦在任何一个进程中找到匹配项,如何让所有进程停止?我看过一些例子,但我似乎都不适合我在这里所做的事情。
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
def do_job(first_bits):
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
# CHECK FOR MATCH HERE
print(''.join(x))
# EXIT ALL PROCESSES IF MATCH FOUND
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
results = []
for i in range(num_parts):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
pool.apply_async(do_job, (first_bit,))
pool.close()
pool.join()
感谢您的宝贵时间。
更新 1:
我已经实现了@ShadowRanger 的伟大方法中建议的更改,并且它几乎按照我想要的方式工作。因此,我添加了一些日志记录以指示进度,并在其中放置了一个“测试”键以进行匹配。 我希望能够独立于 num_parts 增加/减少 iNumberOfProcessors。在这个阶段,当我将它们都设置为 4 时,一切都按预期工作,启动了 4 个进程(控制台额外运行了一个)。当我更改 iNumberOfProcessors = 6 时,有 6 个进程启动,但只有其中一个进程有任何 CPU 使用率。所以看起来2是空闲的。就像我之前的解决方案一样,我能够在不增加 num_parts 的情况下将核心数量设置得更高,并且所有进程都会被使用。
我不确定如何重构这种新方法以提供相同的功能。您能否看一下并给我一些指导,以便能够相互独立地设置 iNumberOfProcessors 和 num_parts 并且仍然使用所有进程?
这是更新后的代码:
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6
def do_job(first_bits):
iAttemptNumber = 0
iLastProgressUpdate = 0
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
sKey = ''.join(x)
iAttemptNumber = iAttemptNumber + 1
if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
print("Attempt#:", iAttemptNumber, "Key:", sKey)
if sKey == 'test':
print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
return True
def get_part(i):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
return first_bit
if __name__ == '__main__':
# with statement with Py3 multiprocessing.Pool terminates when block exits
with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:
# Don't need special case for final block; slices can
for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
if gotmatch:
break
else:
print("No matches found")
更新 2:
好的,这是我尝试@noxdafox 建议的尝试。我根据他的建议提供的链接整理了以下内容。不幸的是,当我运行它时,我得到了错误:
... 第 322 行,在 apply_async 中 raise ValueError("池未运行") ValueError: 池未运行
谁能给我一些关于如何让它工作的指导。
基本上问题是我第一次尝试进行多处理但不支持在找到匹配项后取消所有进程。
我的第二次尝试(基于@ShadowRanger 的建议)解决了这个问题,但破坏了能够独立扩展进程数量和 num_parts 大小的功能,这是我第一次尝试可以做到的。
我的第三次尝试(基于@noxdafox 的建议)抛出了上述错误。
如果有人能给我一些关于如何维护我第一次尝试的功能的指导(能够独立缩放进程数量和 num_parts 大小),并添加一旦找到匹配项就取消所有进程的功能不胜感激。
感谢您的宝贵时间。
这是我根据@noxdafox 建议第三次尝试的代码:
#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools
alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4
def find_match(first_bits):
iAttemptNumber = 0
iLastProgressUpdate = 0
for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
sKey = ''.join(x)
iAttemptNumber = iAttemptNumber + 1
if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
print("Attempt#:", iAttemptNumber, "Key:", sKey)
if sKey == 'test':
print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
return True
def get_part(i):
if i == num_parts - 1:
first_bit = alphabet[part_size * i :]
else:
first_bit = alphabet[part_size * i : part_size * (i+1)]
return first_bit
def grouper(iterable, n, fillvalue=None):
args = [iter(iterable)] * n
return itertools.zip_longest(*args, fillvalue=fillvalue)
class Worker():
def __init__(self, workers):
self.workers = workers
def callback(self, result):
if result:
self.pool.terminate()
def do_job(self):
print(self.workers)
pool = multiprocessing.Pool(processes=self.workers)
for part in grouper(alphabet, part_size):
pool.apply_async(do_job, (part,), callback=self.callback)
pool.close()
pool.join()
print("All Jobs Queued")
if __name__ == '__main__':
w = Worker(4)
w.do_job()
【问题讨论】:
标签: python python-3.x multiprocessing