【问题标题】:Process won't terminate when i use Queue当我使用队列时进程不会终止
【发布时间】:2016-07-03 16:38:13
【问题描述】:

当数组(数据)超过 10,000 个时,并非所有进程都完成 (见最后一行 print('compete'))。 当数组多达 2,000 个元素时,此代码可以正常工作。 我认为 queue 有问题, 没有 result_queue.put([i,j]) 所有进程都正确完成。 有人可以帮我处理这部分代码吗?

    def finder(start,end,proc,result_queue,lock):
    global data
    i=start
    while i<=end:
        el=data[i]
        j=-1
        for el1 in data:
            j=j+1
            s1 = SequenceMatcher(None, el, el1)
            s1_val=s1.ratio()
            if s1_val>0.9: result_queue.put([i,j])
        i=i+1
    print('end')


if __name__ == '__main__':
    multiprocessing.freeze_support()
    result_queue = multiprocessing.Queue()
    allProcesses = []
    data=r.keys()
    print(len(data))
    parts=8
    part=int(len(data)/parts)
    i=0
    lock = multiprocessing.Lock()

    while i<parts:
            p = multiprocessing.Process(target=finder, args=(part*i, part*i+part,i,result_queue,lock ))
            print('init',part*i, part*i+part,i)
            allProcesses.append(p)
            p.daemon = True
            p.start()
            i=i+1
            print('started process',i)
    i=0

    for p in allProcesses:
        p.join()
        print('complete')

【问题讨论】:

  • 在没有p.daemon = True的情况下运行有什么区别吗?
  • 没有区别(((仅完成 8 个中的 3 个
  • 队列可能有大小限制。如果队列没有消费者,它将被填满,queue.put() 将阻塞。至少this 表明存在限制。
  • 我的队列长度低于 32767。实际长度小于 500-600。和代码执行没有错误
  • 您能否创建一个简化的 MCVE (stackoverflow.com/help/mcve) 来演示该问题?我们无法运行您提供的代码,因为我们没有SequenceMatcher,并且没有定义r

标签: python-3.x queue multiprocessing


【解决方案1】:

简答:使用 multiprocessing.Manager 创建队列

    m = multiprocessing.Manager()
    result_queue = m.Queue()

更详细一点的答案: multiprocessing.Manager 将返回一个

<class 'multiprocessing.managers.AutoProxy[Queue]'>

可以在工作人员之间安全共享的实例。

这是一个完整的可运行示例

import time
import multiprocessing


def finder(start,end,proc,result_queue,lock):
    #global data


    for i in range(start, end+1):
        #print (type(result_queue))

        result_queue.put((i,))




    print('end %s'%proc)

r = {i:i for i in range(100000)}


def main():

    multiprocessing.freeze_support()
    allProcesses = []
    data=r.keys()
    print(len(data))
    parts=8
    part=int(len(data)/parts)
    i=0
    lock = multiprocessing.Lock()
    m = multiprocessing.Manager()
    result_queue = m.Queue()
    while i<parts:

            p = multiprocessing.Process(target=finder, args=(part*i, part*i+part,i,result_queue,lock ))
            print('init',part*i, part*i+part,i)

            p.daemon = False
            p.start()
            i=i+1
            print('started process',i)
            allProcesses.append(p)


    for p in allProcesses:
        print("join", p)

        print(p.join())
        print('complete')


if __name__ == '__main__':
    main()

如果您将 m.Queue 更改为 multiprocessing.Queue,您将看到您的旧行为

【讨论】:

  • 我是否需要为每个队列配备不同的经理,或者一个经理就足以满足所有需要的队列?
  • @ZenoDallaValle - 一个就足够了
猜你喜欢
  • 2021-09-05
  • 2015-08-26
  • 2011-09-19
  • 1970-01-01
  • 1970-01-01
  • 2016-04-16
  • 1970-01-01
  • 1970-01-01
  • 2012-11-06
相关资源
最近更新 更多