【问题标题】:Python Multiprocessing.Pool lazy iterationPython Multiprocessing.Pool 延迟迭代
【发布时间】:2011-07-16 04:36:21
【问题描述】:

我想知道 python 的 Multiprocessing.Pool 类与 map、imap 和 map_async 一起工作的方式。我的特殊问题是我想映射一个创建大量内存对象的迭代器,并且不希望所有这些对象同时生成到内存中。我想看看各种 map() 函数是否会使我的迭代器干涸,或者仅在子进程缓慢推进时智能地调用 next() 函数,所以我像这样破解了一些测试:

def g():
  for el in xrange(100):
    print el
    yield el

def f(x):
  time.sleep(1)
  return x*x

if __name__ == '__main__':
  pool = Pool(processes=4)              # start 4 worker processes
  go = g()
  g2 = pool.imap(f, go)
  g2.next()

map、imap 和 map_async 等等。然而,这是最明显的例子,因为在 g2 上简单地调用一次 next() 会打印出我的生成器 g() 中的所有元素,而如果 imap 是“懒惰”地这样做,我希望它只调用 go.next () 一次,因此只打印出 '1'。

有人能弄清楚发生了什么吗?是否有某种方法可以让进程池根据需要“懒惰地”评估迭代器?

谢谢,

加布

【问题讨论】:

  • 在删除time.sleep 调用并在f 中添加print os.getpid(), x 之后,行为看起来更加奇怪,有时只打印2 或3 个不同的PID,并且总是执行不同数量的迭代。 ..顺便说一句,您使用的是什么 Python 版本?
  • Python 2.6.6 (r266:84292, Dec 26 2010, 22:31:48) 标准 debian 安装。

标签: python multiprocessing


【解决方案1】:

在此示例中(请参阅代码)2 个工人。

按预期进行池工作:当 worker 空闲时,进行下一次迭代。

此代码作为主题中的代码,除了一件事:参数大小 = 64 k。

64 k - 默认套接字缓冲区大小。

import itertools
from multiprocessing import Pool
from time import sleep


def f( x ):
    print( "f()" )
    sleep( 3 )
    return x


def get_reader():
    for x in range( 10 ):
        print( "readed: ", x )
        value = " " * 1024 * 64 # 64k
        yield value


if __name__ == '__main__':

    p = Pool( processes=2 )

    data = p.imap( f, get_reader() )

    p.close()
    p.join()

