【问题标题】:How do you tell whether sys.stdin.readline() is going to block?你如何判断 sys.stdin.readline() 是否会阻塞?
【发布时间】:2018-10-19 13:19:59
【问题描述】:

如何确定对 sys.stdin.readline() 的调用(或更一般地,对任何基于文件描述符的文件对象的 readline() 调用)是否会阻塞?

当我在 python 中编写基于行的文本过滤程序时会出现这种情况; 也就是说,程序反复从输入中读取一行文本,可能会对其进行转换,然后将其写入输出。

我想实施一个合理的输出缓冲策略。 我的标准是:

  1. 在处理数以百万计的数据时应该是高效的 批量行 - 主要缓冲输出,偶尔刷新。
  2. 它不应该在保持缓冲输出的同时阻塞输入。

因此,无缓冲输出不好,因为它违反了 (1)(对操作系统的写入过多)。 并且行缓冲输出不好,因为它仍然违反(1) (在一百万行中的每一行上将输出刷新到操作系统是没有意义的)。 并且默认缓冲输出不好,因为它违反了(2)(如果输出到文件或管道,它将不适当地保留输出)。

我认为,在大多数情况下,一个好的解决方案是: “每当(其缓冲区已满或)sys.stdin.readline() 即将阻塞时刷新 sys.stdout”。 可以实现吗?

(请注意,我并不认为此策略适用于所有案例。例如, 在程序受 CPU 限制的情况下,这可能并不理想;在这种情况下,这可能是明智的 更频繁地刷新,以避免在进行长时间计算时保留输出。)

为了明确起见,假设我正在 python 中实现 unix 的“cat -n”程序。

(实际上 "cat -n" 比 line-at-a-time 更智能;也就是说,它知道如何 在读取整行之前读取和写入行的一部分; 但是,对于这个例子,无论如何我都会一次一行地实现它。)

行缓冲实现

(表现良好,但违反标准(1),即由于冲洗过多,速度过慢):

#!/usr/bin/python
# cat-n.linebuffered.py
import sys
num_lines_read = 0
while True:
  line = sys.stdin.readline()
  if line == '': break
  num_lines_read += 1
  print("%d: %s" % (num_lines_read, line))
  sys.stdout.flush()

默认缓冲实现

(快速但违反标准(2),即不友好的输出扣留)

#!/usr/bin/python
# cat-n.defaultbuffered.py
import sys
num_lines_read = 0
while True:
  line = sys.stdin.readline()
  if line == '': break
  num_lines_read += 1
  print("%d: %s" % (num_lines_read, line))

期望的实现:

#!/usr/bin/python
num_lines_read = 0
while True:
  if sys_stdin_readline_is_about_to_block():  # <--- How do I implement this??
    sys.stdout.flush()
  line = sys.stdin.readline()
  if line == '': break
  num_lines_read += 1
  print("%d: %s" % (num_lines_read, line))

那么问题来了:是否可以实现sys_stdin_readline_is_about_to_block()

我想要一个适用于 python2 和 python3 的答案。 我已经研究了以下每种技术,但到目前为止都没有成功。

  • 使用select([sys.stdin],[],[],0) 确定从 sys.stdin 读取是否会阻塞。 (当 sys.stdin 是缓冲文件对象时,这不起作用,至少有一个可能有两个原因:(1)如果部分行准备好从底层输入管道读取,它会错误地说“不会阻塞”, (2) 如果 sys.stdin 的缓冲区包含完整的输入行但底层管道还没有准备好进行额外读取,它会错误地说“将阻塞”......我认为)。

  • 非阻塞 io,使用 os.fdopen(sys.stdin.fileno(), 'r')fcntlO_NONBLOCK (在任何 python 版本中,我都无法让它与 readline() 一起使用: 在 python2.7 中,只要有部分行进入,它就会丢失输入; 在python3中,似乎无法区分“会阻塞” 和输入结束。 ??)

  • asyncio(我不清楚 python2 中有什么可用的;我认为它不适用于 sys.stdin;但是,我仍然对仅有效的答案感兴趣从 subprocess.Popen()) 返回的管道读取时。

  • 创建一个线程来执行readline()循环并将每一行传递给主线程 通过 queue.Queue 编程;然后主程序可以在之前轮询队列 从中读取每一行,每当它看到它即将阻塞时,首先刷新标准输出。 (我试过了,实际上它可以工作了,见下文,但它非常慢,比行缓冲慢得多。)

线程实现:

请注意,这并没有严格回答“如何判断 sys.stdin.readline() 是否会阻塞”的问题,但无论如何它设法实现了所需的缓冲策略。不过速度太慢了。

