【问题标题】:How to get all pool.apply_async processes to stop once any one process has found a match in python一旦任何一个进程在python中找到匹配项,如何让所有pool.apply_async进程停止
【发布时间】:2016-06-08 00:34:11
【问题描述】:

我有以下代码,它利用多处理来遍历一个大列表并找到匹配项。一旦在任何一个进程中找到匹配项,如何让所有进程停止?我看过一些例子,但我似乎都不适合我在这里所做的事情。

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts

def do_job(first_bits):
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        # CHECK FOR MATCH HERE
        print(''.join(x))
        # EXIT ALL PROCESSES IF MATCH FOUND

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    results = []

    for i in range(num_parts):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
        pool.apply_async(do_job, (first_bit,))

    pool.close()
    pool.join()

感谢您的宝贵时间。

更新 1:

我已经实现了@ShadowRanger 的伟大方法中建议的更改,并且它几乎按照我想要的方式工作。因此,我添加了一些日志记录以指示进度,并在其中放置了一个“测试”键以进行匹配。 我希望能够独立于 num_parts 增加/减少 iNumberOfProcessors。在这个阶段,当我将它们都设置为 4 时,一切都按预期工作,启动了 4 个进程(控制台额外运行了一个)。当我更改 iNumberOfProcessors = 6 时,有 6 个进程启动,但只有其中一个进程有任何 CPU 使用率。所以看起来2是空闲的。就像我之前的解决方案一样,我能够在不增加 num_parts 的情况下将核心数量设置得更高,并且所有进程都会被使用。

我不确定如何重构这种新方法以提供相同的功能。您能否看一下并给我一些指导,以便能够相互独立地设置 iNumberOfProcessors 和 num_parts 并且仍然使用所有进程?

这是更新后的代码:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 6

