【问题标题】:Issue with sharing data between Python processes with multiprocessing使用多处理在 Python 进程之间共享数据的问题
【发布时间】:2017-04-01 05:03:53
【问题描述】:

我已经看过几篇关于这个的帖子,所以我知道这很简单,但我似乎做不到。我不确定是否需要创建一个工作池,或者使用 Queue 类。基本上,我希望能够创建多个进程,每个进程都自主运行(这就是它们从 Agent 超类继承的原因)。

在我的主循环的随机滴答声中,我想更新每个代理。我在主循环和代理的运行循环中使用具有不同值的time.sleep 来模拟不同的处理器速度。

这是我的代理超类:

# Generic class to handle mpc of each agent
class Agent(mpc.Process):
  # initialize agent parameters
  def __init__(self,):
    # init mpc
    mpc.Process.__init__(self)
    self.exit = mpc.Event()

  # an agent's main loop...generally should be overridden
  def run(self):
    while not self.exit.is_set():
      pass
    print "You exited!"

  # safely shutdown an agent
  def shutdown(self):
    print "Shutdown initiated"
    self.exit.set()

  # safely communicate values to this agent
  def communicate(self,value):
    print value

特定代理的子类(模拟 HVAC 系统):

class HVAC(Agent):
  def __init__(self, dt=70, dh=50.0):
    super(Agent, self).__init__()
    self.exit = mpc.Event()

    self.__pref_heating     = True
    self.__pref_cooling     = True
    self.__desired_temperature = dt
    self.__desired_humidity    = dh

    self.__meas_temperature = 0
    self.__meas_humidity    = 0.0
    self.__hvac_status      = "" # heating, cooling, off

    self.start()

  def run(self): # handle AC or heater on 
    while not self.exit.is_set():
      ctemp = self.measureTemp()
      chum  = self.measureHumidity()

      if (ctemp < self.__desired_temperature):
        self.__hvac_status = 'heating'
        self.__meas_temperature += 1

      elif (ctemp > self.__desired_temperature):
        self.__hvac_status = 'cooling'
        self.__meas_temperature += 1

      else:
        self.__hvac_status = 'off'
      print self.__hvac_status, self.__meas_temperature


      time.sleep(0.5)


    print "HVAC EXITED"

  def measureTemp(self):
    return self.__meas_temperature
  def measureHumidity(self):
    return self.__meas_humidity

  def communicate(self,updates):
    self.__meas_temperature = updates['temp']
    self.__meas_humidity    = updates['humidity']
    print "Measured [%d] [%f]" % (self.__meas_temperature,self.__meas_humidity)

还有我的主循环:

if __name__ == "__main__":
  print "Initializing subsystems"
  agents = {}
  agents['HVAC'] = HVAC()

  # Run simulation
  timestep = 0
  while timestep < args.timesteps:
    print "Timestep %d" % timestep

    if timestep % 10 == 0:
      curr_temp = random.randrange(68,72)
      curr_humidity = random.uniform(40.0,60.0)
      agents['HVAC'].communicate({'temp':curr_temp, 'humidity':curr_humidity})

    time.sleep(1)
    timestep += 1

  agents['HVAC'].shutdown()
  print "HVAC process state: %d" % agents['HVAC'].is_alive()

所以问题是,每当我在主循环中运行agents['HVAC'].communicate(x) 时,我都可以看到在其run 循环中传递到HVAC 子类的值(因此它会正确打印接收到的值)。但是,该值永远不会被成功存储。

所以典型的输出如下所示:

Initializing subsystems
Timestep 0
Measured [68] [56.948675]
heating 1
heating 2
Timestep 1
heating 3
heating 4
Timestep 2
heating 5
heating 6

在现实中,一旦出现 Measured [68],内部存储值应更新为输出 68(不是加热 1,加热 2 等)。如此有效,HVAC 的 self.__meas_temperature 没有得到正确更新。


编辑:经过一番研究,我意识到我不一定了解幕后发生的事情。每个子进程都使用自己的虚拟内存块进行操作,并且完全抽象出以这种方式共享的任何数据,因此将值传入是行不通的。我的新问题是我不一定确定如何与多个进程共享全局价值。

我正在查看 Queue 或 JoinableQueue 包,但我不一定确定如何将 Queue 传递给我拥有的超类设置类型(尤其是使用 mpc.Process.__init__(self) 调用)。

另一个问题是我是否可以让多个代理从队列中读取值而不将其拉出队列?例如,如果我想与多个代理共享一个temperature 值,队列是否适用于此?

Pipe v Queue

【问题讨论】:

  • 这是一个非常广泛的问题。据我所知,您基本上是在问“在多进程之间共享数据的方式有哪些?” (你读过the docs吗?),然后是“我的应用程序应该使用哪一个?”。你能把它缩小一点,即给出一个特定的期望行为吗?否则,我认为人们可能会留下可能对您没有太大帮助的通用答案。
  • 实际上这就是编辑应该涵盖的内容。根据我的确切实现,我需要一种共享数据的方法......我不知道 Queue 或 JoinableQueue 或其他什么是最好的,但如果我使用 mpc,我似乎无法弄清楚如何共享数据。 Process.__init__(self)
  • *多个消费者都读取相同的变量

