【问题标题】:Make a Python asyncio call from a Flask route从 Flask 路由进行 Python 异步调用
【发布时间】:2018-05-30 05:35:31
【问题描述】:

我想在每次执行 Flask 路由时执行一个异步函数。为什么abar 函数从未执行?

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=loop)
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    loop.run_forever()

我还尝试将阻塞调用放在单独的线程中。但它仍然没有调用abar 函数。

import asyncio
from threading import Thread
from flask import Flask

async def abar(a):
    print(a)

app = Flask(__name__)

def start_worker(loop):
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        loop.close()

worker_loop = asyncio.new_event_loop()
worker = Thread(target=start_worker, args=(worker_loop,))

@app.route("/")
def notify():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)
    return "OK"

if __name__ == "__main__":
    worker.start()
    app.run(debug=False, use_reloader=False)

【问题讨论】:

  • app.runloop.run_forever 都处于阻塞状态。使用线程可能会更好。如果你需要使用 asyncio,你应该研究在它之上构建的类似 Flask 的框架之一。
  • @dim 非常感谢。我试图将一个阻塞移动到一个单独的线程中。 S. 我编辑的问题!

标签: python flask python-asyncio


【解决方案1】:

您可以将一些异步功能合并到 Flask 应用程序中,而无需将它们完全转换为 asyncio。

import asyncio
from flask import Flask

async def abar(a):
    print(a)

loop = asyncio.get_event_loop()
app = Flask(__name__)

@app.route("/")
def notify():
    loop.run_until_complete(abar("abar"))
    return "OK"

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

这将阻止 Flask 响应,直到异步函数返回,但它仍然允许您做一些聪明的事情。我已经使用此模式使用aiohttp 并行执行许多外部请求,然后当它们完成后,我又回到传统的烧瓶中进行数据处理和模板渲染。

import aiohttp
import asyncio
import async_timeout
from flask import Flask

loop = asyncio.get_event_loop()
app = Flask(__name__)

async def fetch(url):
    async with aiohttp.ClientSession() as session, async_timeout.timeout(10):
        async with session.get(url) as response:
            return await response.text()

def fight(responses):
    return "Why can't we all just get along?"

@app.route("/")
def index():
    # perform multiple async requests concurrently
    responses = loop.run_until_complete(asyncio.gather(
        fetch("https://google.com/"),
        fetch("https://bing.com/"),
        fetch("https://duckduckgo.com"),
        fetch("http://www.dogpile.com"),
    ))

    # do something with the results
    return fight(responses)

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)

【讨论】:

  • 由于在生产环境中使用诸如 gevent、meinheld 或 eventlet 之类的异步工作程序运行 Flask 是很典型的,我认为重要的是要注意此解决方案会阻塞 gevent/meinheld/eventlet 事件循环.这反过来会否定使用它们的一些优势。
  • 使用 aiohttp wsgi worker 会发生什么? aiohttp-wsgi.readthedocs.io/en/stable/index.html。即使这样,worker中的事件循环也会被阻塞吗?
  • 你的例子给了我RuntimeError: There is no current event loop in thread 'Thread-1'.。重现:1)我已将您的 sn-p 保存到 soexamp.py; 2)跑python soexamp.py; 3) 比`curl localhost:5000/`。我的 flask.__version__ 是 '1.0.2' 而 aiohttp.__version__ 是 '3.5.4'。
  • 这是不是线程安全的,你不能简单地在任意线程中使用loop.run_until_complete()。异步循环是线程特定的。任何现实生活中的 WSGI 部署都将使用线程。而不是调用asyncio.get_event_loop(),您必须创建一个新的事件循环每个线程。这……不过有点矫枉过正。
  • @ravimalhotra:非线程安全意味着事情可能会中断,因为多个线程正在更改相同的数据结构,除非您考虑线程。除了一些explicitly documented functions,异步事件循环实现不是线程安全的。此处的代码不会为每个线程创建一个新的事件循环,也不会正确地将协程传递给单个线程。请注意,我还发布了这个问题的答案,可以更好地解决这些问题。
【解决方案2】:

解决您的问题的一个更简单的方法(在我的偏见中)是从 Flask 切换到 Quart。如果是这样,您的 sn-p 简化为,

import asyncio
from quart import Quart

async def abar(a):
    print(a)

app = Quart(__name__)

@app.route("/")
async def notify():
    await abar("abar")
    return "OK"

if __name__ == "__main__":
    app.run(debug=False)

如其他答案中所述,Flask 应用程序运行被阻塞,并且不与异步循环交互。另一方面,Quart 是基于 asyncio 构建的 Flask API,因此它应该可以按照您的预期工作。

同样作为更新,Flask-Aiohttp 不再是 maintained

