【问题标题】:asynchronous python itertools chain multiple generators异步 python itertools 链多个生成器
【发布时间】:2020-06-11 01:00:29
【问题描述】:

澄清的更新问题:

假设我有 2 个处理生成器函数:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6

我可以用 itertools 链接它们

from itertools import chain

mix = chain(gen1(), gen2())

然后我可以用它创建另一个生成器函数对象,

def mix_yield():
   for item in mix:
      yield item

或者如果我只想next(mix),它就在那里。

我的问题是,我怎样才能在异步代码中做同样的事情?

因为我需要它:

  • 返回收益(一个一个),或使用next迭代器
  • 最快解决的产量优先(异步)

上一个。更新:

经过实验和研究,我发现aiostream library 声明为 itertools 的异步版本,所以我做了什么:

import asyncio
from aiostream import stream

async def gen1(): 
     await asyncio.sleep(0) 
     yield 1 
     await asyncio.sleep(0) 
     yield 2 
     await asyncio.sleep(0) 
     yield 3 

async def gen2(): 
     await asyncio.sleep(0) 
     yield 4 
     await asyncio.sleep(0) 
     yield 5 
     await asyncio.sleep(0) 
     yield 6 

a_mix = stream.combine.merge(gen1(),gen2())

async def a_mix_yield():
   for item in a_mix:
      yield item

但我还是做不到next(a_mix)

TypeError: 'merge' object is not an iterator

next(await a_mix)

raise StreamEmpty()

虽然我仍然可以将其列入列表:

print(await stream.list(a_mix))
# [1, 2, 4, 3, 5, 6]

所以一个目标完成了,还有一个目标:

  • 返回收益(一个一个),或使用next迭代器

    - 最快解决的 yield first (async)

【问题讨论】:

  • 您上面的代码只是创建了几个生成器并遍历它们。因此,为什么您会看到它们按顺序打印。您可以先遍历 gen2,它会打印 4、5、6、1、2、3。也许你应该找一个不同的例子来展示你想要做什么。
  • 在我的情况下 gen1() 和 gen2() 不是同时产生的,我会更新我的问题,我认为已经用 aiostream 找到了答案(我希望)。
  • 抱歉给大家带来了困惑,为了清楚起见,我更新了问题。

标签: python python-3.x asynchronous python-asyncio sequence-generators


【解决方案1】:

Python 的next 内置函数只是一种在对象上调用底层__next__ 方法的便捷方式。 __next__ 的异步等效项是异步迭代器上的 __anext__ 方法。没有anext 全局函数,但很容易写出来:

async def anext(aiterator):
    return await aiterator.__anext__()

但是节省的费用是如此之少,以至于在极少数需要这样做的情况下,不妨直接调用__anext__。异步迭代器又是通过调用__aiter__(类似于常规可迭代对象提供的__iter__)从异步iterable 中获得的。手动驱动的异步迭代如下所示:

a_iterator = obj.__aiter__()          # regular method
elem1 = await a_iterator.__anext__()  # async method
elem2 = await a_iterator.__anext__()  # async method
...

__anext__ 将在没有更多可用元素时引发 StopAsyncIteration。要遍历异步迭代器,应该使用async for

这是一个可运行的示例,基于您的代码,使用__anext__async for 来耗尽aiostream.stream.combine.merge 设置的流:

async def main():
    a_mix = stream.combine.merge(gen1(), gen2())
    async with a_mix.stream() as streamer:
        mix_iter = streamer.__aiter__()    
        print(await mix_iter.__anext__())
        print(await mix_iter.__anext__())
        print('remaining:')
        async for x in mix_iter:
            print(x)

asyncio.get_event_loop().run_until_complete(main())

【讨论】:

  • 非常感谢,我得重读几遍,连贯起来才能完全理解。
  • @Ardhi 另一个很好的资源是 the PEP 介绍了他们。
  • 刚刚提到在 aiostream 中输入流式上下文意味着使用 async with zs.stream() as streamer: 完成,如 demonstration 所示。
  • @Vincent 谢谢,我现在修改了答案以使用广告模式。
  • +1。对这个答案的一个潜在改进是强调你可以很容易地制作一个 async 等效于 next 给定这些知识 - 就像 async def anext(async_iterator): return await async_iterator.__anext__()
【解决方案2】:

我遇到了这个答案,我查看了 aiostream 库。这是我想出的合并多个异步生成器的代码。它不使用任何库。

async def merge_generators(gens:Set[AsyncGenerator[Any, None]]) -> AsyncGenerator[Any, None]:
    pending = gens.copy()
    pending_tasks = { asyncio.ensure_future(g.__anext__()): g for g in pending }
    while len(pending_tasks) > 0:
        done, _ = await asyncio.wait(pending_tasks.keys(), return_when="FIRST_COMPLETED")
        for d in done:
            try:
                result = d.result()
                yield result
                dg = pending_tasks[d]
                pending_tasks[asyncio.ensure_future(dg.__anext__())] = dg
            except StopAsyncIteration as sai:
                print("Exception in getting result", sai)
            finally:
                del pending_tasks[d]

希望这对您有所帮助,如果其中有任何错误,请告诉我。

【讨论】:

    猜你喜欢
    • 2019-11-16
    • 1970-01-01
    • 1970-01-01
    • 2010-12-20
    • 2021-08-24
    • 2020-11-20
    • 1970-01-01
    • 1970-01-01
    • 2017-03-17
    相关资源
    最近更新 更多