#!/usr/bin/python
# cat-n.threaded.py
import queue
import sys
import threading
def iter_with_abouttoblock_cb(callable, sentinel, abouttoblock_cb, qsize=100):
  # child will send each item through q to parent.
  q = queue.Queue(qsize)
  def child_fun():
    for item in iter(callable, sentinel):
      q.put(item)
    q.put(sentinel)
  child = threading.Thread(target=child_fun)
  # The child thread normally runs until it sees the sentinel,
  # but we mark it daemon so that it won't prevent the parent
  # from exiting prematurely if it wants.
  child.daemon = True
  child.start()
  while True:
    try:
      item = q.get(block=False)
    except queue.Empty:
      # q is empty; call abouttoblock_cb before blocking
      abouttoblock_cb()
      item = q.get(block=True)
    if item == sentinel:
      break  # do *not* yield sentinel
    yield item
  child.join()

num_lines_read = 0
for line in iter_with_abouttoblock_cb(sys.stdin.readline,
                                      sentinel='',
                                      abouttoblock_cb=sys.stdout.flush):
  num_lines_read += 1
  sys.stdout.write("%d: %s" % (num_lines_read, line))

验证缓冲行为:

以下命令(在 linux 上的 bash 中)显示了预期的缓冲行为:“defaultbuffered”缓冲过于激进,而“linebuffered”和“threaded”缓冲恰到好处。

(注意流水线末尾的| cat是默认做python block-buffer而不是line-buffer的。)

for which in defaultbuffered linebuffered threaded; do
  for python in python2.7 python3.5; do
    echo "$python cat-n.$which.py:"
      (echo z; echo -n a; sleep 1; echo b; sleep 1; echo -n c; sleep 1; echo d; echo x; echo y; echo z; sleep 1; echo -n e; sleep 1; echo f) | $python cat-n.$which.py | cat
  done
done

输出:

python2.7 cat-n.defaultbuffered.py:
[... pauses 5 seconds here. Bad! ...]
1: z
2: ab
3: cd
4: x
5: y
6: z
7: ef
python3.5 cat-n.defaultbuffered.py:
[same]
python2.7 cat-n.linebuffered.py:
1: z
[... pauses 1 second here, as expected ...]
2: ab
[... pauses 2 seconds here, as expected ...]
3: cd
4: x
5: y
6: z
[... pauses 2 seconds here, as expected ...]
6: ef
python3.5 cat-n.linebuffered.py:
[same]
python2.7 cat-n.threaded.py:
[same]
python3.5 cat-n.threaded.py:
[same]

时间安排:

(在 linux 上的 bash 中):

for which in defaultbuffered linebuffered threaded; do
  for python in python2.7 python3.5; do
    echo -n "$python cat-n.$which.py:  "
      timings=$(time (yes 01234567890123456789012345678901234567890123456789012345678901234567890123456789 | head -1000000 | $python cat-n.$which.py >| /tmp/REMOVE_ME) 2>&1)
      echo $timings
  done
done
/bin/rm /tmp/REMOVE_ME

输出:

python2.7 cat-n.defaultbuffered.py:  real 0m1.490s user 0m1.191s sys 0m0.386s
python3.5 cat-n.defaultbuffered.py:  real 0m1.633s user 0m1.007s sys 0m0.311s
python2.7 cat-n.linebuffered.py:  real 0m5.248s user 0m2.198s sys 0m2.704s
python3.5 cat-n.linebuffered.py:  real 0m6.462s user 0m3.038s sys 0m3.224s
python2.7 cat-n.threaded.py:  real 0m25.097s user 0m18.392s sys 0m16.483s
python3.5 cat-n.threaded.py:  real 0m12.655s user 0m11.722s sys 0m1.540s

重申一下,我想要一个在保持缓冲输出时从不阻塞的解决方案 (“线缓冲”和“线程”在这方面都很好), 这也很快:也就是说,速度与“defaultbuffered”相当。

【问题讨论】:

  • 我不确定您是否可以知道不会出错 stdin.readline() 是否会阻止。因此,我不知道您想要的实现是否可行。

标签: python


【解决方案1】:

你当然可以使用select:这就是它的用途,它的性能对于少数文件描述符来说是好的。您必须自己实现行缓冲/中断,以便在缓冲(结果是)部分行之后检测是否有更多可用输入。

你可以自己做all缓冲(这是合理的,因为select在文件描述符级别运行),或者你可以将stdin设置为非阻塞并使用@ 987654324@ 或 BufferedReader.read()(取决于您的 Python 版本)以使用可用的任何内容。如果您的输入可能是 Internet 套接字,您必须使用非阻塞输入而不考虑缓冲,因为 select 的常见实现可能会虚假地指示来自套接字的可读数据。 (在这种情况下,Python 2 版本引发 IOErrorEAGAIN;Python 3 版本返回 None。)

(os.fdopen 在这里没有帮助,因为它不会为fcntl 创建新的文件描述符以供使用。在某些系统上,您可以使用@987654333 打开/dev/stdin @.)

基于默认(缓冲)file.read() 的 Python 2 实现:

import sys,os,select,fcntl,errno

fcntl.fcntl(sys.stdin.fileno(),fcntl.F_SETFL,os.O_NONBLOCK)

rfs=[sys.stdin.fileno()]
xfs=rfs+[sys.stdout.fileno()]

