【问题标题】:Python functions in a separate process. Can i unify the wrapper functionsPython 在一个单独的进程中运行。我可以统一包装函数吗
【发布时间】:2014-12-14 17:42:57
【问题描述】:

我是 Python 新手,我想知道如何在语法上更有效地实现以下问题。 我有函数 f1、f2 ... fN 这些函数是产生新进程的包装器(目标为 _f1、_f2、.. _fN), 将其参数 (arg1, arg2, ...) 传递给子进程并接收返回值
使用这样的代码,我希望模块功能在与调用者(模块的用户)进程不同的进程中执行。
函数 f1, f2, ... fN(分别为 _f1, f2, ... _fN)可能有不同的原型。

in a module

def _f1(arg1, arg2, ... argn,  connection):
    ...
    connection.send(return_value)
    connection.close()
def f1(arg1, arg2, ... argn):
    parent_conn, child_conn = Pipe()
    p = Process(target=_f1, args=(arg1, arg2, ... argn, child_conn))
    p.start()
    p.join() 
    return parent_conn.recv()


def _f2(arg1, arg2, ... argm,  connection):
    ...
    connection.send(return_value)
    connection.close()    
def f2(arg1, arg2, ... argn):
    parent_conn, child_conn = Pipe()
    p = Process(target=_f2, args=(arg1, arg2, ... argm, child_conn))
    p.start()
    p.join() 
    return parent_conn.recv()

...

def _fn(arg1, arg2, ... argk,  connection):
    ...
    connection.send(return_value)
    connection.close()    
def fN(arg1, arg2, ... argn):
    parent_conn, child_conn = Pipe()
    p = Process(target=_fN, args=(arg1, arg2, ... argk, child_conn))
    p.start()
    p.join() 
    return parent_conn.recv()

很明显,包装函数 f1、f2、fN 大致相同。我可以将它们实现为单个包装函数吗? 我希望执行不会阻塞。例如,模块的用户应该能够同时执行 f1 和 f2。

我希望我已经设法解释了我的问题。

这里有两个函数 sum() 和 sin() 的具体例子:

def _sum(a, b,  connection):
   return_value=a+b
   connection.send(return_value)
   connection.close()
def sum(a, b):
   parent_conn, child_conn = Pipe()
   p = Process(target=_sum, args=(a, b, child_conn))
   p.start()
   p.join() 
   return parent_conn.recv()

def _sin(x,  connection):
   return_value=sin(x)
   connection.send(return_value)
   connection.close()    
def sin(x):
   parent_conn, child_conn = Pipe()
   p = Process(target=_sin, args=(x, child_conn))
   p.start()
   p.join() 
   return parent_conn.recv() 

采用关于使用装饰的 srj 想法,我找到了下面发布的解决方案。 我试图进一步扩展它来装饰 connection.send(return_value) 和 connection.close() 但它对我不起作用。代码下方。在评论中,我指定了哪些有效,哪些等效(在我看来)无效。有什么帮助吗?

from multiprocessing import Process, Pipe

def process_wrapper1(func):
    def wrapper(*args):
        parent_conn, child_conn = Pipe()
        f_args = args + (child_conn,)
        p = Process(target=func, args=f_args)
        p.start()
        p.join() 
        return parent_conn.recv()
    return wrapper

def process_wrapper2(func):
    def wrapper(*args):
        res=func(*args[0:len(args)-1])
        args[-1].send(res)
        args[-1].close()
    return wrapper



#def _sum(a, b,  connection):            #Working 
#   return_value=a+b
#   connection.send(return_value)
#   connection.close()
def __sum(a, b):                       #Doesn't work, see the error bellow
    return(a+b)    
_sum=process_wrapper2(__sum)

sum=process_wrapper1(_sum) 

Pyzo ipython shell 中的上述代码生成以下结果:

In [3]: import test1
In [4]: test1.sum(2,3)
---------------------------------------------------------------------------
PicklingError                             Traceback (most recent call last)
<ipython-input-4-8c542dc5e11a> in <module>()
----> 1 test1.sum(2,3)

C:\projects\PYnGUInLib\test1.py in wrapper(*args)
     11         f_args = (child_conn,) + args
     12         p = Process(target=func, args=f_args)
---> 13         p.start()
     14         p.join()
     15         return parent_conn.recv()

C:\pyzo2014a_64b\lib\multiprocessing\process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         _children.add(self)

C:\pyzo2014a_64b\lib\multiprocessing\context.py in _Popen(process_obj)
    210     @staticmethod
    211     def _Popen(process_obj):
--> 212         return _default_context.get_context().Process._Popen(process_obj)
    213 
    214 class DefaultContext(BaseContext):

C:\pyzo2014a_64b\lib\multiprocessing\context.py in _Popen(process_obj)
    311         def _Popen(process_obj):
    312             from .popen_spawn_win32 import Popen
