【问题标题】:asynchronous subprocess Popen python 3.5异步子进程Popen python 3.5
【发布时间】:2017-02-03 00:54:40
【问题描述】:

我正在尝试从子进程异步运行 Popen 命令,以便我可以在后台运行其他东西。

import subprocess
import requests
import asyncio
import asyncio.subprocess    

    async def x(message):
        if len(message.content.split()) > 1:
            #output = asyncio.create_subprocess_shell(message.content[3:], shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
            output = subprocess.Popen(message.content[3:], shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT)
            return output.communicate()[0].decode('utf-8')

我试图了解https://docs.python.org/3/library/asyncio-subprocess.html,但我不确定协议工厂是什么。

【问题讨论】:

  • 有什么问题?
  • 这不会异步运行。它等待这个过程完成,但我希望它同时运行其他东西。所以如果我先调用这个函数,然后调用另一个函数,这个函数大约需要 20 秒,另一个大约需要 2 秒。第二个函数必须等待 20 秒才能运行。
  • 你有理由避免使用多线程吗?
  • 你是什么意思?
  • 你可以使用多处理或多线程来做同样的事情,显然看起来更容易......

标签: python asynchronous subprocess python-3.5


【解决方案1】:

使用 python 3.5 测试。有问题就问吧。

import threading
import time
import subprocess
import shlex
from sys import stdout


# Only data wihtin a class are actually shared by the threads.
# Let's use a class as communicator (there could be problems if you have more than
# a single thread)
class Communicator(object):
    counter = 0
    stop = False
    arg = None
    result = None

# Here we can define what you want to do. There are other methods to do that
# but this is the one I prefer.
class ThreadedFunction(threading.Thread):

    def run(self, *args, **kwargs):
        super().run()
        command = c.arg

        # Here what you want to do...
        command = shlex.split(command)
        print(time.time()) # this is just to check that the command (sleep 5) is executed
        output = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
        print('\n',time.time())
        c.result = output
        if c.stop: return None # This is useful only within loops within threads

# Create a class instance
c = Communicator()
c.arg = 'time sleep 5' # Here I used the 'time' only to have some output

# Create the thread and start it
t = ThreadedFunction()
t.start() # Start the thread and do something else...

# ...for example count the seconds in the mean time..
try:
    for j in range(100):
        c.counter += 1
        stdout.write('\r{:}'.format(c.counter))
        stdout.flush()
        time.sleep(1)
        if c.result != None:
            print(c.result)
            break
except:
    c.stop = True

【讨论】:

  • 我如何从中获得输出?
  • 输出在c.result
【解决方案2】:

这个比较简单,我是在另一个回复之后发现的,反正很有趣……所以我离开了。

import time
import subprocess
import shlex
from sys import stdout


command = 'time sleep 5' # Here I used the 'time' only to have some output

def x(command):
    cmd = shlex.split(command)
    p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return p

# Start the subprocess and do something else...
p = x(command)
# ...for example count the seconds in the mean time..

try: # This take care of killing the subprocess if problems occur
    for j in range(100):
        stdout.write('\r{:}'.format(j))
        stdout.flush()
        time.sleep(1)
        if p.poll() != None:
            print(p.communicate())
            break
except:
    p.terminate() # or p.kill()

当后台进程运行sleep 命令时,python 脚本在标准输出上打印计数器值这一事实可以明显看出异步性。 python 脚本在打印 bash time 命令的输出约 5 秒后退出,同时打印计数器的事实证明该脚本有效。

【讨论】:

  • 我试过这个,但它似乎不是异步的。我不得不不断等待子流程完成。
  • @JinyangLiu 一旦你运行p=x(command),那么脚本可以继续,而子进程将在后台运行。一旦您需要后台进程的输出,您只需运行p.communicate(),python 脚本将等待命令结束(如果尚未完成)。在某个时刻,您需要将脚本与子流程重新加入。如果您需要在子进程完成之前重新加入它们,则脚本必须等待。
  • 是的,我明白了。我发现它对我来说没有异步运行的原因是 .communicate() 部分。如果我删除它。它会打印出类似<subprocess.Popen object at 0x7f60b99d8278> 的内容。它不显示输出,但让其他程序异步运行。
  • docs.python.org/3/library/asyncio-subprocess.html,也许这对我有帮助?但我不明白什么是协议工厂。
  • @JinyangLiu 子进程是这样​​工作的: 1. Popen 的创建在后台启动一个进程; 2.你可以在后台进程运行时做你想做的事; 3. 当您需要后台进程的结果时,您可以使用通信(就像我在上一个示例中所做的那样)。 4. 如果后台进程还没有结束,通信会等到进程结束。 ---- 我所做的是一个异步后台进程。你到底需要什么?
