【问题标题】:Python: sharing variable between two threads spawn from multiprocess.ProcessPython:从 multiprocess.Process 产生的两个线程之间共享变量
【发布时间】:2012-07-03 17:02:12
【问题描述】:

Python 3.1.2

multiprocessing.Process 产生的两个线程之间的变量共享存在问题。这是一个简单的布尔变量,它应该确定线程是否应该运行或应该停止执行。下面是三种情况下显示的简化代码(但使用与我的原始代码相同的机制):

  1. threading.Thread 类型和 self.is_running bool 类型的主类beeing [工作正常]。
  2. multiprocess.Process 类型和 self.is_running bool 类型的主类蜂 [不工作。子线程拥有 self.is_running 的本地副本,而不是共享它]。
  3. multiprocess.Process 类型和 self.is_running 的主类蜜蜂属于 multiprocessing.Value("b", True) [工作正常]。

我想了解为什么它以这种方式而不是另一种方式工作。 (即为什么第 2 点没有像我假设的那样工作)。

测试是通过 python 的解释器完成的:

from testclass import *

d = TestClass()
d.start()
d.stop()

以下是第 1 点的示例:

import threading
import time
import queue
import multiprocessing

class TestClass(threading.Thread):
def __init__(self):
    threading.Thread.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第 2 点的示例:

import threading
import time
import queue
import multiprocessing


class Test(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第 3 点的示例:

import threading
import time
import queue
import multiprocessing

class TestClass(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = multiprocessing.Value("b", True)
    self.sema = threading.Semaphore()

def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running.value
    self.sema.release()
    return z

def stop(self):
    self.sema.acquire()
    self.is_running.value = False
    print("STOPPING")
    self.sema.release()

def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)

def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

【问题讨论】:

    标签: python multithreading variables multiprocessing sharing


    【解决方案1】:

    在第 2 点中,父进程和子进程都有自己的is_running 副本。在父进程中调用stop()时,它只在父进程中修改is_running,在子进程中不修改。 multiprocessing.Value 起作用的原因是它的内存在两个进程之间共享。

    如果您想要一个进程感知队列,请使用multiprocessing.Queue

    【讨论】:

    • 您能解释一下为什么父进程和子进程都有自己的is_running 副本而不共享一个吗?如果父进程是 threading 类型,为什么不是这样?如果TestClass 不是multiprocessing.Processthreading.Thread 类型,那么它将与其子进程共享is_running,对吧?如果是这样 - 为什么?
    • 正如 Marco 所说,进程不共享内存,但线程共享。 is_running在父进程中与子进程占用不同的内存地址,但在使用线程时占用相同的地址。尽管ProcessThread 看起来很相似,但它们并不是完全可以互换的。如果TestClass 不是Process 的实例,它的成员仍然不会在进程之间神奇地共享。您必须使用来自multiprocessing 的消息或共享原语之一,例如multiprocessing.Value 来执行此操作。
    【解决方案2】:

    线程都是同一个进程的一部分,所以它们共享内存。另一个后果是线程不能被不同的 cpu 精确地同时执行,因为一个进程只能被一个 cpu 拾取。

    进程有独立的内存空间。一个 cpu 可以运行一个进程,而另一个 cpu 可以同时运行另一个进程。需要特殊的结构来让流程合作。

    【讨论】:

    • 我知道。但是这里(第 2 点)线程是在进程内存空间中创建的。至少应该是这样的,那个 Process (TestClass) 是在那个 Process 中创建的线程的包装器。我没有创建新流程。但它只是看起来从 multiprocessing.Process 产生的线程没有共享它们的内存空间。为什么?看起来它们(线程)是否与 TestClass 并行创建而不是在它之下(如果你知道我的意思)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2023-04-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-22
    • 2016-02-09
    相关资源
    最近更新 更多