【问题标题】:Repeatedly write to stdin and read from stdout of a process from python重复写入标准输入并从 python 进程的标准输出读取
【发布时间】:2012-03-31 00:41:13
【问题描述】:

我有一段 fortran 代码,它从 STDIN 读取一些数字并将结果写入 STDOUT。例如:

do
  read (*,*) x
  y = x*x
  write (*,*) y
enddo

所以我可以从 shell 启动程序并获得以下输入/输出序列:

5.0
25.0
2.5
6.25

现在我需要在 python 中执行此操作。在徒劳地与 subprocess.Popen 搏斗并查看了这个网站上的旧问题后,我决定使用 pexpect.spawn:

import pexpect, os
p = pexpect.spawn('squarer')
p.setecho(False)
p.write("2.5" + os.linesep)
res = p.readline()

它有效。问题是,我需要在 python 和我的 fortran 程序之间传递的 真实 数据是一个包含 100,000 个(或更多)双精度浮点数的数组。如果它们包含在一个名为x 的数组中,那么

p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

超时并出现来自 pexpect 的以下错误消息:

buffer (last 100 chars):   
before (last 100 chars):   
after: <class 'pexpect.TIMEOUT'>  
match: None  
match_index: None  
exitstatus: None
flag_eof: False
pid: 8574
child_fd: 3
closed: False
timeout: 30
delimiter: <class 'pexpect.EOF'>
logfile: None
logfile_read: None
logfile_send: None
maxread: 2000
ignorecase: False
searchwindowsize: None
delaybeforesend: 0.05
delayafterclose: 0.1
delayafterterminate: 0.1

除非 x 的元素少于 303 个。有没有办法向/从另一个程序的 STDIN/STDOUT 传递大量数据?

我曾尝试将数据拆分成更小的块,但随后我的速度损失了 很多

提前致谢。

【问题讨论】:

  • 这是进程间(管道)缓冲的问题。请检查我的答案。
  • 我正是遇到了这个问题。感谢您的报道。

标签: python


【解决方案1】:

找到了使用子流程模块的解决方案,所以如果有人需要做同样的事情,我将其发布在这里以供参考。

import subprocess as sbp

class ExternalProg:

    def __init__(self, arg_list):
        self.opt = sbp.Popen(arg_list, stdin=sbp.PIPE, stdout=sbp.PIPE, shell=True, close_fds=True)

    def toString(self,x):
        return ' '.join(["%.12f"%k for k in x])

    def toFloat(self,x):
        return float64(x.strip().split())

    def sendString(self,string):
        if not string.endswith('\n'):
            string = string + '\n'
        self.opt.stdin.write(string)

    def sendArray(self,x):
        self.opt.stdin.write(self.toString(x)+'\n')

    def readInt(self):
        return int(self.opt.stdout.readline().strip())

    def sendScalar(self,x):
        if type(x) == int:
            self.opt.stdin.write("%i\n"%x)
        elif type(x) == float:
            self.opt.stdin.write("%.12f\n"%x)

    def readArray(self):
        return self.toFloat(self.opt.stdout.readline())

    def close(self):
        self.opt.kill()

该类被一个名为“优化器”的外部程序调用,如下所示:

optim = ExternalProg(['./optimizer'])
optim.sendScalar(500) # send the optimizer the length of the state vector, for example
optim.sendArray(init_x) # the initial guess for x
optim.sendArray(init_g) # the initial gradient g
next_x = optim.readArray() # get the next estimate of x
next_g = evaluateGradient(next_x) # calculate gradient at next_x from within python
# repeat until convergence

在 fortran 方面(编译为提供可执行“优化器”的程序),将读取 500 个元素的向量:

read(*,*) input_vector(1:500)

并且会这样写出来:

write(*,'(500f18.11)') output_vector(1:500)

