【发布时间】:2021-07-10 02:37:08
【问题描述】:
问题
我是多处理的新手,我尝试的所有事情都无济于事。每次我想我想出一些东西时,我都会遇到一个新的障碍。我的目标是使用多个进程加载队列,然后使用多个进程从队列中拉取数据并处理数据。我已经尝试恢复到基本的队列处理,但是一旦我实现了多个进程,我就无法从队列中取出任何东西。我错过了什么?
代码
rom multiprocessing import Process, Lock
from queue import Queue
import os
q = Queue(5)
def get_from_q():
print('trying to get')
print(q.get())
if __name__ == '__main__':
# put items at the end of the queue
for x in range(6):
print('adding ', x)
q.put(x)
PROCESSOR_COUNT = os.cpu_count()
processes = []
for p in range(PROCESSOR_COUNT):
print('spawning process')
p = Process(target=get_from_q)
processes.append(p)
for p in processes:
print('starting')
p.start()
for p in processes:
print('joining')
p.join()
结果:
adding 0
adding 1
adding 2
adding 3
adding 4
adding 5
预期结果
adding 0
adding 1
adding 2
adding 3
adding 4
adding 5
spawning process
spawning process
spawning process
spawning processv
starting
starting
starting
starting
trying to get
0
trying to get
1
trying to get
2
trying to get
3
trying to get
4
trying to get
5
joining
joining
joining
joining
【问题讨论】:
-
您是否尝试过将 queue.Queue 换成 multiprocessing.Queue?
-
是的,我从那个开始,然后移到这个,因为当我有 mp.Queue 时它没有向队列添加任何内容
-
您的队列应该是
multiprocessing.Queue,只能容纳5 条记录。然而,你做的第一件事就是尝试写 6 条记录。您将在第 6 次写入时阻塞。然后您正在创建要读取的cpu_count()进程。如果您拥有的处理器数量大于 6,则您创建的额外进程将永远等待尝试从空队列中读取。您应该为我们这些试图帮助您的凡人明确设置池大小。您可能知道您拥有的处理器数量,但我们不知道。 -
@Booboo 我可以用
if q.full(): break之类的东西杀死额外的进程吗? -
@Branden-Pincince 查看
full()的文档。它说:如果队列已满,则返回True,否则返回False。由于多线程/多处理语义,这是不可靠的。 你想要可靠 代码还是只是胡闹?如果q.full()是可靠的呢?所以你最终只会写 5 条记录。但是第六张唱片呢?你不在乎它是否从未被写过?问问自己,“我想要完成什么?”我真的说不出来。
标签: python multiprocessing queue python-multiprocessing