[Question title]: Python Multiprocessing Pool as Decorator
[Posted]: 2021-07-11 21:12:14
[Question]:

I write code that frequently needs Python's multiprocessing Pool class. This leads to a lot of code that looks like this:

import time
from multiprocessing import Pool
from functools import partial

def test_func(x):
    time.sleep(1)
    return x

def test_func_parallel(iterable, processes):
    p = Pool(processes=processes)
    output = p.map(test_func, iterable)
    p.close()
    return output

This can be made more generic:

def parallel(func, iterable, **kwargs):
    func = partial(func, **kwargs)
    p = Pool(processes=6)
    out = p.map(func, iterable)
    p.close()
    return out

This works, but adding a parallel wrapper for every other function complicates the code. What I really want is to have it work as a decorator, like this:

def parallel(num_processes):
    def parallel_decorator(func):
        def parallel_wrapper(iterable, **kwargs):
            # bind to a new name; reassigning 'func' here would shadow
            # the closure variable and raise UnboundLocalError
            func_with_kwargs = partial(func, **kwargs)
            p = Pool(processes=num_processes)
            output = p.map(func_with_kwargs, iterable)
            p.close()
            return output

        return parallel_wrapper
    return parallel_decorator

which could be used like this:

@parallel(6)
def test_func(x):
    time.sleep(1)
    return x

This fails with a pickle error:

Can't pickle <function test1 at 0x117473268>: it's not the same object as __main__.test1

I have read some posts about related issues, but they all implement solutions that perform the multiprocessing outside the decorator. Does anyone know a way to make this work?

[Comments]:

  • Incidentally, this is a much better question than most of the multiprocessing questions I see -- well thought out, with a solid reproducer, etc.
  • What can multiprocessing and dill do together? -- switch from multiprocessing to the third-party pathos.multiprocessing and you're there.
  • Could you try this with "fork", or are you on Windows?
  • Have you tried making use of copyreg? docs.python.org/3.7/library/copyreg.html There is also `partialmethod` in functools, though I don't know whether that is a solution. docs.python.org/3.7/library/functools.html
  • "they all implement solutions that perform the multiprocessing outside the decorator" >> Would you mind sharing those links (for my/public learning)? tq..

Tags: python multiprocessing python-multiprocessing


[Solution 1]:

If you don't mind forgoing the decorator's syntactic sugar (the @ symbol), something like this should work:

import functools
import time

from multiprocessing import Pool


def parallel(func=None, **options):
    if func is None:
        return functools.partial(parallel, **options)

    def wrapper(iterable, **kwargs):
        processes = options["processes"]

        # forward keyword arguments to func instead of silently discarding them
        with Pool(processes) as pool:
            result = pool.map(functools.partial(func, **kwargs), iterable)

        return result

    return wrapper


def test(i):
    time.sleep(1)
    print(f"{i}: {i * i}")

test_parallel = parallel(test, processes=6)


def main():
    test_parallel(range(10))


if __name__ == "__main__":
    main()

[Comments]:

  • Why not just parallel(test, processes=6)(range(10))? Also, I don't think @functools.wraps(func) is accomplishing anything, since you aren't wrapping func.
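The commenter's one-liner works because `parallel` returns the wrapper directly when handed a function, so no intermediate name is needed. A self-contained sketch restating the Solution 1 helper (the `square` function is illustrative):

```python
import functools
from multiprocessing import Pool


def parallel(func=None, **options):
    # Same shape as Solution 1: defer until a function is supplied.
    if func is None:
        return functools.partial(parallel, **options)

    def wrapper(iterable, **kwargs):
        with Pool(options["processes"]) as pool:
            return pool.map(func, iterable)

    return wrapper


def square(x):
    return x * x


if __name__ == "__main__":
    # decorate and call in one expression
    print(parallel(square, processes=2)(range(5)))
```

Note that `square` keeps its module-level name, so pickling it for the workers still succeeds.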
[Solution 2]:

I had the same problem. It comes down to how the Pool() object is implemented: it works fine with an ordinary wrapper, but not with a decorator. The workaround is to define your own Pool()-like implementation using Process().

This may be hard to optimize, but if you are a decorator enthusiast, here is a (dirty) example:

# something to do
args = range(10)


def parallel(function):
    """ An alternative implementation of
    multiprocessing.Pool().map() using
    multiprocessing.Process(). """

    def interfacer(args):
        """ The wrapper function. """

        # required libraries
        from multiprocessing import (Queue, Process)
        from os import cpu_count

        # process control
        ## maximum number of processes required
        max_processes = len(args)

        ## maximum number of processes running
        ## (at least 1, even on a single-core machine)
        max_threads = max(cpu_count() - 1, 1)

        """ Since there is no Pool() around,
        we need to take care of the processes
        ourselves. If there is nothing for a
        process to do, it will wait for input;
        if there are too many of them, the
        processor will suffer. """

        # communications
        ## things to do
        inbasket = Queue()

        ## things done
        outbasket = Queue()

        """ I am thinking asynchronously;
        there is probably a better way of
        doing this. """

        # populate inputs
        for each in args:

            ## put arguments into the basket
            inbasket.put(each)

        def doer():
            """ Feeds the targeted/decorated
            'function' with data from the baskets
            and collects the results.

            This blind function helps the
            implementation generalize over any
            iterable.

            Note: a nested target like this only
            works with the 'fork' start method;
            under 'spawn' it cannot be pickled. """

            outbasket.put(function(inbasket.get()))
            return True

        def run(processes=max_threads):
            """ Create a certain number of
            Process()es and run them concurrently.
            There is room for improvement here. """

            # the process pool
            factory = [Process(target=doer) for _ in range(processes)]

            # start them all first; starting and joining
            # in the same loop would run them one at a time
            for each in factory:
                each.start()

            # then wait for all of them to finish
            for each in factory:
                each.join()
                each.close()

            return True

        """ Now we need to manage the processes
        and prevent them from overwhelming the
        CPU. That is the tricky part that Pool()
        does so well. """

        while max_processes:
        # as long as there is something to do

            if (max_processes - max_threads) >= 0:

                run(max_threads)
                max_processes -= max_threads

            else:
            # play it safe
                run(1)
                max_processes -= 1

        # drain the queue and return the list of 'dones'
        return [outbasket.get() for each in range(outbasket.qsize())]

    return interfacer


@parallel
def test(x):
    return x ** 2


print(test(args))

This code is probably inefficient, but it gives the idea.
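For comparison, the more conventional way to hand-roll a Pool()-like map with Process() is to keep a fixed number of long-lived workers that pull tasks from a queue until they hit a stop sentinel, rather than creating one process per item. A minimal sketch, assuming the `fork` start method (nested targets and unpickled callables rely on it) and with no error handling; `pmap` is an illustrative name:

```python
from multiprocessing import Process, Queue
from os import cpu_count


def pmap(function, iterable, processes=None):
    """Minimal Pool().map()-like helper built on Process() and Queue()."""
    items = list(iterable)
    processes = processes or max((cpu_count() or 2) - 1, 1)
    tasks, results = Queue(), Queue()

    # enqueue (index, argument) pairs so results can be re-ordered later
    for job in enumerate(items):
        tasks.put(job)
    for _ in range(processes):
        tasks.put(None)  # one stop sentinel per worker

    def worker():
        # each worker keeps pulling tasks until it sees a sentinel,
        # so only 'processes' processes ever exist at once
        while (job := tasks.get()) is not None:
            index, arg = job
            results.put((index, function(arg)))

    pool = [Process(target=worker) for _ in range(processes)]
    for proc in pool:
        proc.start()

    # collect exactly len(items) results, then wait for the workers
    collected = [results.get() for _ in items]
    for proc in pool:
        proc.join()

    # restore input order (indices are unique, so the sort is stable enough)
    return [value for _, value in sorted(collected)]


if __name__ == "__main__":
    print(pmap(lambda x: x ** 2, range(10)))
```

Because the workers are reused, this avoids the per-item process churn and the batched start/join scheduling of the example above.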

[Comments]:
