【问题标题】:How to achieve final cleanup when a Python async function is interrupted?Python异步函数中断时如何实现最终清理?
【发布时间】:2021-09-18 23:15:23
【问题描述】:

对于 Python 的协程/异步函数,有一个问题困扰着我: 当内存在这个函数的堆栈框架之外的异步函数中分配时,我们如何清理存储以防故障或中断?我知道 Go 编程语言的 defer 语句,我基本上是在寻找 Python 中的等价物。

让我举一个例子来证明问题存在并使其更具体:

import asyncio


class CallManager:
    def __init__(self):
        self._inputs = {}
        self._next_input_id = 0
        self._ready_event = asyncio.Event()

    async def run_calculation_loop(self):
        while True:
            await asyncio.sleep(1)
            if self._inputs:
                print("Doing some calculation and preparing outputs")
                self._ready_event.set()

    async def process_in_loop(self, arg):
        my_input_id = self._next_input_id
        self._next_input_id += 1

        self._inputs[my_input_id] = arg
        await self._ready_event.wait()

        # This del only happens if the call is not interrupted
        del self._inputs[my_input_id]


async def good_case():
    cm = CallManager()
    calculation_loop = asyncio.create_task(cm.run_calculation_loop())
    print("Buffer size:", len(cm._inputs))
    mytask = asyncio.create_task(cm.process_in_loop("data"))
    await asyncio.sleep(0.1)
    print("Buffer size:", len(cm._inputs))
    await mytask
    print("Buffer size:", len(cm._inputs))
    calculation_loop.cancel()


async def memory_leak():
    cm = CallManager()
    calculation_loop = asyncio.create_task(cm.run_calculation_loop())
    print("Buffer size:", len(cm._inputs))
    mytask = asyncio.create_task(cm.process_in_loop("data"))
    await asyncio.sleep(0.1)
    print("Buffer size:", len(cm._inputs))
    mytask.cancel()
    print("Buffer size:", len(cm._inputs))
    calculation_loop.cancel()
    print("Buffer size:", len(cm._inputs))


print("Good case:")
asyncio.run(good_case())

print("")
print("Memory leak:")
asyncio.run(memory_leak())

输出:

Good case:
Buffer size: 0
Buffer size: 1
Doing some calculation and preparing outputs
Buffer size: 0

Memory leak:
Buffer size: 0
Buffer size: 1
Buffer size: 1
Buffer size: 1

在第一种情况下,“好”的情况,process_in_loop() 被调用,并有机会完成并清理内部字典中的项目。

在第二种“内存泄漏”的情况下,内部缓冲区永远不会被清理,因为process_in_loop() 在完成之前就被取消了。

在实际代码中,调用者不知道内部状态,最初负责的协程可能会在中间取消。 这个问题在这种情况下可能看起来有点人为,但实际上我在实际代码中遇到了一个非常相似的问题,只是将其归结为问题。

【问题讨论】:

  • 调用者不知道内部状态,但无论进程调用取消,它似乎。难道不能有一个自定义的cancel() 方法来检查缓冲区大小并在取消时清理它吗?
  • 是的,没错。但是取消只是创建工作示例的最简单方法。我更担心由于超时、异常等可能发生的意外中断。我不喜欢依赖一切正常,但想确保清洁总是发生。
  • 如果后端取消产生异常,您可以使用类似 Promise 的设置(如对这个 SO 问题的回答:stackoverflow.com/questions/43325501/…

标签: python python-asyncio


【解决方案1】:

阅读了一些类似的问题后,我再次意识到,这些问题通常可以通过 Python 中的上下文管理器来解决。确实可以创建一个小型上下文管理器,以确保最终释放资源:

import asyncio
from contextlib import contextmanager


class CallManager:
    def __init__(self):
        self._inputs = {}
        self._next_input_id = 0
        self._ready_event = asyncio.Event()

    async def run_calculation_loop(self):
        while True:
            await asyncio.sleep(1)
            if self._inputs:
                print("Doing some calculation and preparing outputs")
                self._ready_event.set()

    async def process_in_loop(self, arg):
        with self._store_input(arg):
            await self._ready_event.wait()

    @contextmanager
    def _store_input(self, arg):
        local_id = self._next_input_id
        self._inputs[local_id] = arg
        self._next_input_id += 1
        try:
            yield
        finally:
            del self._inputs[local_id]


async def good_case():
    cm = CallManager()
    calculation_loop = asyncio.create_task(cm.run_calculation_loop())
    print("Buffer size:", len(cm._inputs))
    mytask = asyncio.create_task(cm.process_in_loop("data"))
    await asyncio.sleep(0.1)
    print("Buffer size:", len(cm._inputs))
    await mytask
    print("Buffer size:", len(cm._inputs))
    calculation_loop.cancel()


async def memory_leak():
    cm = CallManager()
    calculation_loop = asyncio.create_task(cm.run_calculation_loop())
    print("Buffer size:", len(cm._inputs))
    mytask = asyncio.create_task(cm.process_in_loop("data"))
    await asyncio.sleep(0.1)
    print("Buffer size:", len(cm._inputs))
    mytask.cancel()
    print("Buffer size:", len(cm._inputs))
    calculation_loop.cancel()
    await asyncio.sleep(0.1)
    print("Buffer size:", len(cm._inputs))


print("Good case:")
asyncio.run(good_case())

print("")
print("Memory leak:")
asyncio.run(memory_leak())

现在输出:

Good case:
Buffer size: 0
Buffer size: 1
Doing some calculation and preparing outputs
Buffer size: 0

Memory leak:
Buffer size: 0
Buffer size: 1
Buffer size: 1
Buffer size: 0

所以资源最终在上下文管理器的 finally 块中被清理。这也应该涵盖其他中断原因,例如正常异常。

【讨论】:

    猜你喜欢
    • 2020-06-19
    • 1970-01-01
    • 1970-01-01
    • 2020-04-30
    • 2020-02-09
    • 2022-09-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多