[Question Title]: Is it possible to multiprocess a function that returns something in Python?
[Posted]: 2012-06-03 14:36:38
[Problem Description]:

In Python I have seen many examples where multiprocessing is invoked but the target just prints something. I have a scenario where the target returns two variables that I need to use later. For example:

from multiprocessing import Process

def foo(some_args):
    a = someObject
    b = someObject
    return a, b

p1 = Process(target=foo, args=(some_args,))
p2 = Process(target=foo, args=(some_args,))
p3 = Process(target=foo, args=(some_args,))

Now what? I can do .start and .join, but how do I retrieve the individual results? I need to capture the returned a, b for every job I execute and then work on them.

[Question Discussion]:

    Tags: python multiprocessing


    [Solution 1]:

    You want to do some embarrassingly parallel work using multiple processes, so why not use a Pool? A Pool will take care of starting the processes, retrieving the results, and returning them to you.

    I use pathos, which has a fork of multiprocessing, because it has much better serialization than the version the standard library provides.

    (.py) file

    from pathos.multiprocessing import ProcessingPool as Pool
    
    def foo(obj1, obj2):
        a = obj1.x**2
        b = obj2.x**2
        return a,b
    
    class Bar(object):
        def __init__(self, x):
            self.x = x
    
    Pool().map(foo, [Bar(1),Bar(2),Bar(3)], [Bar(4),Bar(5),Bar(6)])
    

    Result

    [(1, 16), (4, 25), (9, 36)]
    

    You can see that foo takes two arguments and returns a tuple of two objects. The map method of Pool submits foo to the underlying processes and returns the list of result tuples.

    You can get pathos here: https://github.com/uqfoundation
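    For comparison, a rough stdlib-only equivalent of the example above might use multiprocessing.Pool.starmap (available since Python 3.3), which unpacks each argument tuple into a call; plain numbers stand in here for the Bar objects:

    ```python
    from multiprocessing import Pool

    def foo(x, y):
        # square both inputs, mirroring the pathos example above
        return x**2, y**2

    if __name__ == '__main__':
        with Pool() as pool:
            # each (x, y) tuple is unpacked into a call foo(x, y)
            res = pool.starmap(foo, [(1, 4), (2, 5), (3, 6)])
        print(res)  # [(1, 16), (4, 25), (9, 36)]
    ```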

    [Discussion]:

    • Probably worth disclosing that Mike McKerns is the author of pathos.
    • Of course. FYI, this post was made before I realized that authorship disclosure is standard practice.
    [Solution 2]:

    Yes, of course. There are several ways to do it; one of the simplest is a shared Queue. See the example here: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/
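    A minimal sketch of that shared-Queue approach, adapted to the question's foo that returns two values (the arithmetic is just a stand-in for the real objects, which must be picklable):

    ```python
    from multiprocessing import Process, Queue

    def foo(x, q):
        a, b = x * 2, x * 3   # stand-ins for the two returned objects
        q.put((a, b))         # send the pair back to the parent

    if __name__ == '__main__':
        q = Queue()
        procs = [Process(target=foo, args=(i, q)) for i in range(3)]
        for p in procs:
            p.start()
        results = [q.get() for _ in procs]  # one (a, b) pair per worker, in completion order
        for p in procs:
            p.join()
        print(sorted(results))  # [(0, 0), (2, 3), (4, 6)]
    ```

    Note that the results are drained before join(), and that they arrive in completion order, not submission order.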

    [Discussion]:

    • Is there a limit on the return value? What if it is a binary file such as a PDF? My goal is to get n PDFs and then concatenate them. The order doesn't matter to us.
    • @Nishant: It can really be any data. For transferring actual files I would do a careful analysis taking the file sizes into account. It may be more convenient to write the files to disk and pass pointers to them (i.e. their names), but you have to take care of synchronization and atomicity.
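    The write-to-disk idea from the comment above could be sketched like this; render_part and the fake PDF bytes are hypothetical placeholders, and a real version would need error handling and cleanup:

    ```python
    import os
    import tempfile
    from multiprocessing import Pool

    def render_part(i):
        # hypothetical worker: write one "PDF" part to disk, return only its path
        path = os.path.join(tempfile.gettempdir(), 'part_%d.pdf' % i)
        with open(path, 'wb') as f:
            f.write(b'fake pdf part %d\n' % i)
        return path

    if __name__ == '__main__':
        with Pool(3) as pool:
            paths = pool.map(render_part, range(3))   # map preserves input order
        # concatenate in the parent, where ordering is under our control
        with open(os.path.join(tempfile.gettempdir(), 'merged.pdf'), 'wb') as out:
            for p in paths:
                with open(p, 'rb') as part:
                    out.write(part.read())
    ```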
    [Solution 3]:

    I copied this example straight from the docs because I can't give you a direct link to it. Note that it prints out results from the done_queue, but you can do whatever you like with them.

    #
    # Simple example which uses a pool of workers to carry out some tasks.
    #
    # Notice that the results will probably not come out of the output
    # queue in the same in the same order as the corresponding tasks were
    # put on the input queue.  If it is important to get the results back
    # in the original order then consider using `Pool.map()` or
    # `Pool.imap()` (which will save on the amount of code needed anyway).
    #
    # Copyright (c) 2006-2008, R Oudkerk
    # All rights reserved.
    #
    
    import time
    import random
    
    from multiprocessing import Process, Queue, current_process, freeze_support
    
    #
    # Function run by worker processes
    #
    
    def worker(input, output):
        for func, args in iter(input.get, 'STOP'):
            result = calculate(func, args)
            output.put(result)
    
    #
    # Function used to calculate result
    #
    
    def calculate(func, args):
        result = func(*args)
        return '%s says that %s%s = %s' % \
            (current_process().name, func.__name__, args, result)
    
    #
    # Functions referenced by tasks
    #
    
    def mul(a, b):
        time.sleep(0.5*random.random())
        return a * b
    
    def plus(a, b):
        time.sleep(0.5*random.random())
        return a + b
    
    #
    #
    #
    
    def test():
        NUMBER_OF_PROCESSES = 4
        TASKS1 = [(mul, (i, 7)) for i in range(20)]
        TASKS2 = [(plus, (i, 8)) for i in range(10)]
    
        # Create queues
        task_queue = Queue()
        done_queue = Queue()
    
        # Submit tasks
        for task in TASKS1:
            task_queue.put(task)
    
        # Start worker processes
        for i in range(NUMBER_OF_PROCESSES):
            Process(target=worker, args=(task_queue, done_queue)).start()
    
        # Get and print results
        print('Unordered results:')
        for i in range(len(TASKS1)):
            print('\t', done_queue.get())
    
        # Add more tasks using `put()`
        for task in TASKS2:
            task_queue.put(task)
    
        # Get and print some more results
        for i in range(len(TASKS2)):
            print('\t', done_queue.get())
    
        # Tell child processes to stop
        for i in range(NUMBER_OF_PROCESSES):
            task_queue.put('STOP')
    
    
    if __name__ == '__main__':
        freeze_support()
        test()
    

    Originally from the multiprocessing module docs.

    [Discussion]:

      [Solution 4]:

      Why does nobody use the callback of multiprocessing.Pool?

      Example:

      import time

      from multiprocessing import Pool
      from contextlib import contextmanager

      from pprint import pprint
      from requests import get as get_page

      WORKERS = 4                              # example value; was undefined in the original
      URLS = ['http://example.com'] * WORKERS  # example value; was undefined in the original

      @contextmanager
      def _terminating(thing):
          try:
              yield thing
          finally:
              thing.terminate()

      def _callback(*args, **kwargs):
          print("CALLBACK")
          pprint(args)
          pprint(kwargs)

      print("Processing...")
      with _terminating(Pool(processes=WORKERS)) as pool:
          results = pool.map_async(get_page, URLS, callback=_callback)

          start_time = time.time()
          results.wait()
          end_time = time.time()
          print("Time for Processing: %ssecs" % (end_time - start_time))
      

      Here I print both args and kwargs. But you can replace the callback with:

      def _callback2(responses):
          for r in responses:
              print(r.status_code) # or do whatever with response...
      

      [Discussion]:

        [Solution 5]:

        It doesn't work on Windows, but here is my multiprocessing decorator for functions; it returns a queue that you can poll to collect the returned data.

        import os
        from multiprocessing import Process, Queue

        def returning_wrapper(func, *args, **kwargs):
            queue = kwargs.pop("multiprocess_returnable")
            queue.put(func(*args, **kwargs))

        class Multiprocess(object):
            """Cute decorator to run a function in multiple processes."""
            def __init__(self, func):
                self.func = func
                self.processes = []

            def __call__(self, *args, **kwargs):
                num_processes = kwargs.pop("multiprocess_num_processes", 2)  # default to two processes.
                return_obj = kwargs.pop("multiprocess_returnable", Queue())  # default to a multiprocessing Queue
                kwargs["multiprocess_returnable"] = return_obj
                for i in range(num_processes):
                    pro = Process(target=returning_wrapper, args=tuple([self.func] + list(args)), kwargs=kwargs)
                    self.processes.append(pro)
                    pro.start()
                return return_obj


        @Multiprocess
        def info():
            print('module name:', __name__)
            print('parent process:', os.getppid())
            print('process id:', os.getpid())
            return 4 * 22

        data = info()
        print(data.get())
        

        [Discussion]:
