【问题标题】:Converting small functions to coroutines将小函数转换为协程
【发布时间】:2019-09-15 09:01:11
【问题描述】:

我觉得我对异步 IO 的理解存在差距:在更大的协程范围内,将小函数包装到协程中是否有好处?这在发信号方面有好处吗?事件循环正确吗?这种好处的程度是否取决于被包装的函数是 IO 还是 CPU 密集型的?

示例:我有一个协程,download(),它:

  1. 通过 aiohttp 从 HTTP 端点下载 JSON 序列化字节。
  2. 通过bz2.compress() 压缩这些字节 - 这本身不能等待
  3. 通过aioboto3将压缩字节写入S3

所以第 1 部分和第 3 部分使用来自这些库的预定义协程;默认情况下,第 2 部分没有。

简单的例子:

import bz2
import io
import aiohttp
import aioboto3

async def download(endpoint, bucket_name, key):
    async with aiohttp.ClientSession() as session:
        async with session.request("GET", endpoint, raise_for_status=True) as resp:
            raw = await resp.read()  # payload (bytes)
            # Yikes - isn't it bad to throw a synchronous call into the middle
            # of a coroutine?
            comp = bz2.compress(raw)
            async with (
                aioboto3.session.Session()
                .resource('s3')
                .Bucket(bucket_name)
            ) as bucket:
                await bucket.upload_fileobj(io.BytesIO(comp), key)

正如上面的评论所暗示的,我的理解一直是把一个像bz2.compress() 这样的同步函数扔到一个协程中会弄乱它。 (即使bz2.compress() 的 IO 绑定可能比 CPU 绑定更多。)

那么,这种类型的样板文件通常有什么好处吗?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

(现在comp = await compress(raw)download() 内。)

哇啦,这现在是一个可等待的协程,因为唯一的return 在本机协程中是有效的。有没有理由使用它?

根据this answer,我听说了以类似方式随机抛出asyncio.sleep(0) 的理由——只是为了单一备份到调用协程想要中断的事件循环。是这样吗?

【问题讨论】:

  • 你关于小型协程的问题很有趣,但也许你会从运行同步函数中获得更多好处in executor?
  • 有人可能会对此做出回答:只是将一个函数放在协程中并不会使其异步:它仍然会阻塞。正如@sanyash 所提到的,如果您在此期间还有其他事情要做,将它放在执行程序中将有助于在另一个线程中运行它。

标签: python python-3.x python-asyncio


【解决方案1】:

协程允许你同时运行一些东西,不是并行。它们允许单线程协作多任务处理。这在两种情况下是有意义的:

  • 您需要同步生成结果,就像两个生成器一样。
  • 您希望在另一个协程等待 I/O 时完成一些有用的事情。

http 请求或磁盘 I/O 之类的东西会允许其他协程在等待操作完成时运行。

bz2.compress() 是同步的并且,我想, 在运行时不会释放 GIL but does release GIL这意味着它在运行时不能做任何有意义的工作。也就是说,其他协程在其调用期间不会运行,尽管其他线程会。

如果您预计要压缩的数据量大量,相对而言运行协程的开销很小,您可以使用bz2.BZ2Compressor 并以合理的小块为其提供数据(如 128KB),将结果写入流(S3 支持流,或者您可以使用 StringIO),并在压缩块之间进行await asyncio.sleep(0) 以产生控制。

这将允许其他协程与您的压缩协程同时运行。可能异步 S3 上传也将在套接字级别并行发生,而您的协程将处于非活动状态。

顺便说一句,将您的压缩器明确设为async generator 可能是表达相同想法的更简单方法。

【讨论】:

  • bz2.BZ2Compressor 提供较小块的部分很有意义。谢谢。我还认为我可以(可能)将resp 本身提供给compress(),因为它类似于缓冲区。
  • bz2 确实释放了 gil,因此可以在另一个线程中有效使用 [ref: github.com/python/cpython/blob/…]
  • @BradSolomon:我赞成另一个答案,这可能是一个更好的解决方案。
  • @Max 你会说分块压缩/解压缩例程仍然有效吗? pastebin.com/523W9zXU
  • @BradSolomon 您必须对其进行测量,但假设您在多核系统上,并且您的块足够大以至于线程协调是值得的,那么单独的线程会更快。
【解决方案2】:

那么,这种类型的样板文件通常有什么好处吗?

async def compress(*args, **kwargs):
    return bz2.compress(*args, **kwargs)

没有任何好处。与预期相反,添加一个 await doesn't guarantee 将控制传递给事件循环 - 只有在等待的协程实际挂起时才会发生这种情况。由于compress 不等待任何东西,它永远不会挂起,所以它只是名义上的协程。

注意在协程中添加await asyncio.sleep(0)并不能解决问题;有关更详细的讨论,请参阅this answer。如果需要运行阻塞函数,请使用run_in_executor

async def compress(*args, **kwargs):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, lambda: bz2.compress(*args, **kwargs))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-31
    • 2021-12-12
    • 2014-08-25
    • 1970-01-01
    相关资源
    最近更新 更多