【发布时间】:2017-04-01 05:03:53
【问题描述】:
我已经看过几篇关于这个的帖子,所以我知道这很简单,但我似乎做不到。我不确定是否需要创建一个工作池,或者使用 Queue 类。基本上,我希望能够创建多个进程,每个进程都自主运行(这就是它们从 Agent 超类继承的原因)。
在我的主循环的随机滴答声中,我想更新每个代理。我在主循环和代理的运行循环中使用具有不同值的time.sleep 来模拟不同的处理器速度。
这是我的代理超类:
# Generic class to handle mpc of each agent
class Agent(mpc.Process):
# initialize agent parameters
def __init__(self,):
# init mpc
mpc.Process.__init__(self)
self.exit = mpc.Event()
# an agent's main loop...generally should be overridden
def run(self):
while not self.exit.is_set():
pass
print "You exited!"
# safely shutdown an agent
def shutdown(self):
print "Shutdown initiated"
self.exit.set()
# safely communicate values to this agent
def communicate(self,value):
print value
特定代理的子类(模拟 HVAC 系统):
class HVAC(Agent):
def __init__(self, dt=70, dh=50.0):
super(Agent, self).__init__()
self.exit = mpc.Event()
self.__pref_heating = True
self.__pref_cooling = True
self.__desired_temperature = dt
self.__desired_humidity = dh
self.__meas_temperature = 0
self.__meas_humidity = 0.0
self.__hvac_status = "" # heating, cooling, off
self.start()
def run(self): # handle AC or heater on
while not self.exit.is_set():
ctemp = self.measureTemp()
chum = self.measureHumidity()
if (ctemp < self.__desired_temperature):
self.__hvac_status = 'heating'
self.__meas_temperature += 1
elif (ctemp > self.__desired_temperature):
self.__hvac_status = 'cooling'
self.__meas_temperature += 1
else:
self.__hvac_status = 'off'
print self.__hvac_status, self.__meas_temperature
time.sleep(0.5)
print "HVAC EXITED"
def measureTemp(self):
return self.__meas_temperature
def measureHumidity(self):
return self.__meas_humidity
def communicate(self,updates):
self.__meas_temperature = updates['temp']
self.__meas_humidity = updates['humidity']
print "Measured [%d] [%f]" % (self.__meas_temperature,self.__meas_humidity)
还有我的主循环:
if __name__ == "__main__":
print "Initializing subsystems"
agents = {}
agents['HVAC'] = HVAC()
# Run simulation
timestep = 0
while timestep < args.timesteps:
print "Timestep %d" % timestep
if timestep % 10 == 0:
curr_temp = random.randrange(68,72)
curr_humidity = random.uniform(40.0,60.0)
agents['HVAC'].communicate({'temp':curr_temp, 'humidity':curr_humidity})
time.sleep(1)
timestep += 1
agents['HVAC'].shutdown()
print "HVAC process state: %d" % agents['HVAC'].is_alive()
所以问题是,每当我在主循环中运行agents['HVAC'].communicate(x) 时,我都可以看到在其run 循环中传递到HVAC 子类的值(因此它会正确打印接收到的值)。但是,该值永远不会被成功存储。
所以典型的输出如下所示:
Initializing subsystems
Timestep 0
Measured [68] [56.948675]
heating 1
heating 2
Timestep 1
heating 3
heating 4
Timestep 2
heating 5
heating 6
在现实中,一旦出现 Measured [68],内部存储值应更新为输出 68(不是加热 1,加热 2 等)。如此有效,HVAC 的 self.__meas_temperature 没有得到正确更新。
编辑:经过一番研究,我意识到我不一定了解幕后发生的事情。每个子进程都使用自己的虚拟内存块进行操作,并且完全抽象出以这种方式共享的任何数据,因此将值传入是行不通的。我的新问题是我不一定确定如何与多个进程共享全局价值。
我正在查看 Queue 或 JoinableQueue 包,但我不一定确定如何将 Queue 传递给我拥有的超类设置类型(尤其是使用 mpc.Process.__init__(self) 调用)。
另一个问题是我是否可以让多个代理从队列中读取值而不将其拉出队列?例如,如果我想与多个代理共享一个temperature 值,队列是否适用于此?
【问题讨论】:
-
这是一个非常广泛的问题。据我所知,您基本上是在问“在多进程之间共享数据的方式有哪些?” (你读过the docs吗?),然后是“我的应用程序应该使用哪一个?”。你能把它缩小一点,即给出一个特定的期望行为吗?否则,我认为人们可能会留下可能对您没有太大帮助的通用答案。
-
实际上这就是编辑应该涵盖的内容。根据我的确切实现,我需要一种共享数据的方法......我不知道 Queue 或 JoinableQueue 或其他什么是最好的,但如果我使用 mpc,我似乎无法弄清楚如何共享数据。 Process.__init__(self)
-
*多个消费者都读取相同的变量
标签: python subprocess python-multiprocessing