就是这样!我已经用最多 200,000 个元素的状态向量对其进行了测试(这是我现在需要的上限)。希望这对我以外的人有所帮助。此解决方案适用于 ifort 和 xlf90,但不适用于 gfortran,原因我不明白。

【讨论】:

  • 您说您找到了使用“通信”的解决方案,但您没有在代码中的任何地方使用它。不过谢谢你写下的内容对我有用!
【解决方案2】:

示例 squarer.py 程序(它恰好在 Python 中,使用您的 Fortran 可执行文件):

#!/usr/bin/python
import sys
data= sys.stdin.readline() # expecting lots of data in one line
processed_data= data[-2::-1] # reverse without the newline
sys.stdout.write(processed_data+'\n')

target.py 程序示例:

import thread, Queue
import subprocess as sbp

class Companion(object):
    "A companion process manager"
    def __init__(self, cmdline):
        "Start the companion process"
        self.companion= sbp.Popen(
            cmdline, shell=False,
            stdin=sbp.PIPE,
            stdout=sbp.PIPE)
        self.putque= Queue.Queue()
        self.getque= Queue.Queue()
        thread.start_new_thread(self._sender, (self.putque,))
        thread.start_new_thread(self._receiver, (self.getque,))

    def _sender(self, que):
        "Actually sends the data to the companion process"
        while 1:
            datum= que.get()
            if datum is Ellipsis:
                break
            self.companion.stdin.write(datum)
            if not datum.endswith('\n'):
                self.companion.stdin.write('\n')

    def _receiver(self, que):
        "Actually receives data from the companion process"
        while 1:
            datum= self.companion.stdout.readline()
            que.put(datum)

    def close(self):
        self.putque.put(Ellipsis)

    def send(self, data):
        "Schedule a long line to be sent to the companion process"
        self.putque.put(data)

    def recv(self):
        "Get a long line of output from the companion process"
        return self.getque.get()

def main():
    my_data= '12345678 ' * 5000
    my_companion= Companion(("/usr/bin/python", "squarer.py"))

    my_companion.send(my_data)
    my_answer= my_companion.recv()
    print my_answer[:20] # don't print the long stuff
    # rinse, repeat

    my_companion.close()

if __name__ == "__main__":
    main()

main 函数包含您将使用的代码:设置 Companion 对象、companion.send 长行数据、companion.recv 一行。根据需要重复。

