【发布时间】:2022-01-16 02:53:46
【问题描述】:
好的,节奏是这样的:我正在开发一个Rhythmbox 插件,它允许我将播放列表中的歌曲从一个地方复制到另一个地方。
我使用Gio.File.copy_async 来复制文件。这是我一开始做的(代码实际上更复杂,但坚持我):
files = []
pending = []
def copy_file(file: Gio.File, cancellable: Gio.Cancellable):
destination_path = "" # Compute new path
cancellable.connect(on_cancel)
self.__file.copy_async(
Gio.File.new_for_path(destination_path),
Gio.FileCopyFlags.ALL_METADATA
| Gio.FileCopyFlags.NOFOLLOW_SYMLINKS
| Gio.FileCopyFlags.OVERWRITE,
GLib.PRIORITY_DEFAULT,
cancellable,
on_progress,
(),
on_file_copied,
None,
)
def on_cancel():
...
def on_progress(self, current_num_bytes: int, total_num_bytes: int):
...
def on_file_copied(self, file: Gio.File, res: Gio.AsyncResult, _):
pending.remove(file)
if len(pending) == 0:
on_batch_done()
else
...
def on_batch_done():
# Do things after files have been copied
...
def copy_files():
files = []
cancellable = Gio.Cancallable()
for file in files:
copy_file(file, cancellable)
起初它是可以管理的,但随着代码变得越来越复杂,使用 Gio 的 *_async 函数将整个事情变成了回调地狱,并使代码更难推理。
所以我决定使用 Python 的 async/await 将回调转换为可等待的协程,就像我以前在 Kotlin 中所做的那样:
class TransfertTask(GObject.Object):
def __init__(
self,
destination: str,
file: Gio.File,
cancellable: Gio.Cancellable,
loop: AbstractEventLoop,
):
...
super().__init__()
def start(self):
self.__cancellable.connect(self.__on_cancel)
self.__file.copy_async(
Gio.File.new_for_path(self.destination),
Gio.FileCopyFlags.ALL_METADATA
| Gio.FileCopyFlags.NOFOLLOW_SYMLINKS
| Gio.FileCopyFlags.OVERWRITE,
GLib.PRIORITY_DEFAULT,
self.__cancellable,
self.__on_progress,
(),
self.__on_file_copied,
None,
)
return self.__future
def __on_progress(self, current_num_bytes: int, total_num_bytes: int):
...
def __on_cancel(self):
self.__future.cancel()
def __on_file_copied(self, file: Gio.File, res: Gio.AsyncResult, _):
async def set_future_result():
self.__future.set_result(self)
self.__finished = True
run_coroutine_threadsafe(set_future_result(), self.__loop)
try:
file.copy_finish(res)
except GLib.Error as e:
self.error = e
async def copy_files():
cancellable = Gio.Cancellable()
loop = get_running_loop()
await gather(*[
TransfertTask("...", file, cancellable, loop).start()
for file in files
])
def start():
asyncio.run(copy_files())
问题是,现在代码会阻止 UI,直到文件被复制。为了恢复异步处理,我必须像这样在单独的线程中启动主协程:
def start():
loop = get_event_loop()
thr = Thread(target=loop.run_forever)
thr.daemon = True
thr.start()
run_coroutine_threadsafe(copy_files(), loop)
我对 asyncio 还很陌生,所以有些东西我还是不太明白。我希望await 或事件asyncio.run() 会阻塞,直到协程完成运行。但是我在互联网上找不到任何方法来告诉 Python“只需启动这个协程并继续前进,我不在乎结果”。也许 Python 做不到,我还是想太多,就像我在写 Kotlin。
谁能告诉我我做错了什么?
编辑: 扩展 Simon Hawe 的答案,这里的主要问题是我在大多数 asyncio 示例中找到的 asyncio.run() 函数,它等待等待的协程。您可以在 Internet 上找到的所有示例都是 您可以控制程序的入口点的示例,这不是我的情况。在 Rhythmbox 中,您必须像这样定义插件:
from gi.repository import GObject, RB, Peas
class SomePlugin (GObject.Object, Peas.Activatable):
object = GObject.property(type=GObject.Object)
def __init__(self):
super().__init__()
def do_activate(self):
...
def do_deactivate(self):
...
然后创建类并通过 Rhythmbox 执行do_activate() 方法。
如您所见,我无法控制程序的入口点,因此没有正确的位置可以调用 asyncio.run() 并且它不会阻塞 UI。
也许只是我不知道如何询问搜索引擎,但我很惊讶以前似乎没有人遇到过这个用例。
【问题讨论】:
标签: python asynchronous async-await python-asyncio pygobject