【问题标题】:How to make tasks in ProcessPoolExecutor behave like daemon process?如何使 ProcessPoolExecutor 中的任务表现得像守护进程?
【发布时间】:2019-05-21 11:39:52
【问题描述】:

Python 3.6.6

代码如下:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor


executor_processes = ProcessPoolExecutor(2)


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    loop_ = asyncio.get_event_loop()
    loop_.run_in_executor(executor_processes, calculate)
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")

输出:

同时
完成睡眠
main_thread 已完成


...

我希望子进程将被终止,就像使用守护进程属性生成进程时一样:

import asyncio
import time
import multiprocessing


def calculate():
    while True:
        print("while")
        time.sleep(1)


async def async_method():
    proc = multiprocessing.Process(target=calculate)
    proc.daemon = True
    proc.start()
    await asyncio.sleep(1)
    print("finish sleep")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_method())
    print("main_thread is finished")

输出:

同时
完成睡眠
main_thread 已完成

问题:如何将loop_.run_in_executor(executor_processes, calculate) 的行为改为“类似守护进程”?

【问题讨论】:

    标签: python python-3.x python-multiprocessing python-asyncio


    【解决方案1】:

    您展示的代码显然只是一个小示例,用于展示您希望实现的目标。我们不知道您的实际任务/问题。但老实说,我不相信你是在正确的道路上。

    ProcessPoolExecutorconcurrent.futures 标准库包的一部分。它在调用submit() 时向调用者返回FutureFuture 是尚未完成的计算结果的代理。这是一个承诺;尽管在这种情况下,该术语在技术上并不完全正确。请参阅Wiki page 了解区别。

    这意味着计算预计在有限时间内完成并产生结果。
    这就是为什么 Python 中的 ThreadPoolExecutorProcessPoolExecutor 实现不允许您产生恶魔工人。要求一个您实际上并不希望实现的结果的承诺没有多大意义。

    你怎样才能实现你的目标?

    1 - 子类 ProcessPoolExecutor?
    您可以拦截新进程的创建和启动以潜入 _adjust_process_count() 中的 p.daemon = True。然而,由于concurrent.futures 的设计并没有考虑到无限期运行的任务,所以这并没有多大帮助。与multiprocessing 不同,concurrent.futures.process 定义了一个不考虑守护进程的exit handler。它只是尝试join() 一切,这可能需要一些时间进行无限循环。

    2 - 定义您自己的退出处理程序!
    您可以同时执行 multiprocessingconcurrent.futures.process 所做的事情:定义一个退出处理程序,当您的 Python 进程即将结束时进行清理去关机。 atexit 可以提供帮助:

    import atexit
    
    executor_processes = ProcessPoolExecutor(2)
    
    def calculate():
        while True:
            print("while")
            time.sleep(1)
    
    def end_processes():
        [proc.terminate() for proc in multiprocessing.active_children()]
    
    async def async_method():
        [...]
    
    if __name__ == '__main__':
        atexit.register(end_processes)
        loop = asyncio.get_event_loop()
        [...]
    

    注意:这将终止所有在进程结束时处于活动状态的子进程。如果您想要优雅地关闭子进程,请保留句柄并在代码中的指令结束之前执行此操作。
    另请注意,进程可以拒绝遵守terminate()kill() 是你最后的手段。

    【讨论】:

      猜你喜欢
      • 2023-03-12
      • 2016-03-31
      • 1970-01-01
      • 2010-11-27
      • 1970-01-01
      • 2014-04-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多