【问题标题】:How to allow a class's variables to be modified concurrently by multiple threads如何允许多个线程同时修改一个类的变量
【发布时间】:2020-04-22 17:28:55
【问题描述】:

我有一个类 (MyClass),其中包含需要运行的操作队列 (self.msg_queue),并且我有多个可以将任务添加到队列的输入源。

现在我想同时运行三个函数:

  • MyClass.get_input_from_user()
    • 在 tkinter 中创建一个窗口,让用户填写信息,并在用户按下提交时将该消息推送到队列中。
  • MyClass.get_input_from_server()
    • 检查服务器是否有消息,读取消息,然后将其放入队列。此方法使用 MyClass 的父类中的函数。
  • MyClass.execute_next_item_on_the_queue()
    • 从队列中弹出一条消息,然后对其进行操作。这取决于消息是什么,但每条消息都对应于 MyClass 或其父级中的某个方法,这些方法根据一个大决策树运行。

流程说明: 类加入网络后,我让它产生三个线程(一个用于上述每个函数)。每个线程函数使用语法“self.msg_queue.put(message)”从队列中添加项目,并使用“self.msg_queue.get_nowait()”从队列中删除项目。

问题描述: 我遇到的问题是,似乎每个线程都在修改自己的队列对象(它们没有共享队列 msg_queue,它们都是函数的成员)。

我对 Multiprocessing 不够熟悉,无法知道重要的错误消息是什么;但是,它声明它不能腌制一个弱引用对象(它没有给出哪个对象是弱引用对象的指示),并且在 queue.put() 中调用“self._sem.acquire(block, ti​​meout) 产生'[WinError 5] 访问被拒绝'”错误。假设队列引用中的这个失败没有正确复制是否安全?

[我正在使用 Python 3.7.2 和 Multiprocessing 包的 Process and Queue]

[我已经看到多个关于让线程在类之间穿梭信息的 Q/As——创建一个主工具来生成一个队列,然后将该队列作为参数传递给每个线程。如果函数不必使用 MyClass 中的其他函数,我可以看到通过让这些函数进入队列并使用局部变量而不是类变量来适应这种策略。]

[我相当有信心这个错误不是将我的队列传递给 tkinter 对象的结果,因为我的单元测试我的 GUI 如何修改其调用者的队列工作正常]

以下是队列错误的最小可重现示例:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self):
        while True:
            self.my_q.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self):
        while True:
            self.counter = 0
            self.my_q.put(self.counter)
            time.sleep(1)

    def output_function(self):
        while True:
            try:
                var = self.my_q.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A)
        process_B = Process(target=self.input_function_B)
        process_C = Process(target=self.output_function)

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()

【问题讨论】:

  • @stovfl,因此我从中收集到 Process 在创建具有新内存空间的新进程方面的性质取代了类将类变量分组到公共空间中的能力。因此,为了解决我的问题,我需要给我的函数参数并将我的类的浅克隆传递给新的 Process 以便它修改正确的东西?或者进程试图修改其堆栈外的内存是导致 WinError 5 Access is Denied 错误的原因吗?
  • 没有minimal reproducible example,一切都是胡乱猜测。一般来说,如果您使用multiprocessing.queue,则根本不应该有访问被拒绝,因为Queue 使用锁定来处理并发访问。
  • @stovfl,我为我从队列中得到的错误添加了一个最小的可重现示例。

标签: python multithreading multiprocessing


【解决方案1】:

确实 - 这些不是“线程” - 这些是“进程” - 而如果您使用的是多线程而不是多处理,self.my_q 实例将是同一个对象,放置在计算机上相同的内存空间, multiprocessing 对进程进行fork,原始进程中的任何数据(在“run”调用中执行的数据)在使用时都会被复制 - 因此,每个子进程都会看到自己的“队列”实例,与其他实例无关。

让各个进程共享一个 multiprocessing.Queue 对象的正确方法是将它作为参数传递给目标方法。重组代码以使其正常工作的更简单方法是:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time