标签: python subprocess python-multiprocessing


【解决方案1】:

这是一个建议的解决方案,假设您需要以下内容:

  • 控制工作人员生命周期的集中管理器/主进程
  • 工作进程做一些自包含的事情,然后将结果报告给经理和其他进程

在我展示它之前,我想说的是,一般来说,除非你受 CPU 限制,否则multiprocessing 并不是真正合适的,主要是因为增加了复杂性,你可能会更好地使用不同的高级异步框架。另外,你应该使用python 3,它好多了!

也就是说,multiprocessing.Manager,使用multiprocessing 可以很容易地做到这一点。我已经在 python 3 中完成了这项工作,但我认为任何东西都不应该在 python 2 中“正常工作”,但我还没有检查过。

from ctypes import c_bool
from multiprocessing import Manager, Process, Array, Value
from pprint import pprint
from time import sleep, time


class Agent(Process):

    def __init__(self, name, shared_dictionary, delay=0.5):
        """My take on your Agent.

        Key difference is that I've commonized the run-loop and used
        a shared value to signal when to stop, to demonstrate it.
        """
        super(Agent, self).__init__()
        self.name = name

        # This is going to be how we communicate between processes.
        self.shared_dictionary = shared_dictionary

        # Create a silo for us to use.
        shared_dictionary[name] = []
        self.should_stop = Value(c_bool, False)

        # Primarily for testing purposes, and for simulating 
        # slower agents.
        self.delay = delay

    def get_next_results(self):
        # In the real world I'd use abc.ABCMeta as the metaclass to do 
        # this properly.
        raise RuntimeError('Subclasses must implement this')

    def run(self):
        ii = 0
        while not self.should_stop.value:
            ii += 1
            # debugging / monitoring
            print('%s %s run loop execution %d' % (
                type(self).__name__, self.name, ii))

            next_results = self.get_next_results()

            # Add the results, along with a timestamp.
            self.shared_dictionary[self.name] += [(time(), next_results)]
            sleep(self.delay)

    def stop(self):
        self.should_stop.value = True
        print('%s %s stopped' % (type(self).__name__, self.name))


class HVACAgent(Agent):
    def get_next_results(self):
        # This is where you do your work, but for the sake of
        # the example just return a constant dictionary.
        return {'temperature': 5, 'pressure': 7, 'humidity': 9}


class DumbReadingAgent(Agent):
    """A dumb agent to demonstrate workers reading other worker values."""

    def get_next_results(self):
        # get hvac 1 results:
        hvac1_results = self.shared_dictionary.get('hvac 1')
        if hvac1_results is None:
            return None

        return hvac1_results[-1][1]['temperature']

# Script starts.
results = {}

# The "with" ensures we terminate the manager at the end.
with Manager() as manager:

    # the manager is a subprocess in its own right. We can ask
    # it to manage a dictionary (or other python types) for us
    # to be shared among the other children.
    shared_info = manager.dict()

    hvac_agent1 = HVACAgent('hvac 1', shared_info)
    hvac_agent2 = HVACAgent('hvac 2', shared_info, delay=0.1)
    dumb_agent = DumbReadingAgent('dumb hvac1 reader', shared_info)

    agents = (hvac_agent1, hvac_agent2, dumb_agent)

    list(map(lambda a: a.start(), agents))

    sleep(1)

    list(map(lambda a: a.stop(), agents))
    list(map(lambda a: a.join(), agents))

    # Not quite sure what happens to the shared dictionary after
    # the manager dies, so for safety make a local copy.
    results = dict(shared_info)

pprint(results)

【讨论】:

  • 好帖子!在 2.7 中运行良好(由于 2 与 3 中的所有代码,我一直使用 2)。我可能会单独打破运行循环,因为这些代理中的每一个都应该完全自主地运行,因此它们将具有非常不同的运行循环。但我确实喜欢共享字典的想法……它让我想起了一个漂亮的、干净的单例实例,它能够修改它。不过问题...如果hvac_agent1 决定同时修改shared_infohvac_agent2,是否存在竞争条件问题?
  • 文档不是很清楚,虽然他们暗示这很好,但我也测试了 3 个代理不断更新他们在字典中的条目,没有超过 5 秒的停顿(所以每个代理都是每 3-4 毫秒添加一个条目),并且没有数据丢失,所以我认为只要他们只写字典中的条目,就没有竞争条件。显然,如果代理 1 在代理 2 写入的同时写入shared_info['key'],那么只有他们中的一个会获胜!
  • 太棒了,这正是我当时正在寻找的。谢谢!
猜你喜欢
  • 2018-06-18
  • 2018-06-25
  • 1970-01-01
  • 2014-04-24
  • 2021-05-07
  • 1970-01-01
  • 2016-06-13
  • 2013-07-21
  • 2020-06-25
相关资源
最近更新 更多