【问题标题】:How can I synchronize asyncio with other OS threads?如何将 asyncio 与其他操作系统线程同步?
【发布时间】:2019-04-09 00:45:23
【问题描述】:

我有一个带有一个主线程的程序,我在其中生成了第二个使用 asyncio 的线程。是否提供了任何工具来同步这两个线程?如果一切都是异步的,我可以用它的同步原语来做,例如:

import asyncio

async def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    await evt.wait()
    print('Retrieved:', lst.pop())

lst = []
evt = asyncio.Event()
asyncio.get_event_loop().run_until_complete(asyncio.gather(
    taskA(lst, evt),
    taskB(lst, evt),
))

但是,这不适用于多线程。如果我只使用threading.Event,那么它将阻塞异步线程。我想我可以将等待推迟到执行人:

import asyncio
import threading

def taskA(lst, evt):
    print(f'Appending 1')
    lst.append(1)
    evt.set()

async def taskB(lst, evt):
    asyncio.get_event_loop().run_in_executor(None, evt.wait)
    print('Retrieved:', lst.pop())

def targetA(lst, evt):
    taskA(lst, evt)

def targetB(lst, evt):
    asyncio.set_event_loop(asyncio.new_event_loop())
    asyncio.get_event_loop().run_until_complete(taskB(lst, evt))

lst = []
evt = threading.Event()
threadA = threading.Thread(target=targetA, args=(lst, evt))
threadB = threading.Thread(target=targetB, args=(lst, evt))
threadA.start()
threadB.start()
threadA.join()
threadB.join()

但是,让执行线程只等待互斥锁似乎不自然。这是应该这样做的方式吗?或者有没有其他方法可以异步等待操作系统线程之间的同步?

【问题讨论】:

  • 在 taskB 中等待 asyncio.Event,然后使用 loop.call_soon_threadsafe 从 taskA 设置它。
  • @user4815162342 这是一个合理的选择。感觉有点像我在颠倒逻辑,理想情况下我希望主线程不必直接处理另一个线程的异步循环,但是是的,这可能适用于我认为的情况。
  • @user4815162342 我用你的建议解决了我的问题。因为我想要“函数语义”(在主线程中处理这些数据并返回一些结果),所以我使用 queue.Queue 将期货从 asyncio 线程发送到主线程,并使用 call_soon_threadsafe 从主线程。随意发布它作为接受的答案。
  • 为此目的使用futures绝对是要走的路——理想情况下taskB会创建两个future,一个concurrent.futures和一个asyncio,然后连接它们,将并发发送给taskA,然后等待异步的。 run_in_executor 实现了相当通用的未来链接,可以为此重用,但我不确定其中是否是公开的。
  • 我现在已经发布了带有原始评论的答案,但也是一种替代方法,它应该消除对显式队列的需求。

标签: python multithreading synchronization python-asyncio


【解决方案1】:

将 asyncio 协程与来自另一个线程的事件同步的一种简单方法是在 taskB 中等待 asyncio.Event,然后使用 loop.call_soon_threadsafe 从 taskA 设置它。

为了能够在两者之间传递值和异常,可以使用futures;但是,您正在发明run_in_executor 的大部分内容。如果 taskA 的唯一工作是将任务从队列中取出,那么您不妨创建一个单工作“池”并将其用作工作线程。然后您可以按预期使用run_in_executor

worker = concurrent.futures.ThreadPoolExecutor(max_workers=1)

async def taskB(lst):
    loop = asyncio.get_event_loop()
    # or result = await ..., if taskA has a useful return value
    # This will also propagate exceptions raised by taskA
    await loop.run_in_executor(worker, taskA, lst)
    print('Retrieved:', lst.pop())

语义与带有显式队列的版本相同 - 队列仍然存在,它就在 ThreadPoolExecutor 内。

【讨论】:

  • 谢谢,这也是个好主意,而且肯定更干净。但是在我的情况下,我可能不得不编写自己的执行程序...上下文是虚幻引擎的 Python 脚本,它有一个主线程,可以在每一帧上“标记”对象,我想要一个异步线程有时会发送要在下一个 UE “tick” 上执行的东西(因为需要在主线程上完成一些东西)。无论如何,我必须用你的建议来解决它,这个解决方案的想法也很好。
  • @jdehesa 够公平的。感谢您提出有趣的问题!
猜你喜欢
  • 2022-06-15
  • 2023-03-06
  • 2011-05-24
  • 2011-02-28
  • 1970-01-01
  • 2011-11-17
  • 1970-01-01
  • 2017-11-30
  • 2011-06-07
相关资源
最近更新 更多