【发布时间】:2013-04-21 14:41:28
【问题描述】:
在我的应用程序中,我使用来自多处理模块的管道在 python 进程之间进行通信。 最近我观察到一种奇怪的行为,具体取决于我通过它们发送的数据的大小。 根据 python 文档,这些管道基于连接并且应该以异步方式运行,但有时它们会在发送时卡住。如果我在每个连接中启用全双工,一切正常,即使我没有使用连接来发送和收听。 谁能解释这种行为?
-
100 个浮点数,全双工禁用
代码工作,利用异步性。 -
100 个浮点数,启用全双工
该示例按预期运行良好。 -
10000 个浮点数,全双工禁用
即使处理较小的数据也没问题,但执行会被永久阻止。 -
10000 个浮点数,启用全双工
又好了。
代码(这不是我的生产代码,它只是说明了我的意思):
from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid
PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000
def arg_passer(pipe_in, pipe_out, list_):
my_pid = getpid()
print "{}: Before send".format(my_pid)
pipe_out.send(list_)
print "{}: After send, before recv".format(my_pid)
buf = pipe_in.recv()
print "{}: After recv".format(my_pid)
if __name__ == "__main__":
pipes = [Pipe(False) for _ in range(PROC_NR)]
# pipes = [Pipe(True) for _ in range(PROC_NR)]
pipes_in = deque(p[0] for p in pipes)
pipes_out = deque(p[1] for p in pipes)
pipes_in.rotate(1)
pipes_out.rotate(-1)
data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
for foo in xrange(PROC_NR)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
【问题讨论】:
标签: python linux multiprocessing pipe