【问题标题】:Communicating with child process in Python3 asyncio在 Python3 asyncio 中与子进程通信
【发布时间】:2020-07-04 01:58:45
【问题描述】:

我有一个基于 asyncio 的程序(称之为服务器)。要求是从该服务器中启动其他进程,从该进程获取响应(这些是长时间运行的)并进一步处理/发送整理的响应。

请参阅下面的带有 cmets 的骨架代码,了解我想要实现的目标:

import asyncio
import subprocess

loop = asyncio.get_event_loop()


def logFileProcessorBgThread(fName):
    print("Starting logFileProcessorBgThread")
    f = subprocess.Popen(['tail','-f', LOG_FILE], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while True:
        line = f.stdout.readline().decode().strip()
        print(line)
        # somehow, send back the line to the start_server


async def start_server():
    # I want to start the logFileProcessorBgThread as a separate process,
    # one for each input file
    files = ["/var/log/syslog", "/tmp/somefile.log"]
    for f in files:
        # start logFileProcessorBgThread(f) as a process

    # need to build a way to receive the updates from the 2 process as an
    # when they have data..

    # my requirement is to bring back the responses to this process, so I can
    # further handle them (in my case send it out on a secure channel that
    # this process has established. Other launched process would not have
    # access to this channel..)



def main():
    loop.run_until_complete(start_server())

if __name__ == '__main__':
    main()

我不清楚启动过程中的这种流式响应将如何工作。

【问题讨论】:

    标签: python-3.x python-asyncio python-multiprocessing


    【解决方案1】:

    您可以通过在事件循环中结合使用ProcessPoolExecutorrun_in_executor(..) 函数来做到这一点。这是您示例的基本调整:

    import asyncio
    import os
    import time
    from concurrent.futures import ProcessPoolExecutor
    
    loop = asyncio.get_event_loop()
    
    
    def process(filename):
        # do some work
        print("{} from process {}".format(filename, os.getpid()))
        time.sleep(2)
        return "response {}".format(filename)
    
    async def start_server():
        # create the process pool executors with 2 workers (# of processes to use)
        pool = ProcessPoolExecutor(2)
        filenames = ["/var/log/syslog", "/tmp/somefile.log"]
    
        # launch them in the pool and then await on the results
        futures = [loop.run_in_executor(pool, process, filename) for filename in filenames]
        for future in futures:
            response = await future
            print("response was '{}'".format(response))
    
    
    def main():
        loop.run_until_complete(start_server())
    
    
    if __name__ == '__main__':
        main()
    

    【讨论】:

    • 在您发布的示例中, process() 正在返回一些东西。我的要求(如果您看到 logFileProcessorBgThread)是它在无限循环中等待,拖尾特定文件。目标是将换行符(它们出现在尾部)传递回父级。
    猜你喜欢
    • 2012-01-28
    • 1970-01-01
    • 2015-02-19
    • 2012-07-07
    • 1970-01-01
    • 1970-01-01
    • 2017-02-15
    • 2014-02-14
    • 1970-01-01
    相关资源
    最近更新 更多