【问题标题】:Synchronous/Asynchronous behaviour of python Pipespython管道的同步/异步行为
【发布时间】:2013-04-21 14:41:28
【问题描述】:

在我的应用程序中,我使用来自多处理模块的管道在 python 进程之间进行通信。 最近我观察到一种奇怪的行为,具体取决于我通过它们发送的数据的大小。 根据 python 文档,这些管道基于连接并且应该以异步方式运行,但有时它们会在发送时卡住。如果我在每个连接中启用全双工,一切正常,即使我没有使用连接来发送和收听。 谁能解释这种行为?

  1. 100 个浮点数,全双工禁用
    代码工作,利用异步性。
  2. 100 个浮点数,启用全双工
    该示例按预期运行良好。
  3. 10000 个浮点数,全双工禁用
    即使处理较小的数据也没问题,但执行会被永久阻止。
  4. 10000 个浮点数,启用全双工
    又好了。

代码(这不是我的生产代码,它只是说明了我的意思):

from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid

PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000


def arg_passer(pipe_in, pipe_out, list_):
    my_pid = getpid()
    print "{}: Before send".format(my_pid)
    pipe_out.send(list_)
    print "{}: After send, before recv".format(my_pid)
    buf = pipe_in.recv()
    print "{}: After recv".format(my_pid)


if __name__ == "__main__":
    pipes = [Pipe(False) for _ in range(PROC_NR)]
    # pipes = [Pipe(True) for _ in range(PROC_NR)]
    pipes_in = deque(p[0] for p in pipes)
    pipes_out = deque(p[1] for p in pipes)
    pipes_in.rotate(1)
    pipes_out.rotate(-1)

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
                 for foo in xrange(PROC_NR)]

    for proc in processes:
        proc.start()

    for proc in processes:
        proc.join()

【问题讨论】:

    标签: python linux multiprocessing pipe


    【解决方案1】:

    首先,值得注意的是multiprocessing.Pipe类的实现...

    def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)
    
        return c1, c2
    

    区别在于半双工“管道”使用anonymous pipe,而全双工“管道”实际上使用Unix domain socket,因为匿名管道本质上是单向的。

    我不确定在这种情况下您所说的“异步”一词是什么意思。如果您的意思是“非阻塞 I/O”,那么值得注意的是这两种实现都默认使用阻塞 I/O。


    其次,值得注意的是您尝试发送的数据的腌制大小......

    >>> from numpy.random import randn
    >>> from cPickle import dumps
    >>> len(dumps(randn(100)))
    2479
    >>> len(dumps(randn(10000)))
    237154
    

    第三,来自pipe(7) 联机帮助页...

    管道容量

    管道的容量有限。如果管道已满,则 write(2) 将阻塞 或失败,取决于是否设置了 O_NONBLOCK 标志(见下文)。不同的 实现对管道容量有不同的限制。应用程序应 不依赖于特定的能力:应用程序的设计应使 读取进程在数据可用时立即消费数据,因此写入进程 不会一直被阻止。

    在Linux 2.6.11之前的版本中,管道的容量与系统相同 页面大小(例如,i386 上的 4096 字节)。从 Linux 2.6.11 开始,管道容量为 65536 字节。


    因此,实际上,您已经创建了一个死锁,其中所有子进程都阻塞了 pipe_out.send() 调用,并且它们都无法从其他进程接收任何数据,因为您正在发送所有 237,154 字节的数据一击即填满了 65,536 字节的缓冲区。

    您可能只想使用 Unix 域套接字版本,但它目前工作的唯一原因是它具有比管道更大的缓冲区大小,并且您会发现如果增加DATA_POINTS 的数量增加到 100,000。

    “quick n'dirty hack”解决方案是将数据分成更小的块进行发送,但依赖于特定大小的缓冲区并不是一个好习惯。

    更好的解决方案是在 pipe_out.send() 调用上使用非阻塞 I/O,尽管我对 multiprocessing 模块不够熟悉,无法确定使用该模块实现它的最佳方法。

    伪代码类似于...

    while 1:
        if we have sent all data and received all data:
            break
        send as much data as we can without blocking
        receive as much data as we can without blocking
        if we didn't send or receive anything in this iteration:
            sleep for a bit so we don't waste CPU time
            continue
    

    ...或者您可以使用 Python select 模块来避免睡眠时间超过必要的时间,但是,再次将其与 multiprocessing.Pipe 集成可能会很棘手。

    multiprocessing.Queue 类可能会为您完成所有这些工作,但我以前从未使用过它,因此您必须进行一些实验。

    【讨论】:

    • 感谢您的详尽回答。现在我明白了这个问题。
    猜你喜欢
    • 2014-12-05
    • 2023-03-29
    • 1970-01-01
    • 2018-04-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-06
    • 2019-07-11
    相关资源
    最近更新 更多