【问题标题】:is there a way to limit how much gets submitted to a Pool of workers?有没有办法限制提交给工人池的数量?
【发布时间】:2019-04-03 23:56:40
【问题描述】:

我有一个工人池并使用apply_async 向他们提交工作。 我不关心应用于每个项目的函数的结果。 池似乎可以接受任意数量的apply_async 调用,无论数据有多大或工作人员能够以多快的速度跟上工作。

有没有办法让apply_async 在有一定数量的项目等待处理时立即阻止?我确信在内部,池正在使用队列,所以只使用队列的最大大小是微不足道的吗?

如果这不受支持,那么提交一份大报告是否有意义,因为这看起来是非常基本的功能,而且添加起来相当琐碎?

如果一个人必须从根本上重新实现 Pool 的整个逻辑才能使其工作,那将是一种耻辱。

这是一些非常基本的代码:

from multiprocessing import Pool
dowork(item):
    # process the item (for side effects, no return value needed)
    pass 

pool = Pool(nprocesses)
for work in getmorework():
    # this should block if we already have too many work waiting!        
    pool.apply_async(dowork, (work,))
pool.close()
pool.join()

【问题讨论】:

  • 拦截的目的是什么?也许Pool 是错误的抽象,为什么不只是Queue 和一些Processes?该模块已经存在很多年了,我怀疑它会改变,它已经非常复杂了,更不用说公开阻止你的建议的许多组合了(或者它可能不会太糟糕,但它似乎并不重要我)
  • 唯一的目的是简单地限制可以排队给工人的工作量,而不必手动监控工人已经完成了多少工作。填充队列的进程通常可以比消耗数据快得多的速度提供数据,这可能会导致内存问题。正如我所说,队列已经具有限制大小的功能,所以添加它应该是微不足道的。

标签: python python-3.x python-multiprocessing


【解决方案1】:

所以是这样的?

import multiprocessing
import time

worker_count = 4
mp = multiprocessing.Pool(processes=worker_count)
workers = [None] * worker_count

while True:
    try:
        for i in range(worker_count):
            if workers[i] is None or workers[i].ready():
                workers[i] = mp.apply_async(dowork, args=next(getmorework()))
    except StopIteration:
        break
    time.sleep(1)

我不知道您希望每个工人多快完成,time.sleep 可能是必要的,也可能不是必要的,或者可能需要不同的时间或其他什么。

【讨论】:

  • 谢谢,我认为这实际上可能是在当前情况下实现这一目标的最简单方法。 Pool 构造函数不允许将最大大小传递给内部使用的队列,这真是太可惜了。
【解决方案2】:

另一种方法可能是直接使用Queue

from multiprocessing import Process, JoinableQueue
from time import sleep
from random import random

def do_work(i):
    print(f"worker {i}")
    sleep(random())
    print(f"done {i}")

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()

def generator(n):
    for i in range(n):
        print(f"gen {i}")
        yield i

# 1 = allow generator to get this far ahead
q = JoinableQueue(1)

# 2 = maximum amount of parallelism
procs = [Process(target=worker) for _ in range(2)]
# and get them running
for p in procs:
    p.daemon = True
    p.start()

# schedule 10 items for processing
for item in generator(10):
    q.put(item)

# wait for jobs to finish executing
q.join()

# signal workers to finish up
for p in procs:
    q.put(None)
# wait for workers to actually finish
for p in procs:
    p.join()

大部分来自 Python 的 queue 模块示例:

https://docs.python.org/3/library/queue.html#queue.Queue.join

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2014-01-02
    • 2022-07-20
    • 1970-01-01
    • 1970-01-01
    • 2013-08-10
    • 2015-08-11
    • 1970-01-01
    相关资源
    最近更新 更多