def do_job(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

if __name__ == '__main__':
    # with statement with Py3 multiprocessing.Pool terminates when block exits
    with multiprocessing.Pool(processes = iNumberOfProcessors) as pool:

        # Don't need special case for final block; slices can 
        for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
             if gotmatch:
                 break
        else:
             print("No matches found")

更新 2:

好的,这是我尝试@noxdafox 建议的尝试。我根据他的建议提供的链接整理了以下内容。不幸的是,当我运行它时,我得到了错误:

... 第 322 行,在 apply_async 中 raise ValueError("池未运行") ValueError: 池未运行

谁能给我一些关于如何让它工作的指导。

基本上问题是我第一次尝试进行多处理但不支持在找到匹配项后取消所有进程。

我的第二次尝试(基于@ShadowRanger 的建议)解决了这个问题,但破坏了能够独立扩展进程数量和 num_parts 大小的功能,这是我第一次尝试可以做到的。

我的第三次尝试(基于@noxdafox 的建议)抛出了上述错误。

如果有人能给我一些关于如何维护我第一次尝试的功能的指导(能够独立缩放进程数量和 num_parts 大小),并添加一旦找到匹配项就取消所有进程的功能不胜感激。

感谢您的宝贵时间。

这是我根据@noxdafox 建议第三次尝试的代码:

#!/usr/bin/env python3.5
import sys, itertools, multiprocessing, functools

alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ12234567890!@#$%^&*?,()-=+[]/;"
num_parts = 4
part_size = len(alphabet) // num_parts
iProgressInterval = 10000
iNumberOfProcessors = 4


def find_match(first_bits):
    iAttemptNumber = 0
    iLastProgressUpdate = 0
    for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
        sKey = ''.join(x)
        iAttemptNumber = iAttemptNumber + 1
        if iLastProgressUpdate + iProgressInterval <= iAttemptNumber:
            iLastProgressUpdate = iLastProgressUpdate + iProgressInterval
            print("Attempt#:", iAttemptNumber, "Key:", sKey)
        if sKey == 'test':
            print("KEY FOUND!! Attempt#:", iAttemptNumber, "Key:", sKey)
            return True

def get_part(i):
    if i == num_parts - 1:
        first_bit = alphabet[part_size * i :]
    else:
        first_bit = alphabet[part_size * i : part_size * (i+1)]
    return first_bit

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return itertools.zip_longest(*args, fillvalue=fillvalue)

class Worker():

    def __init__(self, workers):
        self.workers = workers

    def callback(self, result):
        if result:
            self.pool.terminate()

    def do_job(self):
        print(self.workers)
        pool = multiprocessing.Pool(processes=self.workers)
        for part in grouper(alphabet, part_size):
            pool.apply_async(do_job, (part,), callback=self.callback)
        pool.close()
        pool.join()
        print("All Jobs Queued")

if __name__ == '__main__':
    w = Worker(4)
    w.do_job()

【问题讨论】:

    标签: python python-3.x multiprocessing


    【解决方案1】:

    您可以查看this question 以查看解决您问题的实现示例。

    这也适用于 concurrent.futures 池。

    只需将 map 方法替换为 apply_async 并从调用者那里迭代您的列表。

    类似的东西。

    for part in grouper(alphabet, part_size):
        pool.apply_async(do_job, part, callback=self.callback)
    

    grouper recipe

    【讨论】:

    • 感谢您的回复。我遇到的第一个问题是石斑鱼。我看到有一个 itertools._grouper,但是当我尝试使用它时,我得到了错误:TypeError:必须是 itertools.groupby,而不是 str。 grouper 属于哪个库?
    • grouper 不是 Python 标准库中的函数。你可以在我提供的链接中找到它的食谱。
    • 对...不知道我是怎么弄错的...对不起...我尝试将池变量的初始化移动到 do_job 函数,但我仍然收到错误消息:Pool没有运行,有什么想法吗?
    • OK 让 grouper 工作,并在我最后一次尝试中更新了代码。但是仍然收到“池未运行”错误。关于问题所在有什么想法吗?
    • 请粘贴异常的整个回溯。还要确保如果 find_match 没有找到匹配返回 false 以确保回调不会在错误的时刻停止池。
    【解决方案2】:

    multiprocessing 并不是真正为取消任务而设计的,但您可以通过使用 pool.imap_unordered 并在遇到命中时终止池来针对您的特定情况进行模拟:

    def do_job(first_bits):
        for x in itertools.product(first_bits, *itertools.repeat(alphabet, num_parts-1)):
            # CHECK FOR MATCH HERE
            print(''.join(x))
            if match:
                return True
        # If we exit loop without a match, function implicitly returns falsy None for us
    # Factor out part getting to simplify imap_unordered use
    def get_part(i):
        if i == num_parts - 1:
            first_bit = alphabet[part_size * i :]
        else:
            first_bit = alphabet[part_size * i : part_size * (i+1)]
    
    if __name__ == '__main__':
        # with statement with Py3 multiprocessing.Pool terminates when block exits
        with multiprocessing.Pool(processes=4) as pool:
    
            # Don't need special case for final block; slices can 
            for gotmatch in pool.imap_unordered(do_job, map(get_part, range(num_parts))):
                 if gotmatch:
                     break
            else:
                 print("No matches found")
    

    这将为每个部分运行do_job,并尽可能快地返回结果。当工作人员返回 True 时,循环中断,Poolwith 语句退出,terminate-ing Pool(删除所有正在进行的工作)。

    请注意,虽然这有效,但它有点滥用multiprocessing;它不会在不终止整个Pool 的情况下处理取消单个任务。如果你需要更细粒度的任务取消,你会想看concurrent.futures,但即使在那里,它也只能取消未调度的任务;一旦它们开始运行,如果不终止 Executor 或使用边带终止方式(让任务间歇性地轮询某个进程间对象以确定它是否应该继续运行),就无法取消它们。

    【讨论】:

    • 非常感谢这个伟大的方法。我试了一下,它几乎可以满足我的需要。我已经用我实施你的方法的细节更新了这个问题。您能否看一下并给我一些重构的方向,以便能够相互独立地设置 iNumberOfProcessors 和 num_parts 并且仍然使用所有进程?
    猜你喜欢
    • 2015-05-12
    • 2016-02-25
    • 1970-01-01
    • 2018-01-26
    • 2021-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-02-25
    相关资源
    最近更新 更多