【发布时间】:2020-04-22 17:28:55
【问题描述】:
我有一个类 (MyClass),其中包含需要运行的操作队列 (self.msg_queue),并且我有多个可以将任务添加到队列的输入源。
现在我想同时运行三个函数:
- MyClass.get_input_from_user()
- 在 tkinter 中创建一个窗口,让用户填写信息,并在用户按下提交时将该消息推送到队列中。
- MyClass.get_input_from_server()
- 检查服务器是否有消息,读取消息,然后将其放入队列。此方法使用 MyClass 的父类中的函数。
- MyClass.execute_next_item_on_the_queue()
- 从队列中弹出一条消息,然后对其进行操作。这取决于消息是什么,但每条消息都对应于 MyClass 或其父级中的某个方法,这些方法根据一个大决策树运行。
流程说明: 类加入网络后,我让它产生三个线程(一个用于上述每个函数)。每个线程函数使用语法“self.msg_queue.put(message)”从队列中添加项目,并使用“self.msg_queue.get_nowait()”从队列中删除项目。
问题描述: 我遇到的问题是,似乎每个线程都在修改自己的队列对象(它们没有共享队列 msg_queue,它们都是函数的成员)。
我对 Multiprocessing 不够熟悉,无法知道重要的错误消息是什么;但是,它声明它不能腌制一个弱引用对象(它没有给出哪个对象是弱引用对象的指示),并且在 queue.put() 中调用“self._sem.acquire(block, timeout) 产生'[WinError 5] 访问被拒绝'”错误。假设队列引用中的这个失败没有正确复制是否安全?
[我正在使用 Python 3.7.2 和 Multiprocessing 包的 Process and Queue]
[我已经看到多个关于让线程在类之间穿梭信息的 Q/As——创建一个主工具来生成一个队列,然后将该队列作为参数传递给每个线程。如果函数不必使用 MyClass 中的其他函数,我可以看到通过让这些函数进入队列并使用局部变量而不是类变量来适应这种策略。]
[我相当有信心这个错误不是将我的队列传递给 tkinter 对象的结果,因为我的单元测试我的 GUI 如何修改其调用者的队列工作正常]
以下是队列错误的最小可重现示例:
from multiprocessing import Queue
from multiprocessing import Process
import queue
import time
class MyTest:
def __init__(self):
self.my_q = Queue()
self.counter = 0
def input_function_A(self):
while True:
self.my_q.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
def input_function_B(self):
while True:
self.counter = 0
self.my_q.put(self.counter)
time.sleep(1)
def output_function(self):
while True:
try:
var = self.my_q.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
def run(self):
process_A = Process(target=self.input_function_A)
process_B = Process(target=self.input_function_B)
process_C = Process(target=self.output_function)
process_A.start()
process_B.start()
process_C.start()
# without this it generates the WinError:
# with this it still behaves as if the two input functions do not modify the queue
process_C.join()
if __name__ == '__main__':
test = MyTest()
test.run()
【问题讨论】:
-
@stovfl,因此我从中收集到 Process 在创建具有新内存空间的新进程方面的性质取代了类将类变量分组到公共空间中的能力。因此,为了解决我的问题,我需要给我的函数参数并将我的类的浅克隆传递给新的 Process 以便它修改正确的东西?或者进程试图修改其堆栈外的内存是导致 WinError 5 Access is Denied 错误的原因吗?
-
没有minimal reproducible example,一切都是胡乱猜测。一般来说,如果您使用
multiprocessing.queue,则根本不应该有访问被拒绝,因为Queue使用锁定来处理并发访问。 -
@stovfl,我为我从队列中得到的错误添加了一个最小的可重现示例。
标签: python multithreading multiprocessing