【问题标题】:python multiprocessing shared variable safepython多处理共享变量安全
【发布时间】:2017-07-31 14:28:41
【问题描述】:

我在使用多处理模块时遇到了问题。我已经使用了锁,但是下面的代码仍然不安全,我不知道为什么,计数器有时不会等于 100,我该如何修复代码让它安全?

import random
import threading
from multiprocessing import Pool, Manager

import time

lock = threading.Lock()


def a_complex_operation(counter):
    with lock:
        time.sleep(random.random())
        counter.value += 1


def main():
    pool = Pool(16)
    counter = Manager().Value('i', 0)

    for i in range(100):
        pool.apply_async(a_complex_operation, args=(counter,))

    pool.close()
    pool.join()
    if counter.value != 100:
        print "not equal 100, current value is: "+str(counter.value)


if __name__ == '__main__':
    count = 0
    while True:
        t1 = time.time()
        main()
        count += 1
        print "the " + str(count) + " loop, cost time: " + str(time.time() - t1)

输出将是:

the 1 loop, cost time: 4.1369998455
the 2 loop, cost time: 3.74100017548
the 3 loop, cost time: 3.92299985886
the 4 loop, cost time: 4.05500006676
not equal 100, current value is: 99
the 5 loop, cost time: 4.01900005341
the 6 loop, cost time: 4.14299988747

然后我将 Manager().list() 和 Manager().Value('i', 0) 一起测试

import random
from multiprocessing import Pool, Manager
import time


def a_complex_operation(list_, counter):
    for x in range(10):
        time.sleep(random.random()/10)
        list_.append(x)
        counter.value += 1


def main():
    pool = Pool(16)
    counter0 = 0
    list_ = Manager().list()
    counter = Manager().Value('i', 0)

    for i in range(100):
        pool.apply_async(a_complex_operation, args=(list_, counter))
        counter0 += 1

    pool.close()
    pool.join()
    if len(list_) != 1000:
        print "length of list is not equal 1000, current is:" + str(len(list_))
    if counter.value != 1000:
        print "value of counter is not equal 1000, current is :" + str(counter.value)


if __name__ == '__main__':
    counter = 0
    while True:
        counter += 1
        t1 = time.time()
        main()
        t2 = time.time()
        print "the " + str(counter) + " loop cost time: " + str(t2 - t1)

输出将是:

value of counter is not equal 1000, current is :916
the 1 loop cost time: 3.92299985886
value of counter is not equal 1000, current is :911
the 2 loop cost time: 3.98500013351
value of counter is not equal 1000, current is :925
the 3 loop cost time: 4.007999897
value of counter is not equal 1000, current is :913
the 4 loop cost time: 3.99399995804
value of counter is not equal 1000, current is :920
the 5 loop cost time: 4.09500002861
value of counter is not equal 1000, current is :915

我发现 Manager().list() 是安全的,Manager().Value('i', 0) 是不安全的,有趣的是,谁能告诉我为什么 Manager().list() 看起来很安全?

【问题讨论】:

    标签: python multithreading thread-safety


    【解决方案1】:

    您的子进程不继承锁定对象。或者他们这样做,但它们是独立的副本,没有链接在一起,不能用于任何事情。所以有一个竞争条件,它最终会失败。

    您可以通过 Manager().Lock() 解决此问题,因为您已经在使用 Manager。

    def a_complex_operation(counter, alock):
        with alock:
            time.sleep(random.random())
            counter.value += 1
    
    def main():
        pool = Pool(16)
        ma = Manager()
        counter = ma.Value('i', 0)
        lock = ma.Lock()
        for i in range(100):
            pool.apply_async(a_complex_operation, args=(counter, lock))
    

    这可行(不过,您的子进程现在会慢得多。预计每次运行大约 50 秒,100 次平均为 0.5 秒)。

    但现在你的 counter.value 总是 100。

    【讨论】:

    • 您可以通过将time.sleep(random.random()) 放在“with”语句之外来加快速度。
    • 当然。我假设 OP 只是使用 sleep 来模拟需要一些时间的操作。我的评论只是为了提醒这一点。在原始消息中,代码每次迭代需要几秒钟,因为锁什么都不做,并且总是可以被获取。在我的解决方案中,锁有效,但运行时间增加了很多,这是应该的。 (最初当我测试它时,前几秒钟没有发生任何事情,我认为这是一个僵局......结果发现我没有足够的耐心)
    • 不是死锁 :)
    • 不,不是。它按预期工作。当我首先尝试您的代码并在几秒钟内完成循环时,我只是不耐烦,然后当我修复锁定时,它花了很长时间。但这是预期的行为。无论如何,您的锁定问题似乎可以通过 Manager.Lock() 解决。
    【解决方案2】:

    我认为下面的代码应该安全快速,感谢@Hannu 和@gzc

    import random
    from multiprocessing import Pool, Manager
    import time
    
    
    def a_complex_operation(list_, counter, lock):
        for x in range(10):
            time.sleep(random.random() / 10)
            with lock:
                list_.append(x)
                counter.value += 1
    
    
    def main():
        pool = Pool(16)
        list_ = Manager().list()
        counter = Manager().Value('i', 0)
        lock = Manager().Lock()
    
        for i in range(100):
            pool.apply_async(a_complex_operation, args=(list_, counter, lock))
    
        pool.close()
        pool.join()
        if len(list_) != 1000:
            print ">>> length of list is not equal 1000, current is:" + str(len(list_))
        elif len(list_) == 1000:
            print ">>> length of list is equal 1000"
        if counter.value != 1000:
            print "value of counter is not equal 1000, current is :" + str(counter.value)
        elif counter.value == 1000:
            print "value of counter is equal 1000"
    
    
    if __name__ == '__main__':
        counter = 0
        while True:
            counter += 1
            t1 = time.time()
            main()
            t2 = time.time()
            print "the " + str(counter) + " loop cost time: " + str(t2 - t1)
            print "--------------------------------"
    

    输出将是:

    >>> length of list is equal 1000
    value of counter is equal 1000
    the 1 loop cost time: 3.78799986839
    --------------------------------
    >>> length of list is equal 1000
    value of counter is equal 1000
    the 2 loop cost time: 3.79299998283
    --------------------------------
    >>> length of list is equal 1000
    value of counter is equal 1000
    the 3 loop cost time: 3.78299999237
    --------------------------------
    >>> length of list is equal 1000
    value of counter is equal 1000
    the 4 loop cost time: 3.77500009537
    --------------------------------
    

    【讨论】:

      猜你喜欢
      • 2020-06-12
      • 1970-01-01
      • 2013-06-26
      • 2019-12-11
      • 2015-06-08
      • 2016-11-14
      • 1970-01-01
      • 2017-06-18
      • 1970-01-01
      相关资源
      最近更新 更多