【讨论】:

    【解决方案2】:

    你想要的实现在 NuMap 包中,来自网站:

    NuMap 是并行的(基于线程或进程,本地或远程), 缓冲的、多任务的、itertools.imap 或 multiprocessing.Pool.imap 功能替换。像 imap 一样,它评估元素的函数 一个序列或可迭代的,它是懒惰的。 可以通过“stride”和“buffer”参数调整惰性。

    【讨论】:

      【解决方案3】:

      我也遇到了这个问题,并且很失望地得知地图会消耗所有元素。我编写了一个函数,它在多处理中使用 Queue 数据类型懒惰地使用迭代器。这类似于@unutbu 在对他的回答的评论中描述的内容,但正如他所指出的那样,没有用于重新加载队列的回调机制。相反,Queue 数据类型公开了一个超时参数,我使用了 100 毫秒来获得良好的效果。

      from multiprocessing import Process, Queue, cpu_count
      from Queue import Full as QueueFull
      from Queue import Empty as QueueEmpty
      
      def worker(recvq, sendq):
          for func, args in iter(recvq.get, None):
              result = func(*args)
              sendq.put(result)
      
      def pool_imap_unordered(function, iterable, procs=cpu_count()):
          # Create queues for sending/receiving items from iterable.
      
          sendq = Queue(procs)
          recvq = Queue()
      
          # Start worker processes.
      
          for rpt in xrange(procs):
              Process(target=worker, args=(sendq, recvq)).start()
      
          # Iterate iterable and communicate with worker processes.
      
          send_len = 0
          recv_len = 0
          itr = iter(iterable)
      
          try:
              value = itr.next()
              while True:
                  try:
                      sendq.put((function, value), True, 0.1)
                      send_len += 1
                      value = itr.next()
                  except QueueFull:
                      while True:
                          try:
                              result = recvq.get(False)
                              recv_len += 1
                              yield result
                          except QueueEmpty:
                              break
          except StopIteration:
              pass
      
          # Collect all remaining results.
      
          while recv_len < send_len:
              result = recvq.get()
              recv_len += 1
              yield result
      
          # Terminate worker processes.
      
          for rpt in xrange(procs):
              sendq.put(None)
      

      此解决方案的优点是不对 Pool.map 的请求进行批处理。一个单独的工人不能阻止其他人取得进步。 YMMV。请注意,您可能希望使用不同的对象来为工作人员发出终止信号。在示例中,我使用了 None。

      在“Python 2.7 (r27:82525, Jul 4 2010, 09:01:59) [MSC v.1500 32 bit (Intel)] on win32 上测试”

      【讨论】:

      • 我已经检查了 Python 3.3,imapimap_unordered 在启动映射函数之前都不会消耗所有参数,尽管 map 会。
      • +1 这几乎是我需要的,但不幸的是我需要有序的结果。
      • 我通常不会为输入/输出队列调整 get/put 超时,而是 1) 为两个队列设置固定大小,2) 如果队列为空/满,则让 get/put 阻塞。这样就无需调整超时。只需要检查进入队列和退出队列的项目数。那么正确的顺序是:1)启动工人; 2)启动out-Queue收集器; 3) 遍历输入并填充队列。
      • @neo,也可以得到有序的结果。实现这一点的方法是拥有 4 个大小有限的队列 [In(工作人员的数据)、Out(最终的、正确排序的结果)、Serial(跟踪处理中的项目)、Sort(中间队列)] 和 2 种类型的工人 - 一个 sorter() 和 N 个实际工人。这个想法是:1)有一个serial数字生成器; 2) 将每个数据集提交为In.put( (serial, data) ),连同Serial.put(serial); 3)工人做Sort.put((serial, result)); 4)sorter()从Sort中获取项目,按serial排序,放入Out。
      • @neo,这是一个未经测试的sorter() 示例:bitbucket.org/qmentis/bioinformatics-scripts/src/… 忘了提到所有队列必须具有相同的大小限制,并且假设所有队列项的处理量大致相同该方案的工作时间(否则sorter() 的内部缓冲区将开始累积太多结果)。
      【解决方案4】:

      我们先看一下程序的结尾。

      当您的程序结束时,多处理模块使用atexit 调用multiprocessing.util._exit_function

      如果您删除g2.next(),您的程序将很快结束。

      _exit_function 最终调用Pool._terminate_pool。主线程将pool._task_handler._state的状态从RUN更改为TERMINATE。同时pool._task_handler 线程在Pool._handle_tasks 中循环,并在达到条件时退出

                  if thread._state:
                      debug('task handler found thread._state != RUN')
                      break
      

      (参见 /usr/lib/python2.6/multiprocessing/pool.py)

      这就是阻止任务处理程序完全使用您的生成器g() 的原因。如果您查看Pool._handle_tasks,您会看到

              for i, task in enumerate(taskseq):
                  ...
                  try:
                      put(task)
                  except IOError:
                      debug('could not put task on queue')
                      break
      

      这是使用您的生成器的代码。 (taskseq 不完全是您的生成器,但随着 taskseq 被消耗,您的生成器也是如此。)

      相反,当您调用g2.next() 时,主线程调用IMapIterator.next,并在到达self._cond.wait(timeout) 时等待。

      主线程正在等待而不是 调用_exit_function 是允许任务处理线程正常运行的原因,这意味着在workers'Pool._handle_tasks 函数中的puts 任务中完全消耗生成器。

      底线是所有Pool 映射函数都会消耗给定的整个可迭代对象。如果你想分块使用生成器,你可以这样做:

      import multiprocessing as mp
      import itertools
      import time
      
      
      def g():
          for el in xrange(50):
              print el
              yield el
      
      
      def f(x):
          time.sleep(1)
          return x * x
      
      if __name__ == '__main__':
          pool = mp.Pool(processes=4)              # start 4 worker processes
          go = g()
          result = []
          N = 11
          while True:
              g2 = pool.map(f, itertools.islice(go, N))
              if g2:
                  result.extend(g2)
                  time.sleep(1)
              else:
                  break
          print(result)
      

      【讨论】:

      • 很好的答案,我最终重新实现了一个线程池,同时消耗一个元素一个元素,但是你的 islice 解决方案对我来说工作量要少得多,哦,好吧:-)。我试着环顾一下 pool.py 并注意到确实 map/imap/map_async 函数似乎立即吃掉了迭代器。我不清楚这是否真的有必要,尤其是在标准 Pool.map() 的情况下?
      • @Gabe:为了即时使用迭代器,我认为必须在Pool 中编写一些额外的信号机制,以告诉任务处理程序何时在put 中添加更多任务inqueue。也许这是可能的,但目前在Pool 中不存在,并且可能也会减慢进程。
      • 确实,我的解决方案是创建一个大小为 N*size_of_pool 的任务队列,并与 N 一起玩,直到看起来队列保持良好的缓冲区。当然,这是依赖于任务的,所以我可以理解 Pool 代码的作者不想处理这个问题。感谢您的回复!
      • 如果生成器不知道元素的数量(在这种情况下为 100)怎么办?
      • @Vince:您可以将for-loop 更改为while-loop,并在pool.map 的结果为空时中断。我已经编辑了帖子以说明我的意思。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2011-12-30
      • 1970-01-01
      • 2017-04-18
      • 2020-05-27
      • 2021-04-21
      • 2013-07-22
      • 2014-03-05
      相关资源
      最近更新 更多