【问题标题】:Is it possible to start Pool processes sequentially?是否可以按顺序启动池进程?
【发布时间】:2017-05-04 20:26:34
【问题描述】:

以下代码启动三个进程,它们在一个池中处理 20 个工作调用:

import multiprocessing

def worker(nr):
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

有没有办法按顺序启动进程(而不是让它们同时启动),在每个进程启动之间插入延迟?

如果不使用Pool,我会在循环中使用multiprocessing.Process(target=worker, args=(nr,)).start(),依次启动它们并根据需要插入延迟。不过,我发现Pool 非常有用(与map 通话一起),所以如果可能的话,我很乐意保留它。

【问题讨论】:

  • 为什么要这样做?
  • @acushner:举个例子,我有一个需要调用 200 次的 API。 API 的限制是每次调用持续大约 5 分钟(这在调用之间会发生变化)。我可以同时运行 10 个呼叫,并且每个呼叫必须在上一个呼叫结束后至少延迟 5 秒后启动。我可以在我的工作人员开始时添加一个 5 秒的睡眠 - 这在 10 个并行调用同时启动的一开始就可以正常工作。这就是为什么按顺序创建Pool(延迟 5 秒)可以解决问题的原因。
  • 不能编辑我之前的评论,第二句应该是:API的约束是每次调用持续约5分钟(这在调用之间变化),我可以打 10 个电话 (...)。注意逗号而不是句号(有三个约束)
  • 你用的是python 2还是python 3?
  • @acushner: python 3.4.3

标签: python multiprocessing


【解决方案1】:

根据documentation,不存在对池进程的这种控制。但是,您可以使用锁来模拟它:

import multiprocessing
import time

lock = multiprocessing.Lock()

def worker(nr):
    lock.acquire()
    time.sleep(0.100)
    lock.release()
    print(nr)

numbers = [i for i in range(20)]

if __name__ == '__main__':
    multiprocessing.freeze_support()
    pool = multiprocessing.Pool(processes=3)
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

您的 3 个进程仍将同时启动。好吧,我的意思是您无法控制哪个进程首先开始执行回调。但至少你得到了延迟。这有效地让每个工人以指定的时间间隔“开始”(但实际上,继续)。

以下讨论产生的修正:

请注意,在 Windows 上,无法从父进程继承锁。相反,您可以使用multiprocessing.Manager().Lock() 在进程之间通信一个全局锁对象(当然还有额外的 IPC 开销)。全局锁对象也需要在每个进程中初始化。这看起来像:

from multiprocessing import Process, freeze_support
import multiprocessing
import time
from datetime import datetime as dt

def worker(nr):
    glock.acquire()
    print('started job: {} at {}'.format(nr, dt.now()))
    time.sleep(1)
    glock.release()
    print('ended   job: {} at {}'.format(nr, dt.now()))

numbers = [i for i in range(6)]

def init(lock):
    global glock
    glock = lock

if __name__ == '__main__':
    multiprocessing.freeze_support()
    lock = multiprocessing.Manager().Lock()
    pool = multiprocessing.Pool(processes=3, initializer=init, initargs=(lock,))
    results = pool.map(worker, numbers)
    pool.close()
    pool.join()

【讨论】:

  • 我尝试了您的代码并添加了一些时间 (pastebin.com/aFfQgBSD)。无论lock.acquire()lock.release() 是否存在或被注释掉,结果都是相同的(存在锁的输出:pastebin.com/3gu70D6q 和注释掉的两行:pastebin.com/MgB6D8E5
  • 我的代码按预期工作。您需要更改您的 pastebin 示例以观察正确的行为:您的工作人员需要在打印第一条消息之前获得锁。否则,所有进程都可以在其中任何一个获取锁之前打印它们的时间戳。然后最后,其中一个获得了它,依此类推……你真正想做的是pastebin.com/a2WR0C3N
  • 嗯,前三个作业仍然在您的 pastebin 代码中同时打印“结束...”,这意味着它们没有按顺序执行该行。他们同时开始[一些带有锁定和等待的魔法]并一起打印'结束......'。我想要的是有'结局..;'一部分工人一个接一个地完成。另外,请在问题后的评论后查看我的示例 - 对于现实生活中的应用程序。
  • 它工作得很好,这是我的代码的输出:pastebin.com/rV7w731i。你能粘贴你的输出吗?
  • 感谢您跟上 :) 我将睡眠时间增加到 5 秒,以使班次更明显 (pastebin.com/9QvkZ8fQ)。如您所见,3 个工人从 8 点开始,全部在 13 点结束,新的从 13 点开始并在 18 点结束。这意味着它们不是连续的,间隔 5 秒延迟。
【解决方案2】:

你不能做一些像这样简单的事情吗:

from multiprocessing import Process
from time import sleep

def f(n):
    print 'started job: '+str(n)
    sleep(3)
    print 'ended job: '+str(n)

if __name__ == '__main__':
    for i in range(0,100):
        p = Process(target=f, args=(i,))
        p.start()
        sleep(1)

结果

started job: 0
started job: 1
started job: 2
ended job: 0
started job: 3
ended job: 1
started job: 4
ended job: 2
started job: 5

【讨论】:

  • 这是我在问题的最后一部分中提到的解决方案。我想了解是否可以控制Pool 进程的启动方式。
  • 除此之外,上面的代码将并行启动 100 个进程,而我限制为第一个可用进程消耗 3 个进程和 20 个工作人员。不过,这可以通过Queue 解决。
【解决方案3】:

您可以尝试定义一个缓慢产生值的函数吗?

def get_numbers_on_delay(numbers, delay):
    for i in numbers:
        yield i
        time.sleep(delay)

然后:

results = pool.map(worker, get_numbers_on_delay(numbers, 5))

我还没有测试过,所以我不确定,但试一试。

【讨论】:

    【解决方案4】:

    由于某种原因,我无法让锁定答案起作用,所以我以这种方式实现了它。 我意识到这个问题很老了,但也许其他人也有同样的问题。

    它生成所有类似于锁定解决方案的进程,但根据它们的进程名称编号在工作前休眠。

    from multiprocessing import current_process
    from re import search
    from time import sleep
    
    def worker():
        process_number = search('\d+', current_process().name).group()
        time_between_workers = 5
        sleep(time_between_workers * int(process_number))
        #do your work here
    

    由于赋予进程的名称似乎是唯一且递增的,因此它会获取进程的编号并以此为基础休眠。 SpawnPoolWorker-1 休眠 1 * 5 秒,SpawnPoolWorker-2 休眠 2 * 5 秒等。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2010-11-12
      • 2011-08-29
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多