【问题标题】:Force Multiprocessing Pool to iterate over argument强制多处理池迭代参数
【发布时间】:2018-10-20 09:16:51
【问题描述】:

我正在使用多处理池一遍又一遍地为多个参数运行一个函数。我使用一个由另一个线程填充的作业列表和一个job_handler 函数来处理每个作业。我的问题是,当列表变空时,池将结束函数。我想让游泳池保持活力并等到列表填满。实际上,有两种情况可以解决这个问题。

1.使用一个池,但列表为空后会结束:

from multiprocessing import Pool
from threading import Thread
from time import sleep


def job_handler(i):
    print("Doing job:", i)
    sleep(0.5)

def job_adder():
    i = 0
    while True:
        jobs.append(i)
        i += 1
        sleep(0.1)


if __name__ == "__main__":
    pool = Pool(4)
    jobs = []
    thr = Thread(target=job_adder)
    thr.start()
    # wait for job_adder to add to list
    sleep(1)
    pool.map_async(job_handler, jobs)
    while True:
        pass

2.多个map_async:

from multiprocessing import Pool
from threading import Thread
from time import sleep


def job_handler(i):
    print("Doing job:", i)
    sleep(0.5)

def job_adder():
    i = 0
    while True:
        jobs.append(i)
        i += 1
        sleep(0.1)


if __name__ == "__main__":
    pool = Pool(4)
    jobs = []
    thr = Thread(target=job_adder)
    thr.start()
    while True:
        for job in jobs:
            pool1 = pool.map_async(job_handler, (job,))
            jobs.remove(job)

两者有什么区别?我认为第一个选项会更好,因为地图本身会处理迭代。我的目标是获得更好的性能来分别处理每项工作。

【问题讨论】:

    标签: python python-3.x python-multiprocessing


    【解决方案1】:

    在许多情况下都需要“放慢”Pool。这种情况比some简单:

    q=queue.Queue()
    m=pool.imap(iter(q.get,None))
    

    你也可以使用imap_unorderedNone 是终止Pool 的哨兵。 Pool 必须使用线程来收集任务(因为这些函数“比map() 更懒惰”),并且它会根据需要阻塞q

    【讨论】:

    • 谢谢,使用队列是个好主意。但我的问题是当使用imap 时强制我通过m 获得结果,这似乎是一个生成器,但我更喜欢map_async 上的回调。将queuemap_async 一起使用对我不起作用。
    • @MasoudR.:你显然可以启动一个线程来消费返回的生成器并发出回调; map_async 已经在单独的线程上这样做了,所以同步要求是一样的。
    猜你喜欢
    • 2020-04-16
    • 2015-01-13
    • 2020-07-24
    • 2018-12-04
    • 2021-08-15
    • 1970-01-01
    • 2015-05-23
    • 2017-02-11
    • 1970-01-01
    相关资源
    最近更新 更多