buf=""
lnum=0
timeout=None
rd=True
while rd:
  rl,_,xl=select.select(rfs,(),xfs,timeout)
  if xl: raise IOError          # "exception" occurred (TCP OOB data?)
  if rl:
    try: rd=sys.stdin.read()    # read whatever we have
    except IOError as e:        # spurious readiness?
      if e.errno!=errno.EAGAIN: raise # die on other errors
    else: buf+=rd
    nl0=0                       # previous newline
    while True:
      nl=buf.find('\n',nl0)
      if nl<0:
        buf=buf[nl0:]           # hold partial line for "processing"
        break
      lnum+=1
      print "%d: %s"%(lnum,buf[nl0:nl])
      timeout=0
      nl0=nl+1
  else:                         # no input yet
    sys.stdout.flush()
    timeout=None

if buf: sys.stdout.write("%d: %s"%(lnum+1,buf)) # write any partial last line

对于cat -n,我们可以在获得部分行后立即写出它们,但这会保留它们以表示一次处理整行。

在我(不起眼的)机器上,您的 yes 测试需要“真正的 0m2.454s 用户 0m2.144s sys 0m0.504s”。

【讨论】:

  • 感谢您的提示。关于用于非阻塞 i/o 的 fdopen/fcnt,我使用正确,只是在写问题时输入错误;我刚刚修好了。我认为我遇到的问题是我仍在尝试使用文本模式和 sys.stdin.readline() 即使使用非阻塞 i/o,但这根本不起作用(以各种不优雅的方式,例如,在 python3 中,当没有可读取的内容时返回“”,使其与文件结尾无法区分)。
  • @DonHatch:我很乐意编辑fdopen 部分,但首先要问一个问题:你为什么要fdopen stdin?它甚至没有创建一个新的文件描述符,更不用说一个可以支持O_NONBLOCK的新打开文件description了。
  • 你说得对,我对设置 O_NONBLOCK 的描述仍然很混乱。我实际上也尝试了 fdopen stdin.fileno() ,但这本身并不是为了获得非阻塞,而是尝试获取具有各种不同缓冲模式的文件对象......特别是 buffering=0 (这反过来显然要求我将其置于非文本模式'rb'......不幸的是侵入性,因为我希望不要弄乱原来的 readline() 语义)。
  • @DonHatch:至少在 Python 3 中,您可以使用sys.stdin.buffer.raw 获取底层的无缓冲流。您不能拥有无缓冲的文本流,因为它们可能必须保留不到一个字符的字节。
  • 你的回答是“你可以”,但听起来像是“你必须”,因为我只看到了一条穿过迷宫的方法,它涉及放弃 readline()。总结一下:“要判断读取是否会阻塞,请在文件描述符上使用 select(...,timeout=0),这意味着您必须自己实现缓冲(包括 readline()),这意味着您需要输入流要无缓冲,这反过来意味着它必须是二进制的(因此,如果您想要文本模式,您也必须自己实现);而且由于 select 可能会误报,您需要 fcntl 将 fd 设为非阻塞”。我有这个权利吗?
【解决方案2】:
# -*- coding: utf-8 -*-
import os
import sys
import select
import fcntl
import threading


class StdInput:
    def __init__(self):
        self.close_evt = threading.Event()

        fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFL, fcntl.fcntl(sys.stdin.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK);
        self.input = (sys.stdin.original_stdin if hasattr(sys.stdin, "original_stdin") else sys.stdin)
        self.epoll = select.epoll()
        self.epoll.register(sys.stdin.fileno(), select.EPOLLIN | select.EPOLLPRI | select.EPOLLERR | select.EPOLLHUP | select.EPOLLRDBAND)

    def read(self):
        while not self.close_evt.is_set():
            input_line = self.input.readline()
            # If the object is in non-blocking mode and no bytes are available, None is returned.
            if input_line is not None and len(input_line) > 0:
                break           
            print("Nothing yet...")
            evt_lst = self.epoll.poll(1.0)  # Timeout 1s
            print("Poll exited: event list size={}".format(len(evt_lst)))
            if len(evt_lst) > 0:
                assert len(evt_lst) == 1
                if (evt_lst[0][1] & (select.EPOLLERR | select.EPOLLHUP)) > 0:
                    raise Exception("Ooops!!!")
        return input_line


if __name__ == "__main__":
    i = StdInput()

    def alm_handle():
        i.close_evt.set()
    threading.Timer(4, alm_handle).start()

    print("Reading...")
    input_line = i.read()
    print("Read='{}'".format(input_line))

【讨论】:

  • 虽然此代码可能会回答问题,但提供有关它如何和/或为什么解决问题的额外上下文将提高​​答案的长期价值。
猜你喜欢
  • 1970-01-01
  • 2019-04-01
  • 1970-01-01
  • 2014-03-23
  • 1970-01-01
  • 2013-02-21
  • 1970-01-01
  • 2013-06-23
  • 2012-08-23
相关资源
最近更新 更多