【问题标题】:Detect an idle asyncio event loop检测空闲的异步事件循环
【发布时间】:2017-11-18 17:54:52
【问题描述】:

是否有某种编程模式可以让我检测到异步事件循环何时在以下意义上变得空闲?假设我的执行路径以某种复杂的方式分支,例如使用 asyncio.gather(),但我知道每个分支最终都会等待一些空闲的协程,例如套接字或子进程。假设我知道这些协程实际上永远不会产生,所以事件循环将执行它可以执行的任何 python 代码,但最终只会等待那些空闲的协程。有没有一种编程方式来检测这种状态并停止循环?

【问题讨论】:

  • 为什么需要停止循环?如果它在套接字或子进程作业完成之前停止,它将无法将结果传播到等待它的协程。
  • 那些永不屈服的协程有副作用,我想执行。但是一旦所有这些都被执行,我希望应用程序停止。这是应用程序的一个特定用例场景,其中执行路径被简单地中断。
  • 你能提供一些可复现的代码,其中包含这些协程,这些协程永远不会屈服并且你想检测吗?

标签: python-asyncio


【解决方案1】:

正如 Philip Couling 所指出的,下面介绍的解决方案不起作用。 StackOverflow 不允许删除已接受的答案,因此我添加此免责声明。


您所说的“空闲”可能更准确地描述为“等待 IO 或超时”。在正确编写的 asyncio 代码中,不需要检测循环是否处于该状态,因为它不应该重要 - 循环正在完成它的工作,这取决于@987654324 之类的工具@、asyncio.waitloop.run_until_complete 以确保它在适当的时间结束。然而,事情并不总是完美的,如果你真的想这样做,那当然是可能的。

在事件循环的每一步,它都会检查准备好运行的任务。如果有,则调用它们的步骤。一旦没有更多任务准备就绪,事件循环将等待 IO 事件或最早的超时,以先发生者为准。需要注意的重要一点是,运行任务总是优先于等待 IO。因此,为了检测没有任务准备好的情况,可以安排一个已知立即触发的虚拟 IO 事件。

下面的协程设置这样一个事件并等待它触发:

import socket, asyncio

async def detect_iowait():
    loop = asyncio.get_event_loop()
    rsock, wsock = socket.socketpair()
    wsock.close()
    await loop.sock_recv(rsock, 1)
    rsock.close()

它设置了一个socket pair,从一个套接字读取返回写入另一个套接字的数据。它立即关闭其中一个套接字,以便从另一个套接字读取立即返回 EOF,表示为一个空字节数组。等待从该套接字读取基本上是非阻塞的 - 但asyncio 不知道这一点,因此它将套接字放在 IO 等待列表中。如上所述,一旦没有可运行的任务存在,asyncio 将等待 IO,detect_iowait 将等待套接字上的读取并退出。因此 awaiting detect_iowait() 本身会检测到 IO 等待。

使用detect_iowait() 的测试代码可能如下所示:

# stop loop.run_forever once iowait is detected
async def stop_on_iowait():
    await detect_iowait()
    print('iowait detected, stopping!')
    asyncio.get_event_loop().stop()

# a dummy calculation coroutine, emulating your execution path
async def calc(n):
    print('calc %d start' % n)
    async def noop():
        pass
    for i in range(n):
        await noop()
    print('calc %d end' % n)

# coroutine that waits on IO forever, also (ab)using a socket pair,
# this time creating a socket whose recv will never complete
async def io_forever():
    loop = asyncio.get_event_loop()
    sock, _ = socket.socketpair()
    sock.setblocking(False)
    await loop.sock_recv(sock, 1)

loop = asyncio.get_event_loop()
for t in calc(1000), calc(10000), calc(100000), io_forever():
    loop.create_task(t)
loop.create_task(stop_on_iowait())
loop.run_forever()

【讨论】:

猜你喜欢
  • 2017-07-07
  • 1970-01-01
  • 2012-07-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-04-10
  • 2018-10-17
相关资源
最近更新 更多