class MyTest:
    def __init__(self):
        self.my_q = Queue()
        self.counter = 0

    def input_function_A(self, queue):
        while True:
            queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    def input_function_B(self, queue):
        while True:
            self.counter = 0
            queue.put(self.counter)
            time.sleep(1)

    def output_function(self, queue):
        while True:
            try:
                var = queue.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)

    def run(self):
        process_A = Process(target=self.input_function_A, args=(queue,))
        process_B = Process(target=self.input_function_B, args=(queue,))
        process_C = Process(target=self.output_function, args=(queue,))

        process_A.start()
        process_B.start()
        process_C.start()

        # without this it generates the WinError: 
        # with this it still behaves as if the two input functions do not modify the queue
        process_C.join() 

if __name__ == '__main__':
    test = MyTest()
    test.run()

如您所见,由于您的类实际上并没有通过实例的属性共享任何数据,因此这种“类”设计对您的应用程序没有多大意义 - 但对于将不同的工作人员分组在同一个代码块中。

有可能有一个魔术多进程类,它有一些内部方法来实际启动工作方法并共享队列实例 - 所以如果你在一个项目中有很多这样的,就会有一个少得多的样板。

一些东西:

from multiprocessing import Queue
from multiprocessing import Process
import time


class MPWorkerBase:
    def __init__(self, *args, **kw):
        self.queue = None
        self.is_parent_process = False
        self.is_child_process = False
        self.processes = []
        # ensure this can be used as a colaborative mixin
        super().__init__(*args, **kw)

    def run(self):
        if self.is_parent_process or self.is_child_process:
            # workers already initialized
            return

        self.queue = Queue()
        processes = []

        cls = self.__class__
        for name in dir(cls):
            method = getattr(cls, name)
            if callable(method) and getattr(method, "_MP_worker", False):
                process = Process(target=self._start_worker, args=(self.queue, name))
                self.processes.append(process)
                process.start()
        # Setting these attributes here ensure the child processes have the initial values for them.
        self.is_parent_process = True
        self.processes = processes

    def _start_worker(self, queue, method_name):

        # this method is called in a new spawned process - attribute
        # changes here no longer reflect attributes on the
        # object in the initial process

        # overwrite queue in this process with the queue object sent over the wire:
        self.queue = queue
        self.is_child_process = True
        # call the worker method
        getattr(self, method_name)()

    def __del__(self):
        for process in self.processes:
            process.join()


def worker(func):
    """decorator to mark a method as a worker that should
    run in its own subprocess
    """

    func._MP_worker = True
    return func


class MyTest(MPWorkerBase):
    def __init__(self):
        super().__init__()
        self.counter = 0

    @worker
    def input_function_A(self):
        while True:
            self.queue.put(self.counter)
            self.counter = self.counter + 1
            time.sleep(0.2)

    @worker
    def input_function_B(self):
        while True:
            self.counter = 0
            self.queue.put(self.counter)
            time.sleep(1)

    @worker
    def output_function(self):
        while True:
            try:
                var = self.queue.get_nowait()
            except queue.Empty:
                var = -1
            except:
                break
            print(var)
            time.sleep(1)


if __name__ == '__main__':
    test = MyTest()
    test.run()

【讨论】:

  • 感谢您的反馈。我之所以使用这种架构,是因为我有许多计算机,每台计算机都将运行该程序的副本并进行通信(服务器部分)。我把这个线程化的 I/O 函数放到了我的 Client 类中,这样它的子类就只需要定义一些新的属性和它们特定的决策树。我希望这两个 I/O 函数都能够调用一组通用的函数来修改其父类的属性(self.queue、self.passwords、self.read_message() 等)。从你说的看,我想成为多线程而不是多处理。
  • 我添加了一个 mixin 类,它将删除用于分发“self.queue”的样板 - 至于多线程,你提到你使用 tkinter 接口 - 你必须在那里小心:everythng tkinter-相关必须在主线程上运行,您应该使用 tkinter.after 方法来继续调用您的方法来检查接口值,而不是 while True: ...; time.sleep() 模式。 (effbot.org/tkinterbook/widget.htm)
  • 谢谢,这让事情看起来更清楚了。我在这方面肯定有更多的研究要做(在并发 I/O 和 tkinter 方面),所以我非常感谢你的帮助。切换到线程(通过进程)架构已经大有帮助。
  • 在一个线程中,实例属性实际上是共享的——(因此,所有工作人员将看到相同的self.counter)在多处理中,每个工作人员都有自己的计数器
猜你喜欢
  • 1970-01-01
  • 2021-01-24
  • 1970-01-01
  • 2014-12-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多