【讨论】:

  • 嗨 ΤZΩΤZΙΟΥ,感谢您的建议。但它不起作用:-(我将您的代码复制并粘贴到两个文件 squarer.py 和 target.py 中。但是当我执行“python target.py”时,我得到了一个无休止的等待期,没有任何反应。所以我执行了“%从 ipython shell 运行 target.py",然后按 Ctrl+C 中断等待,并得到以下回溯: 32 def recv(self): ---> 33 return self.getque.get() /usr/lib /python2.6/Queue.pyc in get(self, block, ti​​meout) --> 168 self.not_empty.wait() /usr/lib/python2.6/threading.pyc in wait(self, timeout) --> 239 waiter.acquire() 救命!
  • 我可以在某处找到这些非常长的行之一(可能在bpaste 或其他粘贴箱中),以便我可以近似您的条件吗?这段代码为我运行……
  • 我只是将这里的代码与我的代码进行了比较,在if not datum.endswith 行中出现了缩进错误。你能用当前版本的代码再试一次吗?
  • 我刚刚更正了缩进错误,您的代码现在可以为我运行(即使是 500,000 个字符的 my_data)。我现在将在我的实际代码中实现它,看看它是否仍然有效:-)
  • 嗨 ΤZΩΤZΙΟΥ,在我的实际应用程序中,recv() 调用无限期地卡住了。我的 fortran 应用程序用“write(,) i”写入一个整数,但由于某种原因,这从未到达 python :-( 有没有办法可以向您发送我正在使用的 fortran 代码?
【解决方案3】:

这是一个巨大的简化:将你的 Python 分成两部分。

python source.py | squarer | python sink.py

squarer 应用程序是您的 Fortran 代码。从标准输入读取,写入标准输出。

你的 source.py 是你的 Python

import sys
sys.stdout.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

或者,也许更简单一点,即

from __future__ import print_function
print( ' '.join(["{0:.10f}".format(k) for k in x]) )

而你的sink.py 是这样的。

import fileinput
for line in fileinput.input():
    # process the line 

分离源、平方和接收器可以获得 3 个单独的进程(而不是 2 个),并且会使用更多内核。更多内核 == 更多并发 == 更多乐趣。

【讨论】:

  • 好建议,谢谢。但对我不起作用,因为对于我的应用程序squarer 实际上是一个优化器,它读取状态向量(许多变量)并建议一个新的。 python 脚本将当前向量提供给优化器,接受新向量,用它进行一些模拟,然后将模拟结果重新提供给优化器。所以source.pysink.py 对我来说是同一个脚本,并且需要知道彼此的变量等等。
  • @TM5:这会无限循环吗?这可以运行多少次有上限吗?与模拟结果分开的数据的原始来源是什么?您的初始要求不反映任何这种复杂性。
  • 不,它不会无限循环,但退出条件可以通过python或fortran来确定。目前,让我们假设 fortran 确定终止条件。恐怕我不明白您所说的“数据的原始来源”是什么意思。基本上,步骤如下:(1)python对x0进行模拟,计算f'(x0),将其提供给fortran,(2)fortran根据x0和f'(x0)建议一个新的x1,将其提供给python, (3) 回到第 1 步,将 x0 替换为 x1。
【解决方案4】:

我认为你这里只添加一个换行符:

p.write(' '.join(["%.10f"%k for k in x]) + os.linesep)

而不是每行添加一个。

【讨论】:

  • 是的,我只在末尾添加了一个换行符,因为 real fortran 代码看起来像:read (*,*) x(1:n_state) 其中n_state 设置为(比如说)100,000。但我也看到,就read 语句而言,是否在数字之间添加换行符并没有什么区别。附言- 为什么原始帖子中的格式规则不适用于 cmets?例如,对于这个注释,我不能缩进四个空格来表示一段代码。我也不能为这个“P.S.”单独写一段
【解决方案5】:

看起来您正在超时(默认超时,我相信是 30 秒),因为准备、发送、接收和处理大量数据需要花费大量时间。根据the docstimeout=expect 方法的可选命名参数,您没有调用它 - 也许有一种未记录的方法可以在初始化程序中设置默认超时,可以通过仔细研究来源(或者,最坏的情况,通过入侵这些来源创建)。

如果 Fortran 程序一次读取并保存(例如)100 个项目,并且有提示,那么同步将变得非常容易。您能否为此目的修改您的 Fortran 代码,或者您更愿意采用无证/破解方法?

【讨论】:

  • 我怀疑读取和写入数据需要太多时间。当我有 303 个数字时,从 python 到 fortran 的传输需要不到一秒的时间(我计时了)。当我有 304 个号码时,它会在 30 秒后超时。 AFAIK,这个神奇的数字303 取决于我写的每个数字的位数,所以我认为这是字节数的问题。
  • @TM5,看起来一些缓冲区正在填满并且没有被正确刷新/读取(至少不是在 30 秒的超时内)。正如我在第 2 段中所建议的那样,将 Fortran 代码更改为一次接受 100 个数字,而不是一次大口地需要它们,这听起来越来越像最简单的解决方案;而且你没有告诉我们这对你的情况是否可行。
  • 我的真实状态向量包含 10,000 个元素(通常),它因运行而异。所以是的,可以修改 fortran 和 python 应用程序以将 I/O 分解成块,但是我必须传递一个“数据结束”信号,这不是很优雅。我想找到一种更优雅的方法。没有其他方法可以建立进程间通信吗?