【问题标题】:python - how to implement a C-function as awaitable (coroutine)python - 如何将 C 函数实现为可等待(协程)
【发布时间】:2018-12-04 08:41:14
【问题描述】:

环境:C 和 micropython 虚拟机中的协作 RTOS 是任务之一。

为了使 VM 不会阻塞其他 RTOS 任务,我在 vm.c:DISPATCH() 中插入了 RTOS_sleep(),以便在执行每个字节码后,VM 将控制权交给下一个 RTOS 任务。

我创建了一个 uPy 接口,用于从物理数据总线(可以是 CAN、SPI、以太网)异步获取数据,使用生产者-消费者设计模式。

在 uPy 中的用法:

can_q = CANbus.queue()
message = can_q.get()

C 中的实现是can_q.get() 不会阻塞 RTOS:它会轮询 C 队列,如果没有收到消息,它会调用 RTOS_sleep() 给另一个任务填充队列的机会。事情是同步的,因为 C 队列仅由另一个 RTOS 任务更新,并且 RTOS 任务仅在调用 RTOS_sleep() 时切换,即 cooperative

C 实现基本上是:

// gives chance for c-queue to be filled by other RTOS task
while(c_queue_empty() == true) RTOS_sleep(); 
return c_queue_get_message();

虽然 Python 语句 can_q.get() 不会阻止 RTOS,但它会阻止 uPy 脚本。 我想重写它,以便我可以将它与 async def 一起使用,即 coroutine 并且它不会阻止 uPy 脚本。

不确定the syntax,但类似这样:

can_q = CANbus.queue()
message = await can_q.get()

问题

我如何编写一个 C 函数以便我可以在上面await

我更喜欢 CPython 和 micropython 的答案,但我会接受仅 CPython 的答案。

