【问题标题】:Dead simple example of using Multiprocessing Queue, Pool and Locking使用多处理队列、池和锁定的简单示例
【发布时间】:2014-01-20 04:08:58
【问题描述】:

我尝试阅读http://docs.python.org/dev/library/multiprocessing.html 的文档,但我仍在为多处理队列、池和锁定而苦苦挣扎。现在我能够构建下面的示例。

关于队列和池,我不确定我是否以正确的方式理解了这个概念,如果我错了,请纠正我。我想要实现的是 一次处理 2 个请求(本例中数据列表有 8 个)那么,我应该使用什么?池来创建 2 个可以处理两个不同队列(最多 2 个)的进程,还是我应该每次只使用 Queue 来处理 2 个输入?锁定将正确打印输出。

import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)


def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()


def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)

【问题讨论】:

    标签: python python-2.7 multiprocessing


    【解决方案1】:

    这是我对这个主题的个人访问:

    这里的要点,(欢迎拉请求!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec

    import multiprocessing
    import sys
    
    THREADS = 3
    
    # Used to prevent multiple threads from mixing thier output
    GLOBALLOCK = multiprocessing.Lock()
    
    
    def func_worker(args):
        """This function will be called by each thread.
        This function can not be a class method.
        """
        # Expand list of args into named args.
        str1, str2 = args
        del args
    
        # Work
        # ...
    
    
    
        # Serial-only Portion
        GLOBALLOCK.acquire()
        print(str1)
        print(str2)
        GLOBALLOCK.release()
    
    
    def main(argp=None):
        """Multiprocessing Spawn Example
        """
        # Create the number of threads you want
        pool = multiprocessing.Pool(THREADS)
    
        # Define two jobs, each with two args.
        func_args = [
            ('Hello', 'World',), 
            ('Goodbye', 'World',), 
        ]
    
    
        try:
            pool.map_async(func_worker, func_args).get()
        except KeyboardInterrupt:
            # Allow ^C to interrupt from any thread.
            sys.stdout.write('\033[0m')
            sys.stdout.write('User Interupt\n')
        pool.close()
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

    • 我不确定 .map_async() 是否比 .map() 好。
    • get() 的参数是超时,与启动的作业数量无关。
    • @mata 那么,这意味着要在轮询循环中使用吗? .get(timeout=1)?可以直接说.get() 来获取完整列表吗?
    • 是的,.get() 无限期地等待,直到所有结果都可用并返回结果列表。您可以使用轮询循环来检查天气结果是否可用,或者您可以在 map_async() 调用中传递一个回调函数,然后在每个结果可用时调用该函数。
    【解决方案2】:

    对于使用 Komodo Edit (win10) 等编辑器的所有人,请将 sys.stdout.flush() 添加到:

    def mp_worker((inputs, the_time)):
        print " Process %s\tWaiting %s seconds" % (inputs, the_time)
        time.sleep(int(the_time))
        print " Process %s\tDONE" % inputs
        sys.stdout.flush()
    

    或作为第一行:

        if __name__ == '__main__':
           sys.stdout.flush()
    

    这有助于查看脚本运行期间发生的情况;而不必看黑色的命令行框。

    【讨论】:

      【解决方案3】:

      这可能与问题不是 100% 相关,但在我搜索使用多处理队列的示例时,这首先出现在 google 上。

      这是一个基本示例类,您可以实例化并将项目放入队列中,并可以等待队列完成。这就是我所需要的。

      from multiprocessing import JoinableQueue
      from multiprocessing.context import Process
      
      
      class Renderer:
          queue = None
      
          def __init__(self, nb_workers=2):
              self.queue = JoinableQueue()
              self.processes = [Process(target=self.upload) for i in range(nb_workers)]
              for p in self.processes:
                  p.start()
      
          def render(self, item):
              self.queue.put(item)
      
          def upload(self):
              while True:
                  item = self.queue.get()
                  if item is None:
                      break
      
                  # process your item here
      
                  self.queue.task_done()
      
          def terminate(self):
              """ wait until queue is empty and terminate processes """
              self.queue.join()
              for p in self.processes:
                  p.terminate()
      
      r = Renderer()
      r.render(item1)
      r.render(item2)
      r.terminate()
      

      【讨论】:

      • item1item2 是什么?它们是某种任务或功能,会在两个不同的进程中执行吗?
      • 是的,它们是以并行方式处理的任务或输入参数。
      【解决方案4】:

      解决您的问题的最佳方法是使用Pool。使用Queues 并拥有一个单独的“队列馈送”功能可能是矫枉过正。

      这是您的程序的一个稍微重新排列的版本,这次只有 2 个进程包含在 Pool 中。我相信这是最简单的方法,对原始代码的改动很小:

      import multiprocessing
      import time
      
      data = (
          ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
          ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
      )
      
      def mp_worker((inputs, the_time)):
          print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
          time.sleep(int(the_time))
          print " Process %s\tDONE" % inputs
      
      def mp_handler():
          p = multiprocessing.Pool(2)
          p.map(mp_worker, data)
      
      if __name__ == '__main__':
          mp_handler()
      

      请注意,mp_worker() 函数现在接受单个参数(前两个参数的元组),因为 map() 函数将您的输入数据分块到子列表中,每个子列表作为单个参数提供给您的工作函数。

      输出:

      Processs a  Waiting 2 seconds
      Processs b  Waiting 4 seconds
      Process a   DONE
      Processs c  Waiting 6 seconds
      Process b   DONE
      Processs d  Waiting 8 seconds
      Process c   DONE
      Processs e  Waiting 1 seconds
      Process e   DONE
      Processs f  Waiting 3 seconds
      Process d   DONE
      Processs g  Waiting 5 seconds
      Process f   DONE
      Processs h  Waiting 7 seconds
      Process g   DONE
      Process h   DONE
      

      根据下面的@Thales 评论进行编辑:

      如果您想要“为每个池限制锁定”,以便您的进程以串联对运行,ala:

      A 等待 B 等待 | A 完成,B 完成 | C等待,D等待| C 完成,D 完成 | ...

      然后将处理函数更改为为每对数据启动池(2 个进程):

      def mp_handler():
          subdata = zip(data[0::2], data[1::2])
          for task1, task2 in subdata:
              p = multiprocessing.Pool(2)
              p.map(mp_worker, (task1, task2))
      

      现在你的输出是:

       Processs a Waiting 2 seconds
       Processs b Waiting 4 seconds
       Process a  DONE
       Process b  DONE
       Processs c Waiting 6 seconds
       Processs d Waiting 8 seconds
       Process c  DONE
       Process d  DONE
       Processs e Waiting 1 seconds
       Processs f Waiting 3 seconds
       Process e  DONE
       Process f  DONE
       Processs g Waiting 5 seconds
       Processs h Waiting 7 seconds
       Process g  DONE
       Process h  DONE
      

      【讨论】:

      • 感谢简单直接的示例,但我如何为每个池限制应用锁定?我的意思是,如果您执行代码,我希望看到类似“A 等待 B 等待 | A 完成,b 完成 | C 等待,D 等待 | C 完成,D 完成”的内容
      • 换句话说,你不想让 C 在 A 和 B 都完成之前启动?
      • 没错,我可以使用 multiprocessing.Process 做到这一点,但我不知道如何使用 pool 做到这一点
      • 非常感谢,按预期工作,但在函数 mp_handler 上,您引用的是变量数据而不是 var1 :)
      • 好的,谢谢,我完全删除了var1,改为引用全局data
      【解决方案5】:

      这是我的代码中的一个示例(对于线程池,但只需更改类名,您将拥有进程池):

      def execute_run(rp): 
         ... do something 
      
      pool = ThreadPoolExecutor(6)
      for mat in TESTED_MATERIAL:
          for en in TESTED_ENERGIES:
              for ecut in TESTED_E_CUT:
                  rp = RunParams(
                      simulations, DEST_DIR,
                      PARTICLE, mat, 960, 0.125, ecut, en
                  )
                  pool.submit(execute_run, rp)
      pool.join()
      

      基本上:

      • pool = ThreadPoolExecutor(6) 为 6 个线程创建一个池
      • 然后你有一堆 for 将任务添加到池中
      • pool.submit(execute_run, rp) 将任务添加到池中,第一个参数是在线程/进程中调用的函数,其余参数传递给被调用函数。
      • pool.join 等待所有任务完成。

      【讨论】:

      • 请注意,您使用的是 concurrent.futures,但 OP 询问的是 multiprocessing 和 Python 2.7。
      猜你喜欢
      • 2016-04-18
      • 1970-01-01
      • 1970-01-01
      • 2011-06-03
      • 2012-01-23
      • 1970-01-01
      • 2017-06-15
      • 2014-12-23
      • 1970-01-01
      相关资源
      最近更新 更多