注意:Barrier是PYTHON3才有的功能,在2中无法测试。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import multiprocessing
from multiprocessing import Barrier, Lock, Process
from time import time
from datetime import datetime

def test_with_barrier(synchronizer, seializer):
    name = multiprocessing.current_process().name
    synchronizer.wait()
    now = time()
    with serializer:
        print("process %s -----> %s" % (name, datetime.fromtimestamp(now)))

def test_without_barrier():
    name = multiprocessing.current_process().name
    now = time()
    print("process %s -----> %s" % (name, datetime.fromtimestamp(now)))

def worker(dictionary, key, item):
    dictionary[key] = item
    print(key, item)

if __name__ == '__main__':
    synchronizer = Barrier(2)
    serializer = Lock()
    Process(name='p1 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p2 - test_with_barrier', target=test_with_barrier, args=(synchronizer, serializer)).start()
    Process(name='p3 - test_without_barrier', target=test_without_barrier).start()
    Process(name='p4 - test_without_barrier', target=test_without_barrier).start()

    mgr = multiprocessing.Manager()
    dictionary = mgr.dict()
    jobs = [multiprocessing.Process\
            (target=worker, args=(dictionary, i, i*2))
            for i in range(10)
            ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()

在python多进程中使用manager和Barrier

相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2021-05-16
  • 2021-11-29
  • 2021-04-14
  • 2021-08-31
  • 2021-11-01
  • 2021-09-28
猜你喜欢
  • 2022-02-27
  • 2021-06-17
  • 2022-12-23
  • 2021-12-12
  • 2021-09-28
  • 2021-10-07
  • 2022-12-23
相关资源
相似解决方案