【发布时间】:2011-02-16 21:43:26
【问题描述】:
我很难弄清楚如何制作一个同步的 Python 对象。我有一个名为 Observation 的类和一个名为 Variable 的类,它们基本上看起来像(代码被简化以显示本质):
class Observation:
def __init__(self, date, time_unit, id, meta):
self.date = date
self.time_unit = time_unit
self.id = id
self.count = 0
self.data = 0
def add(self, value):
if isinstance(value, list):
if self.count == 0:
self.data = []
self.data.append(value)
else:
self.data += value
self.count += 1
class Variable:
def __init__(self, name, time_unit, lock):
self.name = name
self.lock = lock
self.obs = {}
self.time_unit = time_unit
def get_observation(self, id, date, meta):
self.lock.acquire()
try:
obs = self.obs.get(id, Observation(date, self.time_unit, id, meta))
self.obs[id] = obs
finally:
self.lock.release()
return obs
def add(self, date, value, meta={}):
self.lock.acquire()
try:
obs = self.get_observation(id, date, meta)
obs.add(value)
self.obs[id] = obs
finally:
self.lock.release()
这就是我设置多处理部分的方式: 插件 = 在其他地方定义的函数 任务 = JoinableQueue() 结果 = JoinableQueue() 经理=经理() 锁定 = 经理.RLock() var = Variable('foobar', 'year', lock)
for person in persons:
tasks.put(Task(plugin, var, person))
代码应该如何工作的示例:
我有一个名为 var 的变量实例,我想向 var 添加一个观察结果:
today = datetime.datetime.today()
var.add(today, 1)
因此,Variable 的 add 函数会查看是否已经存在该日期的观测值,如果存在则返回该观测值,否则它会创建一个新的 Observation 实例。通过调用 obs.add(value) 添加一个观察值而不是实际值。我主要担心的是我想确保不同的进程不会在同一日期创建多个观察实例,这就是我锁定它的原因。
变量的一个实例是使用多处理库创建并在不同进程之间共享的,它是众多观察实例的容器。上面的代码不起作用,我得到错误:
RuntimeError: 锁定对象应该只 通过在进程之间共享 继承
但是,如果我在启动不同进程之前实例化一个 Lock 对象并将其提供给 Variable 的构造函数,那么我似乎会遇到竞争条件,因为所有进程似乎都在等待彼此。
最终目标是不同的进程可以更新对象Variable中的obs变量。我需要它是线程安全的,因为我不仅要修改字典,还要添加新元素并增加现有变量。 obs 变量是一个包含一堆观察实例的字典。
如何在多个多处理进程之间共享一个变量的单个实例时进行同步?非常感谢您的认知过剩!
更新 1:
* 我正在使用多处理锁,并且我已更改源代码以显示这一点。
* 我已更改标题以更准确地捕捉问题
* 我已经用同步替换了 theadsafe,因为我混淆了这两个术语。
感谢 Dmitry Dvoinikov 指出我!
我仍然不确定的一个问题是我在哪里实例化 Lock?这应该发生在类内部还是在初始化多进程并将其作为参数之前?回答:应该发生在课外。
更新 2:
* 我通过将 Lock 的初始化移到类定义之外并使用管理器修复了“锁定对象只能通过继承在进程之间共享”错误。
* 最后一个问题,现在一切正常,除了似乎当我将变量实例放入队列时它没有得到更新,并且每次我从队列中获取它时,它都不包含我在上一次迭代中添加的观察结果。这是唯一让我困惑的事情:(
更新 3:
最终的解决方案是将 var.obs 字典设置为 mgr.dict() 的实例,然后使用自定义序列化程序。很高兴与也在为此苦苦挣扎的人分享代码。
【问题讨论】:
标签: python synchronization thread-safety multiprocessing