【解决方案3】:

我最终找到了我的问题的答案,它使用了异步。 http://pastebin.com/Zj8SK1CG

【讨论】:

  • 这更像是一个 hack,等待一秒钟然后检查进程是否已经完成并不是很干净,但是以某种方式完成了工作......
【解决方案4】:

当我来到这个问题时,我期望答案是真正使用 asyncio 进行进程间通信。

我发现以下资源很有用: https://github.com/python/asyncio/blob/master/examples/child_process.py

下面是我的简化示例(使用 3.5+ async/await 语法),它读取行并将它们排序输出:

import asyncio

from subprocess import Popen, PIPE


async def connect_write_pipe(file):
    """Return a write-only transport wrapping a writable pipe"""
    loop = asyncio.get_event_loop()
    transport, _ = await loop.connect_write_pipe(asyncio.Protocol, file)
    return transport


async def connect_read_pipe(file):
    """Wrap a readable pipe in a stream"""
    loop = asyncio.get_event_loop()
    stream_reader = asyncio.StreamReader(loop=loop)

    def factory():
        return asyncio.StreamReaderProtocol(stream_reader)

    transport, _ = await loop.connect_read_pipe(factory, file)
    return stream_reader, transport


async def main(loop):
    # start subprocess and wrap stdin, stdout, stderr
    p = Popen(['/usr/bin/sort'], stdin=PIPE, stdout=PIPE, stderr=PIPE)

    stdin = await connect_write_pipe(p.stdin)
    stdout, stdout_transport = await connect_read_pipe(p.stdout)
    stderr, stderr_transport = await connect_read_pipe(p.stderr)

    # interact with subprocess
    name = {stdout: 'OUT', stderr: 'ERR'}
    registered = {
        asyncio.Task(stderr.read()): stderr,
        asyncio.Task(stdout.read()): stdout
    }

    to_sort = b"one\ntwo\nthree\n"
    stdin.write(to_sort)
    stdin.close()  # this way we tell we do not have anything else

    # get and print lines from stdout, stderr
    timeout = None
    while registered:
        done, pending = await asyncio.wait(
            registered, timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED)
        if not done:
            break
        for f in done:
            stream = registered.pop(f)
            res = f.result()
            if res != b'':
                print(name[stream], res.decode('ascii').rstrip())
                registered[asyncio.Task(stream.read())] = stream
        timeout = 0.0

    stdout_transport.close()
    stderr_transport.close()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

NB:不采取特殊措施,写入管道的数据量是有限的。在我的系统中,可以在用完管道缓冲区之前写入超过 700000 个字节。

还有其他例子,使用create_subprocess_shell

我还没有在实际项目中使用过 asyncio,所以欢迎在 cmets 中提出改进建议。

【讨论】:

    【解决方案5】:

    这是正确的方法......!使用

    异步/等待

    在 Python - 3.X [Windows, MacOS] 上测试过

    import asyncio  from asyncio.subprocess import PIPE, STDOUT  import subprocess  import signal
    
    
    def signal_handler(signal, frame):
        loop.stop()
        client.close()
        sys.exit(0)
    
    async def run_async(loop = ''):
        cmd = 'sudo long_running_cmd --opt1=AAAA --opt2=BBBB'
    
        print ("[INFO] Starting script...")
        await asyncio.create_subprocess_shell(cmd1, stdin = PIPE, stdout = PIPE, stderr = STDOUT)
        print("[INFO] Script is complete.")
    
    
    loop = asyncio.get_event_loop()  signal.signal(signal.SIGINT, signal_handler)  tasks = [loop.create_task(run_async())]  wait_tasks = asyncio.wait(tasks)  loop.run_until_complete(wait_tasks)
    
    loop.close()
    

    核心逻辑:

    process = await asyncio.create_subprocess_shell(cmd1, stdin = PIPE, stdout PIPE, stderr = STDOUT)
    await process.wait()
    

    【讨论】:

      猜你喜欢
      • 2021-01-27
      • 1970-01-01
      • 2017-11-22
      • 2018-12-05
      • 2014-09-15
      • 1970-01-01
      • 2012-03-03
      • 2011-08-05
      • 2017-08-22
      相关资源
      最近更新 更多