[Question Title]: Why do we need `async for` and `async with`?
[Posted]: 2021-04-14 12:45:58
[Question]:

What is the point of introducing `async for` and `async with`? I know there are PEPs for these statements, but they are clearly aimed at language designers rather than ordinary users like me. A high-level rationale with examples would be much appreciated.

I did some research on my own and found this answer:

The `async for` and `async with` statements are needed because you would break the `yield from`/`await` chain with bare `for` and `with` statements.

The author does not give an example of how the chain gets broken, so I am still confused. Also, I notice that Python has `async for` and `async with`, but no `async while` or `async try ... except`. That sounds odd, since `for` and `with` are just syntactic sugar for `while` and `try ... except` respectively. Given that the latter are the building blocks of the former, wouldn't `async` versions of those statements offer more flexibility?

There is another answer discussing `async for`, but it only covers what it is not for and does not explain what it is for.

As a bonus: are `async for` and `async with` syntactic sugar? If so, what are their expanded equivalent forms?

[Question Comments]:

  • "forwith 只是 whiletry ... except 的语法糖" — 不,远非如此,它们各有各的特色。
  • @deceze 好吧,官方文档states 认为with 声明“在语义上等同于”try...except...finally。您可以使用whilenext 轻松实现for 循环。也许它们不是语法糖,但它们也没有不同。
  • 您需要这种新语法,因为如果它们被“糖”with/for 声明?
  • 不,你不能,因为这只是一个执行异步函数的阻塞调用。它不允许事件循环执行任何其他计划的协程,因为您只是启动和停止一个事件循环以解决一个异步enter
  • 如果你想这样说,是的。 forwith 封装了涉及特定方法的特定模式的协议,您可以使用whiletry..except..finally“手动”复制这些模式。但关键在于使这些模式可重用,而不是每次都编写大量样板文件。由于异步版本的样板文件不同,因此您需要它们的特定 async 版本。

标签: python asynchronous async-await python-asyncio coroutine


[Solution 1]:

TLDR: `for` and `with` are non-trivial syntactic sugar: they encapsulate several steps of calling the relevant methods. That makes it impossible to manually insert `await`s between those steps, yet that is exactly what a correct, usable `async for`/`with` requires. At the same time, it means that giving them `async` support is essential.


Why we cannot just `await` the goodies

Python's statements and expressions are backed by so-called protocols: when an object is used in a given statement/expression, Python calls the corresponding "special method" on that object to allow customization. For example, `x in [1, 2, 3]` delegates to `list.__contains__` to define what `in` actually means.
Most protocols are simple: one special method per statement/expression. If our only `async` feature were a bare `await`, we could still make all of these "one special method" statements/expressions "async" by adding an `await` in the right place.
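As a minimal illustration of such a protocol (the `EvenNumbers` class is a hypothetical example, not from the original answer), the `in` expression delegates to a single special method, `__contains__`:

```python
class EvenNumbers:
    """Membership testing via the special method behind the `in` expression."""
    def __contains__(self, item):
        return isinstance(item, int) and item % 2 == 0

evens = EvenNumbers()
print(2 in evens)   # True
print(3 in evens)   # False
```

Because only one method is involved, a hypothetical async version would need only a single well-placed `await`.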

In contrast, the `for` and `with` statements each correspond to multiple steps: `for` uses the iterator protocol to repeatedly fetch the iterator's `__next__` item, and `with` uses the context manager protocol to enter and exit a context.
The important part is that both have multiple steps that may need to be asynchronous. While we can manually add an `await` at one of those steps, we cannot hit all of them.
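The multiple hidden steps can be made visible by hand-expanding both statements (a simplified sketch; the real expansions handle more edge cases):

```python
# What `for item in some_iterable: ...` roughly expands to:
iterator = iter([1, 2, 3])       # step 1: __iter__
collected = []
while True:
    try:
        item = next(iterator)    # steps 2..n: __next__, repeatedly
    except StopIteration:        # final step: detect the end
        break
    collected.append(item)       # the loop body
print(collected)                 # [1, 2, 3]

# What `with cm as value: ...` roughly expands to:
class Managed:
    def __enter__(self):
        return "resource"
    def __exit__(self, exc_type, exc, tb):
        return False             # do not suppress exceptions

cm = Managed()
value = cm.__enter__()           # step 1: __enter__
try:
    print(value)                 # the block body
finally:
    cm.__exit__(None, None, None)  # step 2: __exit__
```

Each marked step is a place where an asynchronous implementation would need an `await`, but the sugared statements give us no hook to put one there.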

  • The easier case to look at is `with`: we can address the `__enter__` and `__exit__` methods separately.

    We could naively define a synchronous context manager with asynchronous special methods. Entering it can actually be made to work by strategically adding an `await`:

    with AsyncEnterContext() as acm:
        context = await acm
        print("I entered an async context and all I got was this lousy", context)
    

    However, this already breaks down if we use a single `with` statement with multiple contexts: we would first enter all the contexts, and only afterwards await them all:

    with AsyncEnterContext() as acm1, AsyncEnterContext() as acm2:
        context1, context2 = await acm1, await acm2  # wrong! acm1 must be entered completely before loading acm2
        print("I entered many async contexts and all I got was a rules lawyer telling me I did it wrong!")
    

    Worse still, there is no way for us to properly `await` the exit at all.

虽然forwith 确实是语法糖,但它们是非平凡 语法糖:它们使多重 动作更好。结果,人们不能天真地await individual 他们的行为。只有async withasync for 的毯子才能覆盖每一步。

Why we want the `async` goodies

forwith 都是抽象:它们完全封装迭代/上下文化的想法。

Picking one of the two again: Python's `for` is an abstraction of internal iteration; `while`, in contrast, is an abstraction of external iteration. In short, this means the entire point of `for` is that the programmer does not have to know how the iteration actually works.

  • Compare how a `list` is iterated with `for` versus `while`:
    some_list = list(range(20))
    index = 0                      # lists are indexed from 0
    while index < len(some_list):  # lists are indexed up to len-1
        print(some_list[index])    # lists are directly index'able
        index += 1                 # lists are evenly spaced
    
    for item in some_list:         # lists are iterable
        print(item)
    
    The external `while` iteration relies on knowledge of how lists concretely work: it pulls the implementation details out of the iterable and puts them into the loop. The internal `for` iteration, in contrast, relies only on knowing that the list is iterable. It works for any implementation of a list, and in fact for any iterable at all.

The bottom line is that the entire point of `for`, and likewise `with`, is to do away with implementation details. That includes having to know which steps we would need to make asynchronous. Only the blanket `async with` and `async for` can cover every step when we do not know which one it is.

Why we need the `async` goodies

A valid question is why `for` and `with` got `async` variants while other statements did not. There is a subtlety to `for` and `with` that is not obvious in everyday use: they both represent concurrency, and concurrency is the domain of `async`.

Without going into too much detail, the simple explanation is the equivalence between handling routines (`await`), iterables (`for`), and context managers (`with`). As already established in the answer cited in the question, a coroutine is in effect a kind of generator. Obviously, generators are also iterables, and in fact any iterable can be expressed via a generator. The less obvious part is that context managers are also equivalent to generators: most importantly, `contextlib.contextmanager` can turn a generator into a context manager.
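That last equivalence can be seen directly with `contextlib.contextmanager` from the standard library (the `managed` function here is an illustrative example):

```python
from contextlib import contextmanager

@contextmanager
def managed(log):
    """A generator turned into a context manager by the decorator."""
    log.append("enter")       # runs as __enter__
    try:
        yield "resource"      # the value bound by `as`
    finally:
        log.append("exit")    # runs as __exit__

log = []
with managed(log) as value:
    log.append(value)
print(log)                    # ['enter', 'resource', 'exit']
```

The generator suspends at `yield` exactly where the `with` body runs, which is what makes generators and context managers interchangeable.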

To handle the various kinds of concurrency consistently, we need `async` variants for routines (`await`), iterables (`async for`), and context managers (`async with`). Only the blanket `async with` and `async for` cover every step consistently.

[Discussion]:

    [Solution 2]:

    async forasync with是从低到高发展的逻辑延续。

    Historically, `for` loops in programming languages could only iterate over arrays of values linearly indexed 0, 1, 2 ... max.

    Python's `for` loop is a higher-level construct. It can iterate over anything that supports the iteration protocol, such as elements of a set or nodes of a tree; none of those have items numbered 0, 1, 2, etc.

    At the core of the iteration protocol is the `__next__` special method. Each successive call returns the next item (possibly a computed value or retrieved data) or signals the end of the iteration.
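    The protocol can be driven by hand with the built-in `next`, which calls `__next__` underneath:

```python
it = iter([10, 20])
print(next(it))              # 10
print(next(it))              # 20
try:
    next(it)                 # the end is signaled by an exception
except StopIteration:
    print("end of iteration")
```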

    `async for` is the asynchronous counterpart: instead of calling the regular `__next__`, it awaits the asynchronous `__anext__`; everything else stays the same. This allows common idioms to be used in asynchronous programs:

    # 1. print lines of text stored in a file
    for line in regular_file:
        print(line)
    
    # 2A. print lines of text as they arrive over the network,
    #
    # The same idiom as above, but the asynchronous character makes
    # it possible to execute other tasks while waiting for new data
    async for line in tcp_stream:
        print(line)
    
    # 2B: the same with a spawned command
    async for line in running_subprocess.stdout:
        print(line)
    

    The situation with `async with` is similar. To recap: the `try .. finally` construct was superseded by the more convenient `with` block, now considered idiomatic, which can communicate with anything supporting the context manager protocol through its `__enter__` and `__exit__` methods for entering and exiting the block. Naturally, everything that previously used `try .. finally` got rewritten as a context manager (locks, open/close call pairs, etc.).

    `async with`, again, corresponds to the asynchronous `__aenter__` and `__aexit__` special methods. While the asynchronous code entering or exiting a `with` block waits for new data, for a lock, or for some other condition to be satisfied, other tasks may run.
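    As in the synchronous case, the standard library can build such objects from async generators via `contextlib.asynccontextmanager` (available since Python 3.7; the `acquired` function here is an illustrative sketch):

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def acquired(log):
    """An async generator turned into an async context manager."""
    await asyncio.sleep(0)      # __aenter__ may suspend; other tasks can run
    log.append("acquired")
    try:
        yield "resource"        # the value bound by `as`
    finally:
        await asyncio.sleep(0)  # __aexit__ may suspend as well
        log.append("released")

async def main():
    log = []
    async with acquired(log) as value:
        log.append(value)
    return log

print(asyncio.run(main()))      # ['acquired', 'resource', 'released']
```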

    Note: unlike with `for`, it used to be possible to use asynchronous objects with a plain (non-async) `with` statement: `with await lock:`. That is now deprecated or unsupported.

    [Discussion]:

    • Note that `with await lock:` can still be used, but it is not the same as `async with lock:`. It means that producing the context manager is `async`, not that the context manager itself is `async`.
    • So basically it sounds like the `async for` syntax is just there to make sure the for loop works correctly with async code, because its implementation is not trivial (that is my guess). So the for loop works as usual, but the `await` keyword is now allowed. Is that more or less right?
    • @CharlieParker I would say implementing an `async for` loop is about the same difficulty level as a plain `for`. The difference is that it loops over an iterable that works asynchronously in its internal implementation. In other words, you have to use the right kind of `for` to match the type of iterable. Asynchronous iteration is rare compared to regular iteration, which is nearly everywhere. That makes occurrences of `async for` in code quite rare.
    • @VPfB Thanks for the note! Let me repeat it back to make sure I understand. So `async for` is essential for a generator that fetches things from io asynchronously, so that each time something is ready it (crucially) returns the next thing in the right order. Is that right? So `async for` not only allows the `await` keyword in its body, but also allows the iterator to fetch the next item asynchronously while respecting the iterator's order. Right?
    • @CharlieParker: Re 1): yes, that is exactly the main reason for asynchronous iteration; just one small note: a better term than "expensive" is "I/O bound". Re 2): well, perhaps in those examples where the iterator assembles whole lines (or other units of data), but in general that is not the main characteristic of asynchronous interaction. A plain iterator reading lines from a disk file is almost the same; the difference is that local file I/O is non-blocking and usually fast, so we can consider the result to be available immediately.
    [Solution 3]:

    My understanding of `async with` is that it allows Python to use the `await` keyword inside a context manager without Python freaking out. Removing the `async` from the `with` causes an error. This is useful because the object created will most likely perform expensive io operations we have to wait for, so we will often await methods of the object created by this special async context manager. Failing to open and close the context manager correctly could create problems in Python (otherwise, why bother Python users with learning subtler syntax and semantics?).

    I have not fully tested what `async for` does or its intricacies, but I would love to see an example; I may test it later when I need it and update this answer. Once I get to it, I will put the example here: https://github.com/brando90/ultimate-utils/blob/master/tutorials_for_myself/concurrency/asyncio_for.py

    For now, see my annotated example with `async with` (the script lives at https://github.com/brando90/ultimate-utils/blob/master/tutorials_for_myself/concurrency/asyncio_my_example.py):

    """
    1. https://realpython.com/async-io-python/#the-asyncawait-syntax-and-native-coroutines
    2. https://realpython.com/python-concurrency/
    3. https://stackoverflow.com/questions/67092070/why-do-we-need-async-for-and-async-with
    
    todo - async with, async for.
    
    todo: meaning of:
        - The async for and async with statements are only needed to the extent that using plain for or with would “break”
            the nature of await in the coroutine. This distinction between asynchronicity and concurrency is a key one to grasp
        - One exception to this that you’ll see in the next code is the async with statement, which creates a context
            manager from an object you would normally await. While the semantics are a little different, the idea is the
            same: to flag this context manager as something that can get swapped out.
        - download_site() at the top is almost identical to the threading version with the exception of the async keyword on
            the function definition line and the async with keywords when you actually call session.get().
            You’ll see later why Session can be passed in here rather than using thread-local storage.
        - An asynchronous context manager is a context manager that is able to suspend execution in its enter and exit
            methods.
    """
    
    import asyncio
    from asyncio import Task
    
    import time
    
    import aiohttp
    from aiohttp.client_reqrep import ClientResponse
    
    from typing import Coroutine
    
    
    async def download_site(coroutine_name: str, session: aiohttp.ClientSession, url: str) -> ClientResponse:
        """
        Calls an expensive io (get data from a url) using the special session (awaitable) object. Note that not all objects
        are awaitable.
        """
        # - the with statement is bad here in my opinion since async with is already mysterious and it's being used twice
        # async with session.get(url) as response:
        #     print("Read {0} from {1}".format(response.content_length, url))
        # - this won't work since it only creates the coroutine. It **has** to be awaited. The trick to have it be (buggy)
        # synchronous is to have the main coroutine call each task we want in order instead of giving all the tasks we want
        # at once to the event loop e.g. with asyncio.gather, which takes all the coroutines, gets the results in a list and
        # thus doesn't block!
        # response = session.get(url)
        # - right way to do async code is to have this await so someone else can run. Note, if the download_site/ parent
        # program is awaited in a for loop this won't work regardless.
        response = await session.get(url)
        print(f"Read {response.content_length} from {url} using {coroutine_name=}")
        return response
    
    async def download_all_sites_not_actually_async_buggy(sites: list[str]) -> list[ClientResponse]:
        """
        Code to demo the none async code. The code isn't truly asynchronous/concurrent because we are awaiting all the io
        calls (to the network) in the for loop. To avoid this issue, give the list of coroutines to a function that actually
        dispatches the io like asyncio.gather.
    
        My understanding is that async with allows the object given to be an awaitable object. This means that the object
        created is an object that does io calls so it might block so it's often the case we await it. Recall that when we
        run await f() f is either 1) coroutine that gains control (but might block code!) or 2) io call that takes a long
        time. But because of how python works after the await finishes the program expects the response to "actually be
        there". Thus, doing await blindly doesn't speed up the code. Do awaits on real io calls and call them with things
        that give it to the event loop (e.g. asyncio.gather).
    
        """
        # - create an awaitable object without having the context manager explode if it gives up execution.
        # - crucially, the session is an aiosession - so it is actually awaitable so we can actually give it to
        # - asyncio.gather and thus in the async code we truly take advantage of the concurrency of asynchronous programming
        async with aiohttp.ClientSession() as session:
        # with aiohttp.ClientSession() as session:  # won't work because there is an await inside this with
            tasks: list[Task] = []
            responses: list[ClientResponse] = []
            for i, url in enumerate(sites):
                task: Task = asyncio.ensure_future(download_site(f'coroutine{i}', session, url))
                tasks.append(task)
                response: ClientResponse = await session.get(url)
                responses.append(response)
            return responses
    
    
    async def download_all_sites_truly_async(sites: list[str]) -> list[ClientResponse]:
        """
        Truly async program that calls creates a bunch of coroutines that download data from urls and the uses gather to
        have the event loop run it asynchronously (and thus efficiently). Note there is only one process though.
        """
        # - indicates that session is an async obj that will likely be awaited since it likely does an expensive io that
        # - waits so it wants to give control back to the event loop or other coroutines so they can do stuff while the
        # - io happens
        async with aiohttp.ClientSession() as session:
            tasks: list[Task] = []
            for i, url in enumerate(sites):
                task: Task = asyncio.ensure_future(download_site(f'coroutine{i}', session, url))
                tasks.append(task)
            responses: list[ClientResponse] = await asyncio.gather(*tasks, return_exceptions=True)
            return responses
    
    
    if __name__ == "__main__":
        # - args
        sites = ["https://www.jython.org", "http://olympus.realpython.org/dice"] * 80
        start_time = time.time()
    
        # - run main async code
        # main_coroutine: Coroutine = download_all_sites_truly_async(sites)
        main_coroutine: Coroutine = download_all_sites_not_actually_async_buggy(sites)
        responses: list[ClientResponse] = asyncio.run(main_coroutine)
    
        # - print stats
        duration = time.time() - start_time
        print(f"Downloaded {len(sites)} sites in {duration} seconds")
        print('Success, done!\a')
    

    [Discussion]:
