您的错误是在调用app.run() 后尝试运行异步事件循环。后者不返回,而是运行 Flask 开发服务器。
事实上,这就是大多数 WSGI 设置的工作方式;要么主线程忙于分派请求,要么 Flask 服务器作为模块导入到 WSGI 服务器中,而您无法在此处启动事件循环要么。
您将不得不在单独的线程中运行您的异步事件循环,然后通过asyncio.run_coroutine_threadsafe() 在该单独的线程中运行您的协程。请参阅文档中的 Coroutines and Multithreading section 了解这意味着什么。
这是一个模块的实现,它将运行这样一个事件循环线程,并为您提供了安排协同程序在该循环中运行的实用程序:
import asyncio
import itertools
import threading
__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
class EventLoopThread(threading.Thread):
loop = None
_count = itertools.count(0)
def __init__(self):
self.started = threading.Event()
name = f"{type(self).__name__}-{next(self._count)}"
super().__init__(name=name, daemon=True)
def __repr__(self):
loop, r, c, d = self.loop, False, True, False
if loop is not None:
r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
return (
f"<{type(self).__name__} {self.name} id={self.ident} "
f"running={r} closed={c} debug={d}>"
)
def run(self):
self.loop = loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.call_later(0, self.started.set)
try:
loop.run_forever()
finally:
try:
shutdown_asyncgens = loop.shutdown_asyncgens()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_asyncgens)
try:
shutdown_executor = loop.shutdown_default_executor()
except AttributeError:
pass
else:
loop.run_until_complete(shutdown_executor)
asyncio.set_event_loop(None)
loop.close()
def stop(self):
loop, self.loop = self.loop, None
if loop is None:
return
loop.call_soon_threadsafe(loop.stop)
self.join()
_lock = threading.Lock()
_loop_thread = None
def get_event_loop():
global _loop_thread
if _loop_thread is None:
with _lock:
if _loop_thread is None:
_loop_thread = EventLoopThread()
_loop_thread.start()
# give the thread up to a second to produce a loop
_loop_thread.started.wait(1)
return _loop_thread.loop
def stop_event_loop():
global _loop_thread
with _lock:
if _loop_thread is not None:
_loop_thread.stop()
_loop_thread = None
def run_coroutine(coro):
"""Run the coroutine in the event loop running in a separate thread
Returns a Future, call Future.result() to get the output
"""
return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
您可以使用此处定义的run_coroutine() 函数来安排异步例程。使用返回的Future instance控制协程:
- 使用
Future.result() 获取结果。你可以给它一个超时时间;如果在超时时间内没有产生结果,则协程自动取消。
- 您可以使用
.cancelled()、.running()和.done()方法查询协程的状态。
- 您可以向未来添加回调,它将在协程完成、取消或引发异常时调用(考虑到这可能会从事件循环线程调用,而不是从你打电话给
run_coroutine())。
对于您的具体示例,abar() 不返回任何结果,您可以忽略返回的未来,如下所示:
@app.route("/")
def notify():
run_coroutine(abar("abar"))
return "OK"
请注意,在 Python 3.8 之前,您不能使用在单独线程上运行的事件循环来创建子进程!请参阅我对 Python3 Flask asyncio subprocess in route hangs 的回答,了解 Python 3.8 ThreadedChildWatcher 类的反向移植,以了解此问题的解决方法。