【问题讨论】:

    标签: python c async-await coroutine micropython


    【解决方案1】:

    注意:此答案涵盖 CPython 和 asyncio 框架。然而,这些概念应该适用于其他 Python 实现以及其他异步框架。

    我如何编写一个 C 函数以便我可以在上面await

    编写一个可以等待结果的 C 函数的最简单方法是让它返回一个已经生成的可等待对象,例如 asyncio.Future。在返回Future 之前,代码必须安排未来的结果由某种异步机制设置。所有这些基于协程的方法都假设您的程序在某个知道如何调度协程的事件循环下运行。

    但是返回一个未来并不总是足够的——也许我们想定义一个具有任意数量的悬挂点的对象。返回一个未来只会暂停一次(如果返回的未来不完整),一旦未来完成就会恢复,就是这样。等价于包含多个awaitasync def 的可等待对象不能通过返回未来来实现,它必须实现协程通常实现的协议。这有点像实现自定义 __next__ 的迭代器,并被用来代替生成器。

    定义一个自定义的可等待对象

    要定义我们自己的等待类型,我们可以求助于 PEP 492,其中specifies 究竟哪些对象可以传递给await。除了使用async def 定义的Python 函数之外,用户定义的类型可以通过定义__await__ 特殊方法使对象成为可等待对象,该方法Python/C 映射到PyTypeObject 结构的tp_as_async.am_await 部分。

    这意味着在 Python/C 中,您必须执行以下操作:

    • 为您的扩展类型的 tp_as_async 字段指定一个非 NULL 值。
    • 将其am_await 成员指向一个C 函数,该函数接受您的类型的实例并返回实现iterator protocol 的另一个扩展类型的实例,即定义tp_iter(通常定义为PyIter_Self)和tp_iternext
    • 迭代器的tp_iternext 必须推进协程的状态机。 tp_iternext 的每一个非异常返回都对应一个暂停,最后的StopIteration 异常表示协程的最终返回。返回值存储在StopIterationvalue 属性中。

    为了使协程有用,它还必须能够与驱动它的事件循环通信,以便它可以指定在暂停后何时恢复。大多数由 asyncio 定义的协程都希望在 asyncio 事件循环下运行,并在内部使用 asyncio.get_event_loop()(和/或接受显式 loop 参数)来获取其服务。

    示例协程

    为了说明 Python/C 代码需要实现什么,让我们考虑用 Python async def 表示的简单协程,例如 asyncio.sleep() 的等价物:

    async def my_sleep(n):
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        loop.call_later(n, future.set_result, None)
        await future
        # we get back here after the timeout has elapsed, and
        # immediately return
    

    my_sleep 创建一个Future,安排它在 n 秒内完成(其结果变为设置),并暂停自身直到未来完成。最后一部分使用await,其中await x 的意思是“允许x 决定我们现在是暂停还是继续执行”。不完整的未来总是决定挂起,asyncio Task 协程驱动特例产生未来无限期地挂起它们并将它们的完成连接到恢复任务。其他事件循环(curio 等)的暂停机制在细节上可能有所不同,但基本思想是相同的:await 是可选的执行暂停。

    __await__() 返回一个生成器

    要将其转换为 C,我们必须摆脱神奇的 async def 函数定义以及 await 暂停点。删除async def 相当简单:等效的普通函数只需要返回一个实现__await__ 的对象:

    def my_sleep(n):
        return _MySleep(n)
    
    class _MySleep:
        def __init__(self, n):
            self.n = n
    
        def __await__(self):
            return _MySleepIter(self.n)
    

    my_sleep() 返回的_MySleep 对象的__await__ 方法将由await 运算符自动调用,以将awaitable 对象(传递给await 的任何内容)转换为一个迭代器。该迭代器将用于询问等待的对象是选择挂起还是提供值。这很像for o in x 语句调用x.__iter__() 以将iterable x 转换为具体的iterator

    当返回的迭代器选择挂起时,它只需要产生一个值。该值的含义(如果有)将由协程驱动程序解释,通常是事件循环的一部分。当迭代器选择停止执行并从await返回时,需要停止迭代。使用生成器作为便利的迭代器实现,_MySleepIter 看起来像这样:

    def _MySleepIter(n):
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        loop.call_later(n, future.set_result, None)
        # yield from future.__await__()
        for x in future.__await__():
            yield x
    

    由于await x 映射到yield from x.__await__(),我们的生成器必须耗尽future.__await__() 返回的迭代器。 Future.__await__ 返回的迭代器将在未来不完整时产生,否则返回未来的结果(我们在这里忽略,但 yield from 实际上提供)。

    __await__() 返回自定义迭代器

    在 C 中实现 my_sleep 的最后一个障碍是使用 _MySleepIter 的生成器。幸运的是,任何生成器都可以转换为有状态的迭代器,其__next__ 执行这段代码直到下一个等待或返回。 __next__ 实现了生成器代码的状态机版本,其中yield 通过返回一个值来表示,return 通过提高StopIteration 来表示。例如:

    class _MySleepIter:
        def __init__(self, n):
            self.n = n
            self.state = 0
    
        def __iter__(self):  # an iterator has to define __iter__
            return self
    
        def __next__(self):
            if self.state == 0:
                loop = asyncio.get_event_loop()
                self.future = loop.create_future()
                loop.call_later(self.n, self.future.set_result, None)
                self.state = 1
            if self.state == 1:
                if not self.future.done():
                    return next(iter(self.future))
                self.state = 2
            if self.state == 2:
                raise StopIteration
            raise AssertionError("invalid state")
    

    翻译成 C

    上面的输入相当多,但它可以工作,并且只使用可以用原生 Python/C 函数定义的结构。

    实际上将这两个类转换为 C 非常简单,但超出了此答案的范围。

    【讨论】:

      猜你喜欢
      • 2019-07-31
      • 1970-01-01
      • 2022-07-19
      • 1970-01-01
      • 2020-07-03
      • 2020-10-10
      • 2018-06-05
      • 1970-01-01
      • 2017-11-05
      相关资源
      最近更新 更多