【问题标题】:Multiprocessing - Shared Array多处理 - 共享数组
【发布时间】:2016-08-24 11:37:10
【问题描述】:

所以我试图在 python 中实现多处理,我希望有一个由 4-5 个进程组成的池并行运行一个方法。这样做的目的是运行一千次 Monte 模拟(每个进程 250-200 次模拟),而不是运行 1000 次。我希望每个进程在完成处理后立即通过获取锁定来写入公共共享数组一个模拟的结果,写入结果并释放锁。所以这应该是一个三步过程:

  1. 获取锁
  2. 写入结果
  3. 为等待写入数组的其他进程释放锁。

每次我将数组传递给进程时,每个进程都会创建一个我不想要的该数组的副本,因为我想要一个公共数组。任何人都可以通过提供示例代码来帮助我吗?

【问题讨论】:

  • 如果你搜索一些答案,你有大量的示例代码。因此,如果您不提供您的一些,我们将无法为您提供帮助。顺便说一句,使用semaphores 锁定线程
  • 官方文档中examples有什么不清楚的地方?
  • 您是否需要在模拟进行时访问阵列?如果没有,您可以使用 Pool.map 函数集。

标签: python multiprocessing shared


【解决方案1】:

由于您只是将状态从子进程返回到父进程,因此使用共享数组和显式锁是多余的。您可以使用Pool.mapPool.starmap 来完成您所需要的。例如:

from multiprocessing import Pool

class Adder:
    """I'm using this class in place of a monte carlo simulator"""

    def add(self, a, b):
        return a + b

def setup(x, y, z):
    """Sets up the worker processes of the pool. 
    Here, x, y, and z would be your global settings. They are only included
    as an example of how to pass args to setup. In this program they would
    be "some arg", "another" and 2
    """
    global adder
    adder = Adder()

def job(a, b):
    """wrapper function to start the job in the child process"""
    return adder.add(a, b)

if __name__ == "__main__":   
    args = list(zip(range(10), range(10, 20)))
    # args == [(0, 10), (1, 11), ..., (8, 18), (9, 19)]

    with Pool(initializer=setup, initargs=["some arg", "another", 2]) as pool:
        # runs jobs in parallel and returns when all are complete
        results = pool.starmap(job, args)

    print(results) # prints [10, 12, ..., 26, 28] 

【讨论】:

    【解决方案2】:

    未经测试,但类似的东西应该可以工作。 数组和锁在进程之间共享。

    from multiprocessing import Process, Array, Lock
    
    def f(array, lock, n): #n is the dedicated location in the array
        lock.acquire()
        array[n]=-array[n]
        lock.release()
    
    if __name__ == '__main__':
        size=100
        arr=Array('i', [3,-7])
        lock=Lock()
        p = Process(target=f, args=(arr,lock,0))
        q = Process(target=f, args=(arr,lock,1))
        p.start()
        q.start()
        q.join()
        p.join()
    
        print(arr[:])
    

    https://docs.python.org/3.5/library/multiprocessing.html 此处的文档有大量示例可供参考

    【讨论】:

    • 那是不是一个共享数组。在子进程中更改它不会对父进程产生任何影响。
    • 另外,Array 默认情况下会创建一个附加锁,您可以使用它的get_lock() 方法获得它,除非您想使用不同的锁类型,否则无需显式分配它。然后身体可以变成with array.get_lock(): ...
    • @mata 我需要澄清一下。你的意思是我可以把函数 f 改成 "def f(array): with array.get_lock(): #modify array here#" 就不需要创建锁对象并显式传递了吗?
    • @RodrikTheReader 是的,这就是 mata 的解释:docs.python.org/2/library/…
    • @Julien 谢谢。我实际上正在努力解决 OP 在问题中发布的类似共享内存问题。我需要 4 个可以由多个进程共享的单独数组。我在一个类中定义了共享数组,并将该类的对象传递给一个在共享数组上写入的函数。我正在生成多个进程,目标是这个函数。无论如何,该程序没有按我的预期工作。你认为我应该在主程序中创建共享数组并将它们传递给函数,而不是创建对象?
    猜你喜欢
    • 2023-03-10
    • 1970-01-01
    • 1970-01-01
    • 2021-03-19
    • 2019-10-09
    • 2020-11-13
    • 1970-01-01
    • 2019-12-11
    • 2018-10-20
    相关资源
    最近更新 更多