【问题标题】:Python: How can I check the number of pending tasks in a multiprocessing.Pool?Python:如何检查 multiprocessing.Pool 中待处理任务的数量?
【发布时间】:2011-07-29 09:50:35
【问题描述】:

我有一小部分工人 (4) 和一个非常大的任务列表 (5000~)。我正在使用一个池并使用 map_async() 发送任务。因为我正在运行的任务相当长,所以我强制将块大小设置为 1,这样一个长进程就无法容纳一些较短的进程。

我想做的是定期检查还有多少任务需要提交。我知道最多有 4 个处于活动状态,我关心还有多少需要处理。

我用谷歌搜索过,找不到任何人这样做。

一些简单的帮助代码:

import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()

while True:
    if not jobs.ready():
        print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
        jobs.wait(2)
    else:
        break

【问题讨论】:

  • 我应该注意到我在 RHEL-6 系统上使用 python2.6,但是我对不同版本/平台上的示例持开放态度。
  • 任务完成时减少的静态变量? (当任务明显开始时增加)。
  • 在工作人员到达任务之前,任务不会“开始”。我想如果我创建了一个与要完成的任务大小相同的全局变量,然后每次启动可能会执行此操作的任务时将其递减,但这有点尴尬并且需要考虑线程安全。
  • 更改以获取示例代码以编译和运行:fpaste.org/p4Hb。另外:gist.github.com/902947
  • 谢谢亚当,我已经让上面的代码工作了。

标签: python pool multiprocess


【解决方案1】:

据我所知没有密闭的方式,但是如果你使用Pool.imap_unordered()函数而不是map_async,你可以截取被处理的元素。

import multiprocessing
import time

process_count = 4

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)
    # Actually, you should return the job you've created here.
    return num

pool = multiprocess.Pool(process_count)
jobs  = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
    jobs.append(job)
    job_count += 1

    incomplete = len(items) - job_count
    unsubmitted = max(0, incomplete - process_count)

    print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted

pool.close()

我要减去 process_count,因为您几乎可以假设所有进程都将处理以下两个例外之一:1) 如果您使用迭代器,则可能没有更多的项目可以使用和处理,并且2) 您的剩余物品可能少于 4 件。我没有为第一个异常编写代码。但是,如果您需要,这样做应该很容易。无论如何,你的例子使用了一个列表,所以你不应该有这个问题。

编辑:我还意识到您正在使用 While 循环,这使您看起来像是在尝试定期更新某些内容,例如每半秒或某事。我作为示例给出的代码不会那样做。我不确定这是否有问题。

【讨论】:

  • 谢谢。我还没有真正探索过 imap 函数(文档有点……简洁)。不过你说得对,我想在工作进行时做一些其他事情,并定期报告剩余的工作量。
【解决方案2】:

看起来jobs._number_left 是您想要的。 _ 表示它是一个内部值,可能会随开发人员的心血来潮而改变,但它似乎是获取该信息的唯一方法。

【讨论】:

  • 啊!它不在 API 文档中,我忘记在 ipython 中的作业上执行 dir()。感谢您的回答!
  • API 文档中没有 _number_left 是否有充分的理由?它是否会被弃用或在未来更改名称?
【解决方案3】:

我有类似的要求:跟踪进度,根据结果执行临时工作,在任意时间干净地停止所有处理。我的处理方式是使用apply_async 一次发送一个任务。我所做的非常简化的版本:

maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
    if stopNowBooleanFunc():  # if for whatever reason I want to stop processing early
        if donecounter == sendcounter:  # wait til already sent tasks finish running
            break
    else:  # don't send new tasks if it's time to stop
        while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
            pool.apply_async(mytask, (runlist[sendcounter], q))
            sendcounter += 1

    while not q.empty():  # process completed results as they arrive
        aresult = q.get()
        processResults(aresult)
        donecounter += 1

请注意,我使用Queue 而不是return 来获取结果。

【讨论】:

    【解决方案4】:

    假设您使用的是apply_async,您可以通过查看Pool._cache 属性来检查待处理作业的数量。这是 ApplyResult 存储的位置,直到它们可用并且等于待处理的 ApplyResults 的数量。

    import multiprocessing as mp
    import random
    import time
    
    
    def job():
        time.sleep(random.randint(1,10))
        print("job finished")
    
    if __name__ == '__main__':
        pool = mp.Pool(5)
        for _ in range(10):
            pool.apply_async(job)
    
        while pool._cache:
            print("number of jobs pending: ", len(pool._cache))
            time.sleep(2)
    
        pool.close()
        pool.join()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2010-12-11
      • 2021-02-13
      • 1970-01-01
      • 2020-09-15
      • 2019-08-29
      • 2014-06-30
      • 2018-02-28
      • 1970-01-01
      相关资源
      最近更新 更多