--> 313             return Popen(process_obj)
    314 
    315     class SpawnContext(BaseContext):

C:\pyzo2014a_64b\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     64             try:
     65                 reduction.dump(prep_data, to_child)
---> 66                 reduction.dump(process_obj, to_child)
     67             finally:
     68                 context.set_spawning_popen(None)

C:\pyzo2014a_64b\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     57 def dump(obj, file, protocol=None):
     58     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 59     ForkingPickler(file, protocol).dump(obj)
     60 
     61 #

PicklingError: Can't pickle <function process_wrapper2.<locals>.wrapper at 0x0000000005541048>: attribute lookup wrapper on test1 failed
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\pyzo2014a_64b\lib\multiprocessing\spawn.py", line 106, in spawn_main
   exitcode = _main(fd)
  File "C:\pyzo2014a_64b\lib\multiprocessing\spawn.py", line 116, in _main
   self = pickle.load(from_parent)
EOFError: Ran out of input

In [5]: 

【问题讨论】:

    标签: python multiprocessing wrapper


    【解决方案1】:

    您可以使用decorator 将函数与创建和执行进程的样板文件一起包装。

    def process_wrapper(func):
        def wrapper(*args):
            parent_conn, child_conn = Pipe()
            #attach the connection to the arguments
            f_args = args + (child_conn,)
            p = Process(target=func, args=f_args)
            p.start()
            p.join() 
            return parent_conn.recv()
        return wrapper
    

    并将函数定义为

    @process_wrapper
    def _f2(arg1, arg2, ... argm,  connection):
        ...
        connection.send(return_value)
        connection.close()
    

    解释:process_wrapper 函数接受一个具有 N 个位置参数的函数,其中最后一个始终是管道连接。它返回一个带有 N-1 个参数的函数,其中预先填充了连接。

    如果是您的具体功能,

    @process_wrapper
    def sin(x,  connection):
       return_value=sin(x)
       connection.send(return_value)
       connection.close()  
    
    @process_wrapper
    def sum(a, b,  connection):
       return_value=a+b
       connection.send(return_value)
       connection.close()
    

    你可以把函数称为

    sum(a,b)
    

    更多关于 python 装饰器的参考 http://www.jeffknupp.com/blog/2013/11/29/improve-your-python-decorators-explained/

    【讨论】:

    • 嗨,srj,我想我知道装饰器是什么,但我不明白在我的特定情况下如何准确地使用它们。在最初的问题中,我添加了一个具有两个功能的具体示例。您可以使用优化两个包装器 sum() 和 sin() 的装饰器发布等效代码吗?
    • 我已对答案进行了修改,如果您正在寻找,请查看它。
    • 嗨,srj,我在测试您的代码时遇到错误。 Can't pickle : it's not the same object as test1.sum ... EOFError: Ran out of input 我不确定问题是什么。我认为这是因为原始功能不可用,在您的情况下,我们只有装饰版本。你还认为我想要的可以用装饰器来实现吗? def _sum(a, b, connection): return_value=a+b connection.send(return_value) connection.close()
    • 你可以做 sum = process_wrapper(_sum),而不是装饰函数。
    • 是的,这就是我尝试过的。请参阅我在原始问题末尾放置的代码。第一个装饰工作正常,但是当我尝试使用另一个装饰时,PicklingError: Can't pickle &lt;function process_wrapper2.&lt;locals&gt;.wrapper at 0x0000000005962A60&gt;: attribute lookup wrapper on test1 failed 失败了,我认为 tehcode 已经接近有用了。谢谢srj!
    【解决方案2】:

    您应该使用multiprocessing.Pool。这是一个例子:

    def f1(*args):
        rv = do_calculations()
        return rv 
    
    def f2(*args):
        ...
    
    ...
    def fN(*args):
        ...
    
    def worker(args):
        fn = args[0]
        return fn(*args[1:])
    
    inputs = [
        [f1, f1_args],
        [f2, f2_args],
        ...
        [fN, fN_args]
    ]
    
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = pool.map(worker, inputs)
    

    【讨论】:

    • 嗨 Joel,pools 似乎是一个非常好的工具,谢谢!对于我的目的 pool.apply_async() 是我真正需要的。我无法在interactice shell 中使用它,但是这对我的项目来说是一个很强的要求。你认为这可以解决吗?
    • 查看code module。它提供了在非交互式脚本中实现交互式解释器的工具。
    • 嗨 Joel,我想要的是能够使用 IEP IDE 中集成的 ipython_qtconsole 中的库。我不想在脚本中使用交互式解释器。还是我误会了?
    • 好吧,看起来 ipython 捆绑了自己的多处理工具。查看nbviewer.ipython.org/github/vals/scilife-python-course/blob/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2010-10-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-05
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多