【问题标题】:Does multiprocessing.Pipe have to be passed to subprocess through inheritancemultiprocessing.Pipe 是否必须通过继承传递给子进程
【发布时间】:2016-04-22 08:23:32
【问题描述】:

我了解multiprocessing.Queue has to be passed to subprocess through inheritance。但是,当我尝试通过消息传递将Pipe 传递给子进程时,就像下面的代码一样,我得到的错误并不是说“管道只能通过继承在进程之间共享”。相反,它在q.get() 处失败,错误为TypeError: Required argument 'handle' (pos 1) not found。我想知道是否有可能这样做?假设管道是使用 linux 命名管道实现的,那么重要的是管道的名称,它可能是要序列化并在进程之间传递的状态,对吗?

from multiprocessing import Process, Pipe, Queue    

def reader(q):
  output_p = q.get()
  msg = output_p.recv()
  while msg is not None:
    msg = output_p.recv()    

if __name__ == '__main__':
    q = Queue()
    reader_p = Process(target=reader, args=(q,))
    reader_p.start()     # Launch the reader process

    output_p, input_p = Pipe(True)
    q.put(output_p)

    input_p.send('MyMessage')
    input_p.send(None)
    reader_p.join()

【问题讨论】:

    标签: python pipe multiprocessing python-multiprocessing


    【解决方案1】:

    这是一个bug,已在 Python 3 中修复。

    您在 Python 3 中的代码可以完美运行。

    【讨论】:

    • 酷!很高兴知道那件事。顺便说一句,python 3 中的底层实现是否使用类似于 Manager 的东西?在我的情况下,Manager 的一些实现细节对我不起作用。
    • 据我所知,底层实现仍然是一个操作系统管道。他们只是修复了它的序列化方式以便安全传输。
    【解决方案2】:

    noxadofox 在这里给出了正确答案。我正在添加一个我设计的示例来验证管道不需要继承。在这个例子中,我在执行器启动它的两个进程后创建了第二个管道,并将它作为参数传递给现有进程。

    """ Multiprocessing pipe and queue test """
    import multiprocessing
    import concurrent.futures
    import time
    
    
    class Example:
        def __init__(self):
            manager = multiprocessing.Manager()
            q = manager.Queue()
            executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    
            pipe_out_1, pipe_in_1 = multiprocessing.Pipe(duplex=True)
            executor.submit(self.requester, q, pipe_in_1)
            executor.submit(self.worker, q, pipe_out_1)
            print(executor._processes)
    
            pipe_out_2, pipe_in_2 = multiprocessing.Pipe(duplex=True)
            executor.submit(self.requester, q, pipe_in_2)
            executor.submit(self.worker, q, pipe_out_2)
            print(executor._processes)
    
        @staticmethod
        def worker(q, pipe_out):
            task = q.get()
            print('worker got task {}'.format(task))
            pipe_out.send(task + '-RESPONSE')
            print('loop_proc sent')
    
        @staticmethod
        def requester(q, pipe_in):
            q.put('TASK')
            response = pipe_in.recv()
            print('requester got response {}'.format(response))
            time.sleep(2)
    
    
    if __name__ == '__main__':
        Example()
        time.sleep(30)
    

    【讨论】:

      猜你喜欢
      • 2016-07-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-14
      • 1970-01-01
      相关资源
      最近更新 更多