【问题标题】:asyncio yield from concurrent.futures.Future of an Executorasyncio yield from concurrent.futures.Future of an Executor
【发布时间】:2014-04-22 02:36:23
【问题描述】:

我有一个 long_task 函数,它运行大量的 cpu 绑定计算,我想通过使用新的 asyncio 框架使其异步。生成的 long_task_async 函数使用 ProcessPoolExecutor 将工作卸载到不受 GIL 约束的不同进程。

问题在于,由于某种原因,从 ProcessPoolExecutor.submit 返回的 concurrent.futures.Future 实例在从抛出 TypeError 时产生。这是设计使然吗?那些期货是否与asyncio.Future 类不兼容?什么是解决方法?

我还注意到生成器是不可提取的,因此向ProcessPoolExecutor 提交协程会失败。是否也有任何干净的解决方案?

import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
                                                 # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print( n )

loop.run_until_complete(main())

【问题讨论】:

    标签: python python-3.x python-asyncio concurrent.futures


    【解决方案1】:

    您想使用loop.run_in_executor,它使用concurrent.futures 执行程序,但将返回值映射到asyncio 未来。

    原来的asyncio PEP suggests that concurrent.futures.Future 可能有一天会发展出__iter__ 方法,因此它也可以与yield from 一起使用,但目前该库被设计为只需要yield from 支持仅此而已。 (否则有些代码在 3.3 中实际上无法工作。)

    【讨论】:

      【解决方案2】:

      我们可以通过调用asyncio.wrap_future(Future)concurrent.futures.Future 包装成asyncio.future。我用下面的代码试过了。工作正常

      from asyncio import coroutine
      import asyncio
      from concurrent import futures
      
      
      def do_something():
          ls = []
          for i in range(1, 1000000):
              if i % 133333 == 0:
                  ls.append(i)
          return ls
      
      
      @coroutine
      def method():
          with futures.ProcessPoolExecutor(max_workers=10) as executor:
              job = executor.submit(do_something)
              return (yield from asyncio.wrap_future(job))
      
      @coroutine
      def call_method():
          result = yield from method()
          print(result)
      
      
      def main():
          loop = asyncio.get_event_loop()
          try:
              loop.run_until_complete(call_method())
          finally:
              loop.close()
      
      
      if __name__ == '__main__':
          main()
      

      【讨论】:

        猜你喜欢
        • 2014-01-10
        • 1970-01-01
        • 2014-01-28
        • 2022-12-02
        • 2023-01-25
        • 1970-01-01
        • 2022-12-02
        • 2019-03-11
        • 2020-05-25
        相关资源
        最近更新 更多