【问题标题】:Using Python's async/await with PyGObject has made my code synchronous again. Why?使用 Python 的 async/await 和 PyGObject 使我的代码再次同步。为什么?
【发布时间】:2022-01-16 02:53:46
【问题描述】:

好的,节奏是这样的:我正在开发一个Rhythmbox 插件,它允许我将播放列表中的歌曲从一个地方复制到另一个地方。

我使用Gio.File.copy_async 来复制文件。这是我一开始做的(代码实际上更复杂,但坚持我):

files = []
pending = []

def copy_file(file: Gio.File, cancellable: Gio.Cancellable):
    destination_path = ""  # Compute new path

    cancellable.connect(on_cancel)

    self.__file.copy_async(
        Gio.File.new_for_path(destination_path),
        Gio.FileCopyFlags.ALL_METADATA
        | Gio.FileCopyFlags.NOFOLLOW_SYMLINKS
        | Gio.FileCopyFlags.OVERWRITE,
        GLib.PRIORITY_DEFAULT,
        cancellable,
        on_progress,
        (),
        on_file_copied,
        None,
    )

def on_cancel():
  ...

def on_progress(self, current_num_bytes: int, total_num_bytes: int):
  ...

def on_file_copied(self, file: Gio.File, res: Gio.AsyncResult, _):
  pending.remove(file)
  
  if len(pending) == 0:
    on_batch_done()
  else
    ...
    
def on_batch_done():
  # Do things after files have been copied
  ...

def copy_files():
  files = []
  cancellable = Gio.Cancallable()
  for file in files:
    copy_file(file, cancellable)

起初它是可以管理的,但随着代码变得越来越复杂,使用 Gio 的 *_async 函数将整个事情变成了回调地狱,并使代码更难推理。

所以我决定使用 Python 的 async/await 将回调转换为可等待的协程,就像我以前在 Kotlin 中所做的那样:

class TransfertTask(GObject.Object):
    def __init__(
        self,
        destination: str,
        file: Gio.File,
        cancellable: Gio.Cancellable,
        loop: AbstractEventLoop,
    ):
        ...
        super().__init__()

    def start(self):
        self.__cancellable.connect(self.__on_cancel)

        self.__file.copy_async(
            Gio.File.new_for_path(self.destination),
            Gio.FileCopyFlags.ALL_METADATA
            | Gio.FileCopyFlags.NOFOLLOW_SYMLINKS
            | Gio.FileCopyFlags.OVERWRITE,
            GLib.PRIORITY_DEFAULT,
            self.__cancellable,
            self.__on_progress,
            (),
            self.__on_file_copied,
            None,
        )

        return self.__future

    def __on_progress(self, current_num_bytes: int, total_num_bytes: int):
      ...

    def __on_cancel(self):
        self.__future.cancel()

    def __on_file_copied(self, file: Gio.File, res: Gio.AsyncResult, _):
        async def set_future_result():
            self.__future.set_result(self)

        self.__finished = True
        run_coroutine_threadsafe(set_future_result(), self.__loop)
        try:
            file.copy_finish(res)
        except GLib.Error as e:
            self.error = e
  
async def copy_files():
  cancellable = Gio.Cancellable()
  loop = get_running_loop()
  
  await gather(*[
    TransfertTask("...", file, cancellable, loop).start()
    for file in files
  ])
  
def start():
  asyncio.run(copy_files())

问题是,现在代码会阻止 UI,直到文件被复制。为了恢复异步处理,我必须像这样在单独的线程中启动主协程:

def start():
  loop = get_event_loop()
  thr = Thread(target=loop.run_forever)
  thr.daemon = True
  thr.start()
  run_coroutine_threadsafe(copy_files(), loop)

我对 asyncio 还很陌生,所以有些东西我还是不太明白。我希望await 或事件asyncio.run() 会阻塞,直到协程完成运行。但是我在互联网上找不到任何方法来告诉 Python“只需启动这个协程并继续前进,我不在乎结果”。也许 Python 做不到,我还是想太多,就像我在写 Kotlin。

谁能告诉我我做错了什么?

编辑: 扩展 Simon Hawe 的答案,这里的主要问题是我在大多数 asyncio 示例中找到的 asyncio.run() 函数,它等待等待的协程。您可以在 Internet 上找到的所有示例都是 您可以控制程序的入口点的示例,这不是我的情况。在 Rhythmbox 中,您必须像这样定义插件:

from gi.repository import GObject, RB, Peas

class SomePlugin (GObject.Object, Peas.Activatable):
  object = GObject.property(type=GObject.Object)

  def __init__(self):
    super().__init__()

  def do_activate(self):
    ... 

  def do_deactivate(self):
    ...

然后创建类并通过 Rhythmbox 执行do_activate() 方法。

如您所见,我无法控制程序的入口点,因此没有正确的位置可以调用 asyncio.run() 并且它不会阻塞 UI。

也许只是我不知道如何询问搜索引擎,但我很惊讶以前似乎没有人遇到过这个用例。

【问题讨论】:

    标签: python asynchronous async-await python-asyncio pygobject


    【解决方案1】:

    如果您处于纯异步世界中,您可以使用 create_task 或 ensure_future 直接触发协程而无需等待结果。一个例子是这样的

    import asyncio
    
    async def dosomething():
        print("Hell")
        await asyncio.sleep(10)
        print("world")
    
    async def main():
        asyncio.create_task(dosomething())
        for i in range(10):
            print(f"What {i}")
            await asyncio.sleep(2)
    
    asyncio.run(main())
    

    重要的是,这仅在您不结合使用异步和同步代码时才有效。或者更准确地说,刚刚“盲目”启动的协程必须在提供给 asyncio.run 的函数完成之前完成。这意味着代码

    import asyncio
    from time import sleep
    
    async def dosomething():
        print("Hell")
        await asyncio.sleep(10)
        print("world")
    
    async def main():
        asyncio.create_task(dosomething())
    
    print("Start")
    asyncio.run(main())
    print("Stop")
    
    for i in range(10):
        print(f"What {i}")
        sleep(2)
    

    永远不会将 world 打印为 main 立即完成。因此,我认为您使用新 Thread 解决了您手头的问题是正确的。

    【讨论】:

    • 这正是我的问题。没有任何相当于asyncio.run()的“盲目启动协程”吗?由于这个过程是如何启动的(它是一个按钮回调),我无法控制代码的执行方式。只要插件存在并且我可以将主协程附加到启动时,是否有某种事件循环运行?
    猜你喜欢
    • 2019-01-10
    • 2021-06-08
    • 1970-01-01
    • 2017-12-05
    • 2014-09-28
    • 2017-10-26
    • 2021-11-05
    相关资源
    最近更新 更多