【问题标题】:Python多处理 - 是否可以在各个进程之间引入固定的时间延迟?
【发布时间】:2015-08-01 07:23:20
【问题描述】:

我已经搜索过,但在其他地方找不到这个问题的答案。希望我没有错过任何东西。

我正在尝试使用 Python 多处理来并行批量运行一些专有模型。比如说,我有 200 次模拟,我想一次批量运行 10-20 次。我的问题是,如果两个模型碰巧在相同/相似的时间启动,专有软件就会崩溃。我需要在多处理产生的进程之间引入延迟,以便每个新模型运行在开始前等待一点。

到目前为止,我的解决方案是在子进程启动模型运行之前引入随机时间延迟。但是,这只会降低任意两次运行同时开始的概率,因此在尝试处理大量模型时仍然会遇到问题。因此,我认为时间延迟需要内置到代码的多处理部分中,但我无法找到任何文档或示例。

编辑:我使用的是 Python 2.7

这是我目前的代码:

from time import sleep
import numpy as np
import subprocess
import multiprocessing

def runmodels(arg):
    sleep(np.random.rand(1,1)*120) # this is my interim solution to reduce the probability that any two runs start at the same time, but it isn't a guaranteed solution
    subprocess.call(arg) # this line actually fires off the model run

if __name__ == '__main__':    

    arguments =     [big list of runs in here
                    ]    

    count = 12
    pool = multiprocessing.Pool(processes = count)
    r = pool.imap_unordered(runmodels, arguments)      
    pool.close()
    pool.join()

【问题讨论】:

  • imap_unordered() 返回一个迭代器,您应该使用它,例如 for result in pool.imap_unordered(...):
  • 在这种情况下,我通常会在该循环中做什么(即在 for 语句之后的行中)?
  • 处理单个模拟的结果(如果有)和/或报告进度 ('\r{x} out of {y} simulations done')。
  • 重点是,您应该明确地使用迭代器(无论实际的 multiprocessing 实现是什么)。如果您不需要结果;使用deque(r, maxlen=0)

标签: python multithreading batch-file multiprocessing


【解决方案1】:

multiprocessing.Pool() 已经限制了同时运行的进程数。

您可以使用锁来分隔进程的开始时间(未测试):

import threading
import multiprocessing

def init(lock):
    global starting
    starting = lock

def run_model(arg):
    starting.acquire() # no other process can get it until it is released
    threading.Timer(1, starting.release).start() # release in a second
    # ... start your simulation here

if __name__=="__main__":
   arguments = ...
   pool = Pool(processes=12, 
               initializer=init, initargs=[multiprocessing.Lock()])
   for _ in pool.imap_unordered(run_model, arguments):
       pass

【讨论】:

  • 为我工作,似乎比以前的答案更简单
【解决方案2】:

使用线程和信号量的一种方法:

from time import sleep
import subprocess
import threading


def runmodels(arg):
    subprocess.call(arg)
    sGlobal.release() # release for next launch


if __name__ == '__main__':
    threads = []
    global sGlobal
    sGlobal = threading.Semaphore(12) #Semaphore for max 12 Thread
    arguments =  [big list of runs in here
                ]
    for arg in arguments :
        sGlobal.acquire() # Block if more than 12 thread
        t = threading.Thread(target=runmodels, args=(arg,))
        threads.append(t)
        t.start()
        sleep(1)

    for t in threads :
        t.join()

【讨论】:

  • 感谢您的回复,但是我遇到了一些问题: - sGlobal = threading.Semaphore(n) 似乎没有限制同时运行的数量。它似乎可以随心所欲地发射。 - sTempo = threading.Semaphore(n) 似乎不会影响两次运行之间的延迟。对延迟产生影响的唯一数字是 sleep(n)。 - 如果所有进程成功完成,最后我会收到一条错误消息: AttributeError: 'module' object has no attribute 'main_thread' 。如果您能提供任何进一步的帮助,我们将不胜感激。
  • 暂缓我对 sTempo 信号量的评论,我现在明白它应该如何工作了。但是,sGlobal 信号量仍在运行超过 n 次模型。
  • 经过进一步调查,我认为无限线程是由于子进程没有阻塞造成的。虽然,我知道 subprocess.call() 意味着阻塞。
  • 我做错了。信号量没有阻塞,因为代码启动了线程,直接在信号量上做了释放(所以没有停止)。我已经编辑了代码:信号量发布现在在线程中,所以当子进程完成时发布。我希望这次会没事:)
  • 对于 AttributeError:“模块”对象没有属性“主线程”。我想是因为我使用 python 3.4 而你使用 python 2.X。我已经编辑了 python 版本 2.X 的代码。下次在问题中给出你的python版本;)
【解决方案3】:

jfs 建议的答案给我带来了问题,因为我用threading.Timer 启动了一个新线程。如果工人恰好在计时器完成之前完成,则计时器将被终止并且永远不会释放锁。

我提出了一条替代路线,其中每个连续的工人将等待,直到自前一个工人开始以来已经过去了足够的时间。这似乎具有相同的预期效果,但不必依赖另一个子进程。

import multiprocessing as mp
import time

def init(shared_val):
    global start_time
    start_time = shared_val

def run_model(arg):
    with start_time.get_lock():
        wait_time = max(0, start_time.value - time.time())
        time.sleep(wait_time)
        start_time.value = time.time() + 1.0 # Specify interval here
    # ... start your simulation here

if __name__=="__main__":
   arguments = ...
   pool = mp.Pool(processes=12, 
                  initializer=init, initargs=[mp.Value('d')])
   for _ in pool.imap_unordered(run_model, arguments):
       pass

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-10
    • 1970-01-01
    相关资源
    最近更新 更多