【讨论】:

  • 我有几个具有同步/阻塞功能的库当我切换到 quart 时会发生什么?当我调用这些库中的函数时,它会阻塞事件循环吗?
  • 是的,他们会阻止。您可以使用asyncio.run_in_executor 包装对这些函数的调用并等待(默认情况下在另一个线程中运行这些函数)。或者,您可以切换到基于 asyncio 的替代库。
  • 我很抱歉投了反对票,但是告诉您在希望能够触发后台任务时切换整个框架的答案并没有真正的帮助
  • Quart 是一个不错的建议,但您的回答实际上并没有正确解决这个问题,因为您 await OP 想要异步发生的调用,独立于服务器响应。
【解决方案3】:

您的错误是在调用app.run() 后尝试运行异步事件循环。后者不返回,而是运行 Flask 开发服务器。

事实上,这就是大多数 WSGI 设置的工作方式;要么主线程忙于分派请求,要么 Flask 服务器作为模块导入到 WSGI 服务器中,而您无法在此处启动事件循环要么

您将不得不在单独的线程中运行您的异步事件循环,然后通过asyncio.run_coroutine_threadsafe() 在该单独的线程中运行您的协程。请参阅文档中的 Coroutines and Multithreading section 了解这意味着什么。

这是一个模块的实现,它将运行这样一个事件循环线程,并为您提供了安排协同程序在该循环中运行的实用程序:

import asyncio
import itertools
import threading

__all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]

class EventLoopThread(threading.Thread):
    loop = None
    _count = itertools.count(0)

    def __init__(self):
        self.started = threading.Event()
        name = f"{type(self).__name__}-{next(self._count)}"
        super().__init__(name=name, daemon=True)

    def __repr__(self):
        loop, r, c, d = self.loop, False, True, False
        if loop is not None:
            r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
        return (
            f"<{type(self).__name__} {self.name} id={self.ident} "
            f"running={r} closed={c} debug={d}>"
        )

    def run(self):
        self.loop = loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.call_later(0, self.started.set)

        try:
            loop.run_forever()
        finally:
            try:
                shutdown_asyncgens = loop.shutdown_asyncgens()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_asyncgens)
            try:
                shutdown_executor = loop.shutdown_default_executor()
            except AttributeError:
                pass
            else:
                loop.run_until_complete(shutdown_executor)
            asyncio.set_event_loop(None)
            loop.close()

    def stop(self):
        loop, self.loop = self.loop, None
        if loop is None:
            return
        loop.call_soon_threadsafe(loop.stop)
        self.join()

_lock = threading.Lock()
_loop_thread = None

def get_event_loop():
    global _loop_thread

    if _loop_thread is None:
        with _lock:
            if _loop_thread is None:
                _loop_thread = EventLoopThread()
                _loop_thread.start()
                # give the thread up to a second to produce a loop
                _loop_thread.started.wait(1)

    return _loop_thread.loop

def stop_event_loop():
    global _loop_thread
    with _lock:
        if _loop_thread is not None:
            _loop_thread.stop()
            _loop_thread = None

def run_coroutine(coro):
    """Run the coroutine in the event loop running in a separate thread

    Returns a Future, call Future.result() to get the output

    """
    return asyncio.run_coroutine_threadsafe(coro, get_event_loop())

您可以使用此处定义的run_coroutine() 函数来安排异步例程。使用返回的Future instance控制协程:

  • 使用Future.result() 获取结果。你可以给它一个超时时间;如果在超时时间内没有产生结果,则协程自动取消。
  • 您可以使用.cancelled().running().done()方法查询协程的状态。
  • 您可以向未来添加回调,它将在协程完成、取消或引发异常时调用(考虑到这可能会从事件循环线程调用,而不是从你打电话给run_coroutine())。

对于您的具体示例,abar() 不返回任何结果,您可以忽略返回的未来,如下所示:

@app.route("/")
def notify():
    run_coroutine(abar("abar"))
    return "OK"

请注意,在 Python 3.8 之前,您不能使用在单独线程上运行的事件循环来创建子进程!请参阅我对 Python3 Flask asyncio subprocess in route hangs 的回答,了解 Python 3.8 ThreadedChildWatcher 类的反向移植,以了解此问题的解决方法。

【讨论】:

  • 假设我们在 abar() 函数中进行递归异步调用。如果abar() 调用另一个async 函数,例如:async def abar_1,我们应该调用run_coroutine(abar_1()) 还是await abar_1()。如果abart_1() 调用另一个异步函数等等,会不会一样?我有一个库,它作为 await func() 定义,据我了解,我必须将所有内容转换为 run_coroutine(func()) 格式,以便它们与您的代码一起工作。可以为他们提供一个 wrapper() 函数吗?
  • @alper 你在这里不是在谈论递归,只是普通的异步调用。通常你会在其他协程上 await 或者创建一个任务对象来同时运行其他协程。见Coroutines and Tasks。我的答案中的代码仅用于将 asyncio 与 Flask 集成,一旦进入事件循环使用异步编程技术
【解决方案4】:

出于同样的原因,您不会看到此打印:

if __name__ == "__main__":
    app.run(debug=False, use_reloader=False)
    print('Hey!')
    loop.run_forever()

loop.run_forever() 永远不会被调用,因为 @dirn 已经注意到 app.run 也处于阻塞状态。

