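【Question title】:Non-blocking read from multiple subprocesses (Python)
【Posted】:2013-07-16 02:23:51
【Question】:

I currently have the following code, inspired by the answer to "Non-blocking read on a subprocess.PIPE in python". It seems to work, printing lines to the screen, but it only does so for the first process created; all the other processes (which are running) print no data.

How can I make sure I can read data from multiple subprocesses (in a non-blocking way)?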

#!/usr/bin/env python
import sys
import os
import subprocess
from threading import Thread
from Queue import Queue, Empty

STREAMER_URL = 'rtmp://127.0.0.1/app'
RTMPDUMP_EXECUTEABLE = 'rtmpdump'

def enqueue_output(out, queue):
    for line in iter(lambda: out.read(16), b''):
        queue.put(line)
    out.close()

def download_rtmp(media, filename):
  # Create parameters
  args=[RTMPDUMP_EXECUTEABLE]
  args.extend(['-r',media[0],'-y',media[1]])

  # Set output file
  OUTPUT_FILE = filename
  args.extend(['-o',OUTPUT_FILE])

  # Send rtmpdump any extra arguments
  if len(sys.argv) > 2:
    args.extend(sys.argv[2:])

  # Execute rtmpdump
  p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
  q = Queue()
  t = Thread(target=enqueue_output, args=(p.stdout, q))
  t.daemon = True # thread dies with the program
  t.start()
  return (p, q, t)

def main():

  # actual data is from somewhere else on the internet
  for (name, playpath, filepath) in data:
    print 'Spawning %s download...' % name
    PROCESSES.append(download_rtmp((STREAMER_URL, playpath), filepath))

  BUFS = dict()

  # infinite loop checking if all processes have finished
  while True:
    done = True
    for (process, queue, thread) in PROCESSES:
      try:
        readdata = queue.get_nowait()
      except Empty:
        pass
      else:
        if process in BUFS:
          readdata = BUFS[process] + readdata
        lines = readdata.split('\n')
        if len(lines) > 1:
          for line in lines[:-1]:
            print 'Line: %s' % line
        if '\r' in lines[-1]:
          lines = readdata.split('\r')
          for line in lines[:-1]:
            print 'Line2: %s' % line
        BUFS[process] = lines[-1]

      process.poll()

      if process.returncode is None:
        done = False
        break
    if done:
      break

  print "Done"

if __name__ == "__main__":
    main()

【Comments】:

Tags: python io subprocess nonblocking


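【Answer 1】:

I haven't figured out the whole thing, but the break under if process.returncode is None: means that you won't look at the other processes' queues until the first process has exited completely. And I'm not sure where you got the multi-queue polling idea from, but it's absolutely horrible.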
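To make the effect of that break concrete, here is a minimal sketch that keeps only the control flow of the question's inner loop. The function name visited_per_pass and the list of fake return codes are hypothetical stand-ins for the real (process, queue, thread) tuples; None plays the role of "still running".

```python
def visited_per_pass(states):
    """Mimic the question's inner loop: return the indices examined in one pass.

    `states` is a list of hypothetical return codes; None means "still running".
    """
    visited = []
    for index, returncode in enumerate(states):
        visited.append(index)      # this is where queue.get_nowait() would run
        if returncode is None:
            break                  # the early exit from the question's loop
    return visited


# First process still running: the other two are never examined.
print(visited_per_pass([None, None, None]))   # [0]
# Only after the first one exits does the loop reach the second.
print(visited_per_pass([0, None, None]))      # [0, 1]
```

Dropping the break while keeping done = False would let every pass visit every queue, although the loop would then spin without blocking.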

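This problem is best solved with a single return queue shared by all of the worker threads. Each worker puts (process, line) tuples on the queue, and the main thread blocks waiting for data from any of the workers.

This is really pseudocode, but it would look something like: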

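import sys
import subprocess
from threading import Thread
from Queue import Queue  # Python 2 module name; it is `queue` on Python 3

STREAMER_URL = 'rtmp://127.0.0.1/app'
RTMPDUMP_EXECUTEABLE = 'rtmpdump'

# single queue shared by every worker thread
return_q = Queue()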

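def enqueue_output(process, queue):
    """read process stdout in small chunks and queue for processing"""
    out = process.stdout
    for line in iter(lambda: out.read(16), b''):
        queue.put((process, line))
    out.close()     # avoid leaking the pipe's file descriptor
    process.wait()  # reap the child and set returncode
    queue.put((process, None))  # None marks the end of this process's output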

def download_rtmp(media, filename):
  # Create parameters
  args=[RTMPDUMP_EXECUTEABLE, '-r', media[0], '-y', media[1], '-o', filename]

  # Send rtmpdump any extra arguments
  # (no `if len(sys.argv) > 2` needed: an out-of-range slice is just an empty list)
  args.extend(sys.argv[2:])

  # Execute rtmpdump
  p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
  t = Thread(target=enqueue_output, args=(p, return_q))
  t.daemon = True # thread dies with the program
  t.start()
  return (p, t)

def main():
  THREADS = []
  BUFS = dict()

  # actual data is from somewhere else on the internet
  for (name, playpath, filepath) in data:
    print 'Spawning %s download...' % name
    process, thread = download_rtmp((STREAMER_URL, playpath), filepath)
    BUFS[process] = ''
    THREADS.append(thread)

  # all workers write to return_q and we process their output here
  while BUFS:
    process, chunk = return_q.get()
    readdata = BUFS[process] + (chunk or '')
    if chunk is None:
        del BUFS[process]
    # I didn't try to figure this part out... basically, when chunk is
    # None, process is removed from BUFS so you know your end condition
    # and the following stuff should do its final processing.
    # (chunk is kept separate from the loop variable `line` below, so the
    # final check still sees the end marker.)
    lines = readdata.split('\n')
    if len(lines) > 1:
      for line in lines[:-1]:
        print 'Line: %s' % line
    if '\r' in lines[-1]:
      lines = readdata.split('\r')
      for line in lines[:-1]:
        print 'Line2: %s' % line
    if chunk is not None:
        BUFS[process] = lines[-1]
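For readers who want something they can run without rtmpdump, the following is a minimal Python 3 sketch of the same single-queue idea. It is not the answer's code: the names reader, split_lines and run_all are hypothetical, the children are tiny Python one-liners started through sys.executable, and the line splitting treats both '\r' and '\n' as terminators and drops empty lines, which is an assumption about how progress output should be handled.

```python
import subprocess
import sys
from queue import Queue
from threading import Thread


def reader(name, proc, out_q):
    """Forward chunks from proc.stdout to out_q, then a None end marker."""
    for chunk in iter(lambda: proc.stdout.read(16), b''):
        out_q.put((name, chunk))
    proc.stdout.close()
    proc.wait()                      # reap the child and set returncode
    out_q.put((name, None))


def split_lines(buf):
    """Split on newline or carriage return; return (complete lines, tail)."""
    parts = buf.replace(b'\r\n', b'\n').replace(b'\r', b'\n').split(b'\n')
    return parts[:-1], parts[-1]


def run_all(commands):
    """Run every command concurrently; return {name: [output lines]}."""
    out_q = Queue()
    bufs, lines = {}, {}
    for name, args in commands.items():
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT)
        Thread(target=reader, args=(name, proc, out_q), daemon=True).start()
        bufs[name] = b''
        lines[name] = []
    while bufs:
        name, chunk = out_q.get()    # blocks until any worker posts
        if chunk is None:            # end marker: flush the unfinished tail
            tail = bufs.pop(name)
            if tail:
                lines[name].append(tail)
            continue
        done, bufs[name] = split_lines(bufs[name] + chunk)
        lines[name].extend(line for line in done if line)
    return lines


if __name__ == '__main__':
    # Hypothetical child that writes progress separated by carriage returns.
    child = "import sys; sys.stdout.write('10%\\r50%\\r100%\\n')"
    print(run_all({'a': [sys.executable, '-c', child],
                   'b': [sys.executable, '-c', child]}))
```

Because the main loop blocks on one queue, it wakes up whenever any child produces output, and the None marker gives a clear end condition per process.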

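【Comments】:

  • The thing is that I want the data before the command finishes, i.e. I'm trying to parse and display the download progress, so blocking with wait() doesn't really help with that goal.
  • @adam - if you are talking about enqueue_output, that isn't a problem. The for loop runs until the subprocess's stdout is closed, which happens when the process exits. The wait just cleans up the zombie process and collects the return code.
  • @adam - if you are talking about return_q.get(), it wakes up each time a worker thread posts data.
  • +1: one queue is enough here. Add out.close() to avoid leaking file descriptors in enqueue_output().
  • Still no answer? @jfs don't you have any example code? I literally see you in every post, but I still can't find an answer.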
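The points raised in the comments (output arrives before the child exits, wait() only reaps it, and the pipe should be closed) can be checked with a small Python 3 sketch. Everything here is illustrative: CHILD is a hypothetical two-line script run through sys.executable, read_until_eof is not part of the answer, and readline is used instead of read(16) so that each line is delivered as soon as it is written.

```python
import subprocess
import sys

# Hypothetical child: prints a line, pauses, prints another, then exits.
CHILD = (
    "import time\n"
    "print('first', flush=True)\n"
    "time.sleep(0.5)\n"
    "print('second', flush=True)\n"
)


def read_until_eof(args):
    """Return ([(line, child_still_running)], returncode) for one child."""
    proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    seen = []
    # iter(callable, sentinel) stops when readline() returns b'' (EOF).
    for line in iter(proc.stdout.readline, b''):
        seen.append((line.strip(), proc.poll() is None))
    proc.stdout.close()        # release the pipe's file descriptor
    returncode = proc.wait()   # output is finished; this only reaps the child
    return seen, returncode


if __name__ == '__main__':
    print(read_until_eof([sys.executable, '-c', CHILD]))
```

The first line should be recorded while the child is still sleeping, which is the behaviour the download-progress use case depends on.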