【问题标题】:How to create a synchronized object with Python multiprocessing?如何使用 Python 多处理创建同步对象?
【发布时间】:2011-02-16 21:43:26
【问题描述】:

我很难弄清楚如何制作一个同步的 Python 对象。我有一个名为 Observation 的类和一个名为 Variable 的类,它们基本上看起来像(代码被简化以显示本质):

class Observation:
    def __init__(self, date, time_unit, id, meta):
        self.date = date
        self.time_unit = time_unit
        self.id = id
        self.count = 0
        self.data = 0

    def add(self, value):
        if isinstance(value, list):
            if self.count == 0:
                self.data = []
            self.data.append(value)
        else:
            self.data += value
        self.count += 1


class Variable:
    def __init__(self, name, time_unit, lock):
        self.name = name
        self.lock = lock
        self.obs = {}
        self.time_unit = time_unit

    def get_observation(self, id, date, meta):
        self.lock.acquire()
        try:
            obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
            self.obs[id] = obs
        finally:
            self.lock.release()
        return obs

    def add(self, date, value, meta={}):
        self.lock.acquire()
        try:
            obs = self.get_observation(id, date, meta)
            obs.add(value)
            self.obs[id] = obs
        finally:
            self.lock.release()

这就是我设置多处理部分的方式: 插件 = 在其他地方定义的函数 任务 = JoinableQueue() 结果 = JoinableQueue() 经理=经理() 锁定 = 经理.RLock() var = Variable('foobar', 'year', lock)

for person in persons:
    tasks.put(Task(plugin, var, person))

代码应该如何工作的示例:

我有一个名为 var 的变量实例,我想向 var 添加一个观察结果:

today = datetime.datetime.today()  
var.add(today, 1)  

因此,Variable 的 add 函数会查看是否已经存在该日期的观测值,如果存在则返回该观测值,否则它会创建一个新的 Observation 实例。通过调用 obs.add(value) 添加一个观察值而不是实际值。我主要担心的是我想确保不同的进程不会在同一日期创建多个观察实例,这就是我锁定它的原因。

变量的一个实例是使用多处理库创建并在不同进程之间共享的,它是众多观察实例的容器。上面的代码不起作用,我得到错误:

RuntimeError: 锁定对象应该只 通过在进程之间共享 继承

但是,如果我在启动不同进程之前实例化一个 Lock 对象并将其提供给 Variable 的构造函数,那么我似乎会遇到竞争条件,因为所有进程似乎都在等待彼此。

最终目标是不同的进程可以更新对象Variable中的obs变量。我需要它是线程安全的,因为我不仅要修改字典,还要添加新元素并增加现有变量。 obs 变量是一个包含一堆观察实例的字典。

如何在多个多处理进程之间共享一个变量的单个实例时进行同步?非常感谢您的认知过剩!

更新 1:
* 我正在使用多处理锁,并且我已更改源代码以显示这一点。
* 我已更改标题以更准确地捕捉问题
* 我已经用同步替换了 theadsafe,因为我混淆了这两个术语。

感谢 Dmitry Dvoinikov 指出我!

我仍然不确定的一个问题是我在哪里实例化 Lock?这应该发生在类内部还是在初始化多进程并将其作为参数之前?回答:应该发生在课外。

更新 2:
* 我通过将 Lock 的初始化移到类定义之外并使用管理器修复了“锁定对象只能通过继承在进程之间共享”错误。
* 最后一个问题,现在一切正常,除了似乎当我将变量实例放入队列时它没有得到更新,并且每次我从队列中获取它时,它都不包含我在上一次迭代中添加的观察结果。这是唯一让我困惑的事情:(

更新 3:
最终的解决方案是将 var.obs 字典设置为 mgr.dict() 的实例,然后使用自定义序列化程序。很高兴与也在为此苦苦挣扎的人分享代码。

【问题讨论】:

    标签: python synchronization thread-safety multiprocessing


    【解决方案1】:

    您不是在谈论线程安全,而是在谈论不同进程之间的同步,这是完全不同的事情。不管怎样,开始

    不同的进程可以更新对象Variable中的obs变量。

    暗示变量在共享内存中,并且您必须在其中显式存储对象,本地实例对单独的进程可见并不神奇。这里:

    可以使用 Value 或 Array 将数据存储在共享内存映射中

    然后,您的代码 sn-p 缺少关键的导入部分。无法判断您是否实例化了正确的 multiprocessing.Lock,而不是 multithreading.Lock。您的代码没有显示您创建流程和传递数据的方式。

    因此,我建议您了解线程和进程之间的区别,您是否真的需要一个包含多个进程的应用程序的共享内存模型并检查spec

    【讨论】:

    • 您好 Dimitry,感谢您的回答!我根据您的反馈改进了这个问题。我知道线程和进程之间的区别,但我已经澄清了导入部分。我还在为在哪里实例化锁而苦苦挣扎?这应该发生在类内部还是在初始化多进程并将其作为变量和观察的参数之前?是的,你是对的变量应该使用共享内存。
    • 没有必要以任何特殊方式使任何类的实例“安全”,因为在多处理场景中,来自不同进程的线程可能不会执行其中的代码。您跨进程共享的只是简单的哑数据值。换句话说,锁应该保护的不是某些实例的内部,而是对外部值的访问,即您只能同步对共享数据的访问。至于什么时候初始化锁,据我了解,锁需要由主进程创建,并作为参数传递给从进程。
    • 你是对的,Lock 的初始化需要在类之外进行。一切正常,除了当迭代队列并且我得到一个新任务时,新任务中的变量实例就好像它刚刚被初始化一样,它不包含以前添加的任何观察结果......
    • 听上去,queue 是将值从 A 点传递到 B 点的东西,而不是累积任何东西。如果从属进程推送某些内容,则主进程将其按原样弹出(并更新它可能拥有的任何统计信息)。你确定你选择了正确的价值容器吗?队列,而不是数组?
    • 感谢您的所有提示!这是一项相当多的工作,但我学到了很多东西。
    猜你喜欢
    • 2011-08-14
    • 2019-10-27
    • 2017-11-20
    • 2023-03-09
    • 2021-10-06
    • 1970-01-01
    • 2021-11-04
    • 2017-08-31
    • 2014-04-26
    相关资源
    最近更新 更多