【问题标题】:Pass method return values between multiprocessing.Process instances在 multiprocessing.Process 实例之间传递方法返回值
【发布时间】:2014-05-11 15:12:11
【问题描述】:

如何从 multiprocessing.Process 的另一个实例中的方法获取返回值?

我有两个文件:

文件 hwmgr.py:

import multiprocessing as mp
from setproctitle import setproctitle
import smbus
import myLoggingModule as log

class HWManager(mp.Process):
    def __init__(self):
        mp.Process.__init__(self, cmd_q, res_q)
        self.i2c_lock = mp.Lock()
        self.commandQueue = cmd_q
        self.responseQueue = res_q
    def run(self):
        setproctitle('hwmgr')
        while True:
            cmd, args = self.commandQueue.get()
            if cmd is None: self.terminate()
            method = self.__getattribute__(cmd)
            result = method(**args)
            if result is not None:
                self.responseQueue.put(result)

    def get_voltage(self):
        with self.i2c_lock:
            # ...do i2c stuff to get a voltage with smbus module
        return voltage

文件 main.py:

import multiprocessing as mp
import hwmgr

cmd_q = mp.Queue()
res_q = mp.Queue()

hwm = hwmgr.HWManager(cmd_q, res_q)
hwm.start()

cmd_q.put(('get_voltage', {}))
battery = res_q.get()

print battery

虽然此解决方案有效,但 HWManager 进程的复杂性在未来可能会增加,并且使用相同机制的 main.py(代码已简化)衍生出其他进程。错误的进程显然有可能从它的res_q.get() 命令中得到错误的返回数据。

什么是更稳健的方法?
(我试图避免为每个其他进程返回一个 mp.Queue - 因为这需要每次都重新设计 HWManager 类以容纳额外的队列)

OK - WIP 代码如下:

hwmgr.py:

import multiprocessing as mp
from multiprocessing.connection import Listener
from setproctitle import setproctitle
import smbus

class HWManager(mp.Process):
    def __init__(self):
        mp.Process.__init__(self)
        self.i2c_lock = mp.Lock()

    def run(self):
        setproctitle('hwmgr')
        self.listener = Listener('/tmp/hw_sock', 'AF_UNIX')

        with self.i2c_lock:
            pass  # Set up I2C bus to take ADC readings

        while True:
            conn = self.listener.accept()
            cmd, args = conn.recv()

            if cmd is None: self.terminate()
            method = self.__getattribute__(cmd)
            result = method(**args)
            conn.send(result)

    def get_voltage(self):
        with self.i2c_lock:
            voltage = 12.0  # Actually, do i2c stuff to get a voltage with smbus module

        return voltage

文件client.py

import multiprocessing as mp
from multiprocessing.connection import Client
from setproctitle import setproctitle
from time import sleep

class HWClient(mp.Process):

def __init__(self):
    mp.Process.__init__(self)
    self.client = Client('/tmp/hw_sock', 'AF_UNIX')

def run(self):
    setproctitle('client')
    while True:
        self.client.send(('get_voltage', {}))
        battery = self.client.recv()
        print battery
        sleep(5)

main.py:

import hwmgr
import client

cl = client.HWClient()  # Put these lines here = one error (conn refused)
cl.start()
hwm = hwmgr.HWManager()
hwm.start()
# cl = client.HWClient()  # ...Or here, gives the other (in use)
# cl.start()

【问题讨论】:

  • 你的情况有点混乱。如果您需要进程相互交谈,而不是其他进程,那么每个进程之间的 QueuePipe 是可以的,但看起来你想要广播。在这种情况下,我会编写一个 PubSub 类型的系统,您可以向其广播消息,在这种情况下,其他进程将接收所有消息,但可以设置为仅响应他们关心的消息。在这种情况下,每个子进程都会订阅到他们关心的publishers。这可以通过 Pipe 或来自 Manager 对象的共享对象关闭。
  • 这个的真正应用是一个硬件管理器进程来控制对硬件资源的访问。上面的机制只显示了另一个想要获取 I2C 数据的进程(在本例中为“主”进程)。例如,另一个衍生的进程可能会请求切换继电器,返回值将是成功/失败的 T/F。常见的进程是硬件管理器——它可以在单个 mp.Queue 上接收命令,但返回值去往不同的地方。我不想每次有新进程请求服务时都重写HWManager,只是为了提供一个新的返回Queue。

标签: python multiprocessing


【解决方案1】:

这听起来像是需要标准的客户端-服务器架构。您可以使用 UNIX 域套接字(或 Windows 上的命名管道)。多处理模块使得在进程之间传递 python 对象变得超级容易。服务器代码示例结构:

from multiprocessing.connection import Listener

listener = Listener('somefile', 'AF_UNIX')

queue = Queue()
def worker():
    while True:
        conn, cmd = queue.get()
        result = execute_cmd(cmd)
        conn.send(result)
        queue.task_done()


for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()


while True:
    conn = listener.accept()
    cmd = conn.recv()
    queue.put((conn, cmd)) # Do processing of the queue in another thread/process and write result to conn

客户端看起来像:

from multiprocessing.connection import Client
client = Client('somefile', 'AF_UNIX')

client.send(cmd)
result = client.recv()

上面的代码为工作线程使用线程,但您也可以使用多处理模块轻松为工作线程创建进程。详情请见docs

【讨论】:

  • 那么,我会将queue.append((conn, cmd)) 替换为在“服务器”中运行方法的代码吗?这难道不是还有可能,如果两个进程都在等待recv(),它们可能会混淆彼此的返回数据吗?
  • 不,连接对象对于每个客户端进程都是唯一的,就像套接字一样。编程模型有意与套接字 API 非常相似。您会将 cmd 的结果写入 conn,以便正确的客户端进程获得结果。
  • @BugSpray 我已经添加了一些关于它如何工作的更多细节。
  • 啊,好的。所以,为了确认一下,我可以在所有客户端应用程序中使用相同的Client('somefile', 'AF_UNIX')(不需要为每个客户端使用不同的address 参数?(抱歉,API 文档不清楚)
  • 是的,您必须在所有客户端进程中使用相同的域套接字地址(只是一个名称)。在幕后,它只是一个命名管道。有关更多详细信息,请参阅stackoverflow.com/questions/9644251/…
猜你喜欢
  • 2014-07-13
  • 1970-01-01
  • 2013-12-02
  • 2021-11-24
  • 2015-02-03
  • 1970-01-01
  • 2020-04-28
相关资源
最近更新 更多