【问题标题】:Using python multiprocessing pipes使用 python 多处理管道
【发布时间】:2011-12-17 19:35:39
【问题描述】:

我正在尝试编写一个类,该类将使用多个进程计算校验和,从而利用多个内核。我有一个非常简单的类,它在执行一个简单的案例时效果很好。但是每当我创建该类的两个或更多实例时,worker 永远不会退出。似乎它永远不会收到管道已被父级关闭的消息。

所有代码都可以在下面找到。我先分别计算 md5 和 sha1 校验和,这样可行,然后我尝试并行执行计算,然后程序在该关闭管道时锁定。

这里发生了什么?为什么管道没有按我预期的那样工作?我想我可以通过在队列上发送“停止”消息并让孩子以这种方式退出来解决问题,但我真的很想知道为什么这不起作用。

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):
    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        self.parent_conn.close() # This is the child. Close unused end.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()


def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums in parallel causes a lockup!
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest() # Lockup here!

main()

附言。这个问题已经解决了如果有人有兴趣,这里是上面代码的工作版本:

import multiprocessing
import hashlib

class ChecksumPipe(multiprocessing.Process):

    all_open_parent_conns = []

    def __init__(self, csname):
        multiprocessing.Process.__init__(self, name = csname)
        self.summer = eval("hashlib.%s()" % csname)
        self.child_conn, self.parent_conn = multiprocessing.Pipe(duplex = False)
        ChecksumPipe.all_open_parent_conns.append(self.parent_conn)
        self.result_queue = multiprocessing.Queue(1)
        self.daemon = True
        self.start()
        self.child_conn.close() # This is the parent. Close the unused end.

    def run(self):
        for conn in ChecksumPipe.all_open_parent_conns:
            conn.close() # This is the child. Close unused ends.
        while True:
            try:
                print "Waiting for more data...", self
                block = self.child_conn.recv_bytes()
                print "Got some data...", self
            except EOFError:
                print "Finished work", self
                break
            self.summer.update(block)
        self.result_queue.put(self.summer.hexdigest())
        self.result_queue.close()
        self.child_conn.close()

    def update(self, block):
        self.parent_conn.send_bytes(block)

    def hexdigest(self):
        self.parent_conn.close()
        return self.result_queue.get()

def main():
    # Calculating the first checksum works
    md5 = ChecksumPipe("md5")
    md5.update("hello")
    print "md5 is", md5.hexdigest()

    # Calculating the second checksum works
    sha1 = ChecksumPipe("sha1")
    sha1.update("hello")
    print "sha1 is", sha1.hexdigest()

    # Calculating both checksums also works fine now
    md5, sha1 = ChecksumPipe("md5"), ChecksumPipe("sha1")
    md5.update("hello")
    sha1.update("hello")
    print "md5 and sha1 is", md5.hexdigest(), sha1.hexdigest()

main()

【问题讨论】:

  • 你可能想在self.parent_conn.close()之后添加ChecksumPipe.all_open_parent_conns.remove(self.parent_conn),让连接被破坏。
  • self.summer = eval("hashlib.%s()" % csname) 看起来很难看。 self.summer = getattr(hashlib, csname)()呢?

标签: python multiprocessing pipe python-2.6


【解决方案1】:

是的,这确实是令人惊讶的行为。

但是,如果您查看两个并行子进程的 lsof 的输出,很容易注意到第二个子进程打开了更多文件描述符。

发生的情况是,当两个并行子进程启动时,第二个子进程继承父级的管道,因此当父级调用self.parent_conn.close()时,第二个子进程仍然打开该管道文件描述符,因此管道文件描述不会在内核中关闭(引用计数大于 0),其效果是第一个并行子进程中的 self.child_conn.recv_bytes() 永远不会被 read()s EOFEOFError 抛出。

您可能需要发送显式关闭消息,而不是仅仅关闭文件描述符,因为似乎几乎无法控制哪些文件描述符在哪些进程之间共享(没有 close-on-fork 文件描述符标志)。

【讨论】:

  • 谢谢!这为我清除了一切。我在我的示例中通过使用包含所有实例中所有打开的连接的共享类变量来解决它,以便孩子可以关闭他们不需要的所有套接字。
猜你喜欢
  • 2018-01-13
  • 2017-07-16
  • 1970-01-01
  • 1970-01-01
  • 2018-06-27
  • 2019-02-15
  • 1970-01-01
  • 2020-12-07
  • 2013-07-06
相关资源
最近更新 更多