运行全局阻塞事件循环 - 是您可以运行 asyncio 协程和任务的唯一方法,但它与运行阻塞 Flask 应用程序(或任何其他类似的东西)不兼容。

如果您想使用异步 Web 框架,您应该选择一个创建为异步的。例如,现在最流行的可能是aiohttp

from aiohttp import web


async def hello(request):
    return web.Response(text="Hello, world")


if __name__ == "__main__":
    app = web.Application()
    app.router.add_get('/', hello)
    web.run_app(app)  # this runs asyncio event loop inside

更新:

关于您尝试在后台线程中运行事件循环。我没有进行太多调查,但似乎问题与踏面安全有关:许多 asyncio 对象不是线程安全的。如果您以这种方式更改代码,它将起作用:

def _create_task():
    asyncio.ensure_future(abar("abar"), loop=worker_loop)

@app.route("/")
def notify():
    worker_loop.call_soon_threadsafe(_create_task)
    return "OK"

但同样,这是一个非常糟糕的主意。这不仅非常不方便,而且我想也没有多大意义:如果你要使用线程来启动 asyncio,为什么不用just use threads in Flask 而不是 asyncio?您将拥有所需的 Flask 和并行化。

如果我还没有说服你,至少看看Flask-aiohttp 项目。它与 Flask api 很接近,我认为比你想要做的更好。

【讨论】:

  • 非常感谢您的解释。这就说得通了。它也是一个不错的小型 aiohttp 示例。不幸的是,我被绑定到烧瓶/烧瓶,要求 alexa 技能。我修改了我原来的问题,并在一个单独的线程中移动了一个阻塞调用。但仍然没有运气
  • 使用 Flask 运行 asyncio 循环是一个绝妙的主意,而且完全没有问题,只要你小心一点。线程和异步协程有非常不同的优缺点,当执行大量阻塞 I/O 时,asyncio 比线程更可取。
【解决方案5】:

正如@Martijn Pieters@Mikhail Gerasimov 在其他答案中已经解释的那样,主要问题是app.run 被阻塞,所以loop.run_forever() 线永远不会被调用。您将需要在单独的线程上手动设置和维护运行循环。

幸运的是,使用 Flask 2.0,您不再需要创建、运行和管理自己的事件循环。您可以将您的路由定义为async def,并在您的路由函数中的协程上直接定义await

https://flask.palletsprojects.com/en/2.0.x/async-await/

使用异步和等待

2.0 版中的新功能。

路由、错误处理程序、请求前、请求后和拆卸 如果安装了 Flask,函数都可以是协程函数 async 额外 (pip install flask[async])。它需要 Python 3.7+ 其中contextvars.ContextVar 可用。这允许视图是 使用async def 定义并使用await

Flask 将负责为每个请求创建事件循环。您所要做的就是定义您的协程并在它们上await 来完成:

https://flask.palletsprojects.com/en/2.0.x/async-await/#performance

性能

异步函数需要事件循环才能运行。 Flask,作为 WSGI 应用程序,使用一名工作人员处理一个请求/响应周期。 当请求进入异步视图时,Flask 将启动一个事件 在一个线程中循环,在那里运行视图函数,然后返回结果。

即使对于异步视图,每个请求仍会占用一名工作人员。这 好处是您可以在视图中运行异步代码,例如 进行多个并发数据库查询,HTTP 请求到一个 外部 API 等。但是,您的应用程序的请求数 一次可以处理的将保持不变。

从问题中调整原始示例:

import asyncio
from flask import Flask, jsonify

async def send_notif(x: int):
    print(f"Called coro with {x}")
    await asyncio.sleep(1)
    return {"x": x}

app = Flask(__name__)

@app.route("/")
async def notify():
    futures = [send_notif(x) for x in range(5)]
    results = await asyncio.gather(*futures)

    response = list(results)
    return jsonify(response)

# The recommended way now is to use `flask run`.
# See: https://flask.palletsprojects.com/en/2.0.x/cli/
# if __name__ == "__main__":
#     app.run(debug=False, use_reloader=False)
$ time curl -s -XGET 'http://localhost:5000'
[{"x":0},{"x":1},{"x":2},{"x":3},{"x":4}]


real    0m1.016s
user    0m0.005s
sys     0m0.006s

使用asyncio 的最常见配方可以以相同的方式应用。需要注意的一件事是,从 Flask 2.0.1 开始,我们不能使用 asyncio.create_task 来生成后台任务:

https://flask.palletsprojects.com/en/2.0.x/async-await/#background-tasks

异步函数将在事件循环中运行,直到它们完成,此时 stage 事件循环将停止。这意味着任何额外产生的 异步功能完成时尚未完成的任务将是 取消。因此,您不能生成后台任务,例如 通过asyncio.create_task

如果您希望使用后台任务,最好使用任务队列 触发后台工作,而不是在视图函数中生成任务。

除了 create_task 的限制之外,它应该适用于您想要进行异步数据库查询或多次调用外部 API 的用例。

【讨论】:

    猜你喜欢
    • 2016-08-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-07
    • 2020-11-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多