【问题标题】:How can I have a synchronous facade over asyncpg APIs with Python asyncio?如何使用 Python asyncio 在 asyncpg API 上实现同步外观?
【发布时间】:2021-10-25 15:32:08
【问题描述】:

想象一个异步 aiohttp Web 应用程序,它由通过 asyncpg 连接的 Postgresql 数据库支持并且不执行其他 I/O。如何让中间层托管应用程序逻辑,即 not 异步? (我知道我可以简单地使一切异步 - 但想象我的应用程序具有 大量 应用程序逻辑,仅受数据库 I/O 约束,我无法触及它的所有内容。

伪代码:

async def handler(request):
    # call into layers over layers of application code, that simply emits SQL
    ...

def application_logic():
    ...
    # This doesn't work, obviously, as await is a syntax
    # error inside synchronous code.
    data = await asyncpg_conn.execute("SQL")
    ...
    # What I want is this:
    data = asyncpg_facade.execute("SQL")
    ...

如何构建asyncpg 上的同步外观,以允许应用程序逻辑进行数据库调用?使用async.run()asyncio.run_coroutine_threadsafe() 等的配方在这种情况下不起作用,因为我们来自已经异步的上下文。我认为这不可能,因为已经有一个事件循环原则上可以运行 asyncpg 协程。

额外问题:使await inside sync 成为语法错误的设计原理是什么?允许await 来自任何源自协程的上下文不是很有用吗,这样我们就可以通过简单的方法将应用程序分解为功能构建块?

编辑 额外奖励:除了Paul's very good answer,留在“安全区”内,我会对避免阻塞主线程的解决方案感兴趣(导致更多gevent-ish) .另请参阅我对保罗回答的评论......

【问题讨论】:

  • 您是否正在寻找不在“安全区”内的答案?您显然已经知道几种可能的解决方案,但您还是问了这个问题。我不知道你想要什么。您是否想要一个无需使用 async/await 语法即可执行任务切换的程序?这不完全是你问的,但这就是“不阻塞事件循环”的含义。即使您可以这样做,您也会遇到任何功能都可能执行任务切换的情况。这难道不是多线程的缺点而没有好处吗?
  • @PaulCornelius 当我问的时候,我一直在寻找任何答案,因为我也环顾四周,找到了我提到的东西。您的回答非常有帮助,因为从一开始我就不清楚我需要在线程中使用单独的循环。现在我尝试了解所有选项。
  • 很难看出将异步外观放在同步代码上会如何做除了阻塞之外的任何事情。如果迁移路径是到 asyncio(对于 IO 绑定调用恕我直言,这是一个很好的决定),那么在一个异步函数中同步运行大型同步代码来排队任务并等待由 async pg 队列处理的响应似乎是可行的。我尝试了类似stackoverflow.com/a/69175259/6242321 的方法,尽管您可能会在我使用 (4) 的地方使用 Semaphore(1)。

标签: python python-asyncio aiohttp asyncpg


【解决方案1】:

您需要创建一个辅助线程来运行您的异步代码。您使用自己的事件循环初始化辅助线程,该循环永远运行。通过调用 run_coroutine_threadsafe() 并在返回的对象上调用 result() 来执行每个异步函数。那是 concurrent.futures.Future 的一个实例,它的 result() 方法在协程的结果从辅助线程准备好之前不会返回。

然后,您的主线程实际上是在调用每个异步函数,就好像它是一个同步函数一样。在每个函数调用完成之前,主线程不会继续。顺便说一句,你的同步函数是否真的在事件循环上下文中运行并不重要。

对 result() 的调用当然会阻塞主线程的事件循环。如果您想获得从同步代码运行异步函数的效果,这是无法避免的。

不用说,这是一件丑陋的事情,它暗示了错误的程序结构。但是您正在尝试转换旧程序,这可能会有所帮助。

import asyncio
import threading
from datetime import datetime

def main():
    def thr(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=thr, args=(loop, ), daemon=True)
    t.start()

    print("Hello", datetime.now())
    t1 = asyncio.run_coroutine_threadsafe(f1(1.0), loop).result()
    t2 = asyncio.run_coroutine_threadsafe(f1(2.0), loop).result()
    print(t1, t2)
 

if __name__ == "__main__":
    main()

>>> Hello 2021-10-26 20:37:00.454577
>>> Hello 1.0 2021-10-26 20:37:01.464127
>>> Hello 2.0 2021-10-26 20:37:03.468691
>>> 1.0 2.0

【讨论】:

  • 谢谢,这是一个很好的答案!不过,对于那些试图逐步升级现有代码库的人来说,阻塞主线程是一个严重的限制。知道人们对周围浮动的黑客的看法会很有趣,以避免阻塞主线程的事件循环,例如github.com/oremanj/greenbackgithub.com/erdewit/nest_asyncio 或 zzzeek 的 SQLAlchemy 解决方案。我会为此提供赏金,希望人们为此做出贡献
猜你喜欢
  • 1970-01-01
  • 2012-10-22
  • 1970-01-01
  • 2014-09-17
  • 2021-05-04
  • 1970-01-01
  • 2017-05-28
  • 2016-09-03
相关资源
最近更新 更多