【发布时间】:2023-03-09 11:02:01
【问题描述】:
我正在使用 Python 2.6 和多处理模块进行多线程处理。现在我想要一个同步的字典(我真正需要的唯一原子操作是一个值上的 += 运算符)。
我应该用 multiprocessing.sharedctypes.synchronized() 调用来包装字典吗?还是另辟蹊径?
【问题讨论】:
标签: python multiprocessing dictionary
我正在使用 Python 2.6 和多处理模块进行多线程处理。现在我想要一个同步的字典(我真正需要的唯一原子操作是一个值上的 += 运算符)。
我应该用 multiprocessing.sharedctypes.synchronized() 调用来包装字典吗?还是另辟蹊径?
【问题讨论】:
标签: python multiprocessing dictionary
似乎有很多扶手椅建议,但没有工作示例。这里列出的答案都没有建议使用多处理,这有点令人失望和不安。作为 python 爱好者,我们应该支持我们的内置库,虽然并行处理和同步从来都不是一件小事,但我相信通过适当的设计可以让它变得微不足道。这在现代多核架构中变得极为重要,怎么强调都不为过!话虽如此,我对多处理库还很不满意,因为它仍处于起步阶段,有很多陷阱、错误,并且面向函数式编程(我讨厌)。目前我仍然更喜欢Pyro 模块(远远领先于它的时代)而不是多处理,因为多处理的严重限制是在服务器运行时无法共享新创建的对象。管理器对象的“注册”类方法只会在管理器(或其服务器)启动之前实际注册一个对象。废话不多说,更多代码:
from multiprocessing.managers import SyncManager
class MyManager(SyncManager):
pass
syncdict = {}
def get_dict():
return syncdict
if __name__ == "__main__":
MyManager.register("syncdict", get_dict)
manager = MyManager(("127.0.0.1", 5000), authkey="password")
manager.start()
raw_input("Press any key to kill server".center(50, "-"))
manager.shutdown()
在上面的代码示例中,Server.py 使用了 multiprocessing 的 SyncManager,它可以提供同步的共享对象。此代码无法在解释器中运行,因为多处理库对于如何为每个注册对象查找“可调用”非常敏感。运行 Server.py 将启动一个定制的 SyncManager,该 SyncManager 共享同步字典以供多个进程使用,并且可以连接到同一台机器上的客户端,或者如果在环回以外的 IP 地址上运行,则可以连接到其他机器。在这种情况下,服务器在端口 5000 上的环回 (127.0.0.1) 上运行。使用 authkey 参数在操作 syncdict 时使用安全连接。按下任意键时,管理器将关闭。
from multiprocessing.managers import SyncManager
import sys, time
class MyManager(SyncManager):
pass
MyManager.register("syncdict")
if __name__ == "__main__":
manager = MyManager(("127.0.0.1", 5000), authkey="password")
manager.connect()
syncdict = manager.syncdict()
print "dict = %s" % (dir(syncdict))
key = raw_input("Enter key to update: ")
inc = float(raw_input("Enter increment: "))
sleep = float(raw_input("Enter sleep time (sec): "))
try:
#if the key doesn't exist create it
if not syncdict.has_key(key):
syncdict.update([(key, 0)])
#increment key value every sleep seconds
#then print syncdict
while True:
syncdict.update([(key, syncdict.get(key) + inc)])
time.sleep(sleep)
print "%s" % (syncdict)
except KeyboardInterrupt:
print "Killed client"
客户端还必须创建一个自定义的 SyncManager,注册“syncdict”,这一次不传递可调用来检索共享字典。然后,它使用自定义的 SycnManager 使用端口 5000 上的环回 IP 地址 (127.0.0.1) 和在 Server.py 中启动的与管理器建立安全连接的 authkey 进行连接。它通过调用管理器上注册的可调用对象来检索共享字典同步字典。它会提示用户以下内容:
客户端然后检查密钥是否存在。如果不是,它会在 syncdict 上创建密钥。客户端然后进入一个“无限”循环,在该循环中它通过增量更新键的值,休眠指定的数量,并打印同步字典以重复此过程,直到发生 KeyboardInterrupt (Ctrl+C)。
我希望你和我一样喜欢这个相当彻底但稍微费时的答案。我很难弄清楚为什么我在多处理模块上苦苦挣扎,Pyro 让它变得轻而易举,现在多亏了这个答案,我已经一针见血了。我希望这对 python 社区如何改进多处理模块有用,因为我相信它有很大的希望,但在它的起步阶段还没有实现。尽管描述了令人讨厌的问题,但我认为这仍然是一个非常可行的选择,并且非常简单。您还可以使用 SyncManager.dict() 并将其作为参数传递给 Processes,就像文档显示的那样,根据您的要求,它可能是一个更简单的解决方案,这对我来说是不自然的。
【讨论】:
sync_dict 将获得客户的更新。相反,您似乎必须在服务器端调用 manager.get_dict() 才能获取将被更新的类似字典的对象。全局sync_dict 未更新。我猜那只是为了设置初始值?
我会专门使用一个单独的过程来维护“共享字典”:只需使用例如xmlrpclib 使少量代码可用于其他进程,通过 xmlrpclib 例如公开一个函数采用 key, increment 来执行增量,一个函数只采用 key 并返回值,以及语义细节(是否存在缺失键的默认值等),具体取决于您的应用程序的需要。
然后您可以使用任何您喜欢的方法来实现 shared-dict 专用进程:从在内存中具有简单 dict 的单线程服务器到简单的 sqlite DB 等等。我建议您开始使用“尽可能简单”的代码(取决于您是否需要 persistent 共享字典,或者您不需要持久性),然后根据需要进行测量和优化。
【讨论】:
Manager,但还是没有运气。你能看看我的问题here,看看你能不能提供一个解决方案?如果我每次生成随机数时都执行np.random.seed(None),我仍然可以获得不同的随机数,但这不允许我使用父进程的随机状态,这不是我想要的。非常感谢任何帮助。
响应并发写入问题的适当解决方案。我做了非常快速的研究,发现this article 建议使用锁/信号量解决方案。 (http://effbot.org/zone/thread-synchronization.htm)
虽然该示例不是专门针对字典的,但我很确定您可以编写一个基于类的包装器对象来帮助您使用基于此想法的字典。
如果我需要以线程安全的方式实现类似的东西,我可能会使用 Python Semaphore 解决方案。 (假设我之前的合并技术行不通。)我相信信号量由于其阻塞特性通常会降低线程效率。
来自网站:
信号量是一种更高级的锁定机制。信号量有一个内部计数器而不是一个锁定标志,并且它仅在超过给定数量的线程试图保持信号量时才会阻塞。根据信号量的初始化方式,这允许多个线程同时访问同一代码段。
semaphore = threading.BoundedSemaphore()
semaphore.acquire() # decrements the counter
... access the shared resource; work with dictionary, add item or whatever.
semaphore.release() # increments the counter
【讨论】:
首先需要共享字典是否有原因?您能否让每个线程维护自己的字典实例,并在线程处理结束时合并或定期使用回调将各个线程字典的副本合并在一起?
我不知道你在做什么,所以请记住,我的书面计划可能无法逐字执行。我的建议更多的是一个高级设计理念。
【讨论】: