【问题标题】:How to gather results and using limit with parent child functions如何收集结果并在父子函数中使用限制
【发布时间】:2019-09-21 17:19:50
【问题描述】:

我想要实现的目标是产生多个父母,每个父母都做一些工作,然后产生几个孩子来检查其他事情,并在父母身上获取这些结果以做进一步的工作。 我还尝试设置 2 个不同的生成限制,因为父项的工作可以比子项做的更多。

我将如何做到这一点?

如果我不使用limit2,它可以工作,但我想要两个限制器。

import trio
import asks
import time
import random

async def child(parent, i, sender, limit2):
    async with limit2:
        print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
        #await trio.sleep(random.randrange(0, 3))
        print('Parent {0}, Child {1}: exiting!'.format(parent, i))
        async with sender:
            await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))

async def parent(i, limit):
    async with limit:
        print('Parent {0}: started! Sleeping now...'.format(i))
        #await trio.sleep(random.randrange(0, 3))

        sender, receiver = trio.open_memory_channel(10)
        limit2 = trio.CapacityLimiter(2)
        async with trio.open_nursery() as nursery:
            for j in range(10):
                nursery.start_soon(child, i, j, sender, limit2)

        async with receiver:
            async for value in receiver:
                print('Got value: {!r}'.format(value))
        print('Parent {0}: exiting!'.format(i))

async def main():
    limit = trio.CapacityLimiter(1)
    async with trio.open_nursery() as nursery:
        for i in range(1):
            nursery.start_soon(parent, i, limit)


if __name__ == "__main__":
    start_time = time.perf_counter()
    trio.run(main)
    duration = time.perf_counter() - start_time
    print("Took {:.2f} seconds".format(duration))

【问题讨论】:

  • 当你运行你的代码时会发生什么,它与你期望的有什么不同?
  • 它没有打印print('Parent {0}: exiting!'.format(i)) 或退出程序。我认为这可能与limit2 = trio.CapacityLimiter(3) 有关,因为没有它,它可以按我的预期工作。但我想在那里设置一个限制。
  • 这也是我尝试做一些基本的事情,所以我以后可以开始使用 python-asks,所以我有类似样板的东西,我为多个网站生成一个工人,然后为每个网站我将生成x 工人进行一些检查,然后将该信息与我之前的信息一起返回。

标签: python-3.x python-trio


【解决方案1】:

当我运行你的代码时,我得到:

  File "/tmp/zigb.py", line 12, in child
    await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))
  File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 157, in send
    self.send_nowait(value)
  File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_core/_ki.py", line 167, in wrapper
    return fn(*args, **kwargs)
  File "/home/njs/.user-python3.7/lib/python3.7/site-packages/trio/_channel.py", line 135, in send_nowait
    raise trio.ClosedResourceError
trio.ClosedResourceError

这里发生的情况是您将sender 通道传递给所有10 个子任务,然后每个子任务都在执行async with sender: ...,这会关闭sender 通道。所以第一个任务使用它,然后关闭它,然后下一个任务尝试使用它......但是它已经关闭了,所以它会出错。

幸运的是,Trio 为这个问题提供了一个解决方案:您可以在内存通道对象上使用clone 方法来创建该内存通道的第二个副本,该副本的工作方式完全相同,但会独立关闭。所以诀窍是给每个孩子一个sender的克隆,然后他们各自关闭他们的克隆,然后一旦所有的克隆都关闭,接收器就会收到通知并停止循环。

文档:https://trio.readthedocs.io/en/stable/reference-core.html#managing-multiple-producers-and-or-multiple-consumers

您的代码的固定版本:

import trio
import asks
import time
import random

async def child(parent, i, sender, limit2):
    async with limit2:
        print('Parent {0}, Child {1}: started! Sleeping now...'.format(parent, i))
        #await trio.sleep(random.randrange(0, 3))
        print('Parent {0}, Child {1}: exiting!'.format(parent, i))
        async with sender:
            await sender.send('Parent {0}, Child {1}: exiting!'.format(parent, i))

async def parent(i, limit):
    async with limit:
        print('Parent {0}: started! Sleeping now...'.format(i))
        #await trio.sleep(random.randrange(0, 3))

        sender, receiver = trio.open_memory_channel(10)
        limit2 = trio.CapacityLimiter(2)
        async with trio.open_nursery() as nursery:
            for j in range(10):
                # CHANGED: Give each child its own clone of 'sender', which
                # it will close when it's done
                nursery.start_soon(child, i, j, sender.clone(), limit2)
        # CHANGED: Close the original 'sender', once we're done making clones
        await sender.aclose()

        async with receiver:
            async for value in receiver:
                print('Got value: {!r}'.format(value))
        print('Parent {0}: exiting!'.format(i))

async def main():
    limit = trio.CapacityLimiter(1)
    async with trio.open_nursery() as nursery:
        for i in range(1):
            nursery.start_soon(parent, i, limit)


if __name__ == "__main__":
    start_time = time.perf_counter()
    trio.run(main)
    duration = time.perf_counter() - start_time
    print("Took {:.2f} seconds".format(duration))

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-03-07
    • 1970-01-01
    • 2011-12-29
    • 1970-01-01
    • 2010-09-26
    相关资源
    最近更新 更多