【问题标题】:multiprocessing - pool allocation多处理 - 池分配
【发布时间】:2012-04-27 09:27:52
【问题描述】:

我注意到 python 中用于池分配的这种行为。即使我在池中有 20 个进程,当我为 8 个进程执行 map_async 时,我并没有将所有进程都扔掉,而是只执行了 4 个。当这四个完成时,它会再发送两个,然后当这两个完成时发送一个。

当我向它扔超过 20 个时,它会运行所有 20 个,直到它开始在队列中获得少于 20 个,当上述行为重复时。

我认为这是故意的,但看起来很奇怪。我的目标是在请求进入后立即处理它们,显然这种行为不适合。

使用带有billiard 的python 2.6 来支持maxtasksperchild

有什么想法可以改进吗?

代码:

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10)

while True:
    lines = DbData.GetAll()
    if len(lines) > 0:
        print 'Starting to process: ', len(lines), ' urls'
        Res = mypool.map_async(RunChild, lines)
        Returns = Res.get(None)
        print 'Pool returns: ', idx, Returns
    else:
        time.sleep(0.5)

【问题讨论】:

    标签: python multiprocessing pool


    【解决方案1】:

    我在 Python 中处理多处理的一种方法如下:

    我有数据要使用函数function()
    首先我创建一个多处理子类:

    import multiprocessing
    
    class ProcessThread(multiprocessing.Process):
        def __init__(self, id_t, inputqueue, idqueue, function, resultqueue):
            self.id_t = id_t
            self.inputlist = inputqueue
            self.idqueue = idqueue
            self.function = function
            self.resultqueue = resultqueue
    
            multiprocessing.Process.__init__(self)
    
        def run(self):
            s = "process number: " + str(self.id_t) + " starting"
            print s
            result = []
    
            while self.inputqueue.qsize() > 0
                try:
                    inp = self.inputqueue.get()
                except Exception:
                    pass
                result = self.function(inp)
                while 1:
                   try:
                       self.resultqueue.put([self.id,])
                   except Exception:
                       pass
                   else:
                       break
                self.idqueue.put(id)
                return
    

    及主要功能:

    inputqueue = multiprocessing.Queue()
    resultqueue = multiprocessing.Queue()
    idqueue = multiprocessing.Queue()
    
    def function(data):
        print data # or what you want
    
    for datum in data:
        inputqueue.put(datum)
    
    for i in xrange(nbprocess):
        ProcessThread(i, inputqueue, idqueue, function, resultqueue).start()
    

    最终得到结果:

    results = []
    while idqueue.qsize() < nbprocess:
        pass
    while resultqueue.qsize() > 0:
        results.append(resultqueue.get())
    

    通过这种方式,您可以完美地控制流程和其他内容的附加内容。 仅当每个数据的计算速度非常慢(inputqueue 才是一种有效的技术,因为不同进程同时访问队列(这就是我使用异常的原因)。如果您的函数计算速度非常快,请考虑在开始时仅将数据拆分一次,并在开始时为每个进程放置数据集块。

    【讨论】:

    • 谢谢,这有助于修改我的脚本。我摆脱了默认池处理并根据您的示例实现了自己的。
    猜你喜欢
    • 2015-01-13
    • 1970-01-01
    • 2021-07-24
    • 2020-08-14
    • 2016-11-10
    • 2016-07-10
    • 2020-04-11
    • 2017-04-18
    相关资源
    最近更新 更多