【问题标题】:how to use python multiprocessing properly?如何正确使用 python 多处理?
【发布时间】:2021-07-31 14:33:07
【问题描述】:

我正在运行以下代码。我期待打印在进程之间是随机的。但是,我看到了一个确定性的结果:在每次运行时,首先第一个进程完成它的循环,然后,第二个进程才开始运行循环。 我期待随机行为,这意味着两个进程之间的上下文切换。但我看到的只是一个进程完成后,第二个进程开始,没有任何上下文切换。

有人可以描述我缺少什么吗?

import multiprocessing
import time
import os

lock = multiprocessing.Lock()


def func(_lock):
    for _ in range(0, 3):
        with _lock:
            print("sleeping in pid " + str(os.getpid()))
            time.sleep(1)
            print("finished sleeping in pid " + str(os.getpid()))


process1 = multiprocessing.Process(target=func, args=(lock,))
process2 = multiprocessing.Process(target=func, args=(lock,))
process1.start()
process2.start()

================================================ ===============

输出是:

在 pid 2322 中睡觉

在 pid 2322 中完成睡眠

在 pid 2322 中睡觉

在 pid 2322 中完成睡眠

在 pid 2322 中睡觉

在 pid 2322 中完成睡眠

在 pid 2323 中睡觉

在 pid 2323 中完成睡眠

在 pid 2323 中睡觉

在 pid 2323 中完成睡眠

在 pid 2323 中睡觉

在 pid 2323 中完成睡眠

进程以退出代码 0 结束

【问题讨论】:

  • 你那里有一把锁。它让一切都变得连续。
  • 我知道这是连续的。我不明白的是为什么所有第一把锁每次都是由同一个进程完成的
  • 第一个进程是最先启动的。它很有可能能够在其他人之前获得锁。同样,其余进程也按照您启动它们的顺序排列在队列中。

标签: python multiprocessing python-multiprocessing locks


【解决方案1】:

您的进程获得一个锁,“做它的事”,而另一个进程被阻塞,然后释放锁并立即循环回来并尝试重新获得它刚刚释放的同一个锁。由于进程已经在运行并且仍然可以调度,所以它是成功的,即仅仅因为它释放了锁并不意味着它会自动停止运行,因此它在竞争中击败了另一个进程来获取锁。将代码更改为以下内容,您将得到我认为您期望看到的内容:

def func(_lock):
    for _ in range(0, 3):
        with _lock:
            print("sleeping in pid " + str(os.getpid()))
        time.sleep(1) # this gives the other process a chance to acquire the lock
        with _lock:
            print("finished sleeping in pid " + str(os.getpid()))

锁只应保持尽可能短的时间。尝试提出允许这样做的逻辑。

【讨论】:

    【解决方案2】:

    这是一个使用 ThreadPoolExecutor 的示例。如果您需要 Process,则只需更改为 ProcessPoolExecutor。要了解使用什么(线程/进程),您需要了解 CPU Bound and I/O Bound

    首先,您创建字典 class_holder 并在其中保存 YourClass 对象。此外,您将对象名称放入队列queue.put(i)。对于每个执行程序,只要您获得队列名称queue.get(),您就可以使用调用方法my_print 以随机秒数运行线程executor.submit(...)

    希望这种实现方式对您有所帮助。至于我,我找到了扩展项目的方法。

    import os
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor
    from multiprocessing import Queue
    import random
    
    
    class YourClass:
        def __init__(self):
            self.string = None
            self.sleep_time = None
    
        def my_print(self, string, sleep_t):
            self.string = string
            self.sleep_time = sleep_t
            time.sleep(self.sleep_time)
            print(self.string + str(threading.current_thread().ident) + " process id: " + str(os.getpid()))
    
    
    lock = threading.Lock()
    queue = Queue()
    
    class_holder = dict()
    for i in ['a', 'b', 'c', 'd', 'e', 'f']:
        class_holder[i] = YourClass()
        queue.put(i)
    
    thread_limit = 3
    with ThreadPoolExecutor(max_workers=thread_limit) as executor:
        while True:
            _i = queue.get()
            if _i in class_holder:
                executor.submit(class_holder[_i].my_print,
                                string=f"sleeping {_i} in thread id: ",
                                sleep_t=random.randint(1,4))
    

    如果您需要lock = threading.Lock(),那么您可以在YourClass 方法中使用它来隔离编辑文件等内容。

    class YourClass:
    
        ...
    
        def my_extra_method(self):
            with lock:
                os.system(fr"sed -i 's|ARG_IN_FIE|NEW_ARG|g' some_file")
    

    我推荐你使用队列,它有助于组织工作,并且已经有一个锁。检查队列的这个例子:

    class Queue(object):
        def __init__(self, size=5):
            self._size = size
            self._queue = []
            self._mutex = threading.RLock()
            self._empty = threading.Condition(self._mutex)
            self._full = threading.Condition(self._mutex)
    
        def put(self, val):
            with self._full:
                while len(self._queue) >= self._size:
                    self._full.wait()
                self._queue.append(val)
                self._empty.notify()
    
        def get(self):
            with self._empty:
                while len(self._queue) == 0:
                    self._empty.wait()
                ret = self._queue.pop(0)
                self._full.notify()
                return ret
    
    from queue import Queue
    from threading import Thread
    
    def worker(q, n):
        while True:
            item = q.get()
            if item is None:
                break
            print("process data:", n, item)
    
    q = Queue(5)
    th1 = Thread(target=worker, args=(q, 1))
    th2 = Thread(target=worker, args=(q, 2))
    th1.start(); th2.start()
    for i in range(50):
        q.put(i)
    q.put(None); q.put(None)
    th1.join(); th2.join()
    

    【讨论】:

    • 感谢您的回答。但是,我的目标是在多个内核上同时并行化作业,并绕过 GIL。我相信线程不会在这里解决问题
    • 如果您向 Internet 发出请求,那么您可能需要检查 asynchronous programming on python,协程及其 yield from 隐藏在 return await 中。如果您需要更深入地挖掘,请检查模块选择select.epoll() - 基于它的异步,这就是您可以并行化请求的原因。如果我能帮你做点什么,请“竖起大拇指”:)
    猜你喜欢
    • 2021-10-23
    • 2013-02-25
    • 2015-12-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-22
    • 1970-01-01
    相关资源
    最近更新 更多