【问题标题】:Python: asyncio loops with threadsPython:带线程的异步循环
【发布时间】:2019-03-23 19:55:16
【问题描述】:

您能否告诉我这是否是在自己的线程中构建多个独立异步循环的正确方法?

def init():
    print("Initializing Async...")
    global loop_heavy
    loop_heavy = asyncio.new_event_loop()
    start_loop(loop_heavy)

def start_loop(loop):
    thread = threading.Thread(target=loop.run_forever)
    thread.start()

def submit_heavy(task):
    future = asyncio.run_coroutine_threadsafe(task, loop_heavy)
    try:
        future.result()
    except Exception as e:
        print(e)

def stop():
    loop_heavy.call_soon_threadsafe(loop_heavy.stop)

async def heavy():
    print("3. heavy start %s" % threading.current_thread().name)
    await asyncio.sleep(3) # or await asyncio.sleep(3, loop=loop_heavy)
    print("4. heavy done")

然后我正在测试它:

if __name__ == "__main__":
    init()
    print("1. submit heavy: %s" % threading.current_thread().name)
    submit_heavy(heavy())
    print("2. submit is done")
    stop()

我期待看到1->3->2->4,但实际上是1->3->4->2

Initializing Async...
1. submit heavy: MainThread
3. heavy start Thread-1
4. heavy done
2. submit is done

我认为我在理解异步和线程方面遗漏了一些东西。
线程不同。我为什么要在MainThread 里面等到Thread-1 里面的工作完成?

【问题讨论】:

    标签: python-3.x python-asyncio python-multithreading


    【解决方案1】:

    我为什么要在MainThread 里面等到Thread-1 里面的工作完成?

    好问题,你为什么?

    一个可能的答案是,因为您实际上想要阻塞当前线程直到作业完成。这是将事件循环放在另一个线程中并使用run_coroutine_threadsafe 的原因之一。

    另一个可能的答案是,如果您不想要,则不必这样做。您可以简单地从submit_heavy() 返回由run_coroutine_threadsafe 返回的concurrent.futures.Future 对象,然后让调用者自行等待结果(或check if one is ready)。

    最后,如果您的目标只是“在后台”运行常规函数(不阻塞当前线程),那么您可能根本不需要 asyncio。看看concurrent.futures 模块,它的ThreadPoolExecutor 允许您轻松地将函数提交到线程池并让它在无人协助的情况下执行。

    【讨论】:

    • 我不想屏蔽MainThread。如果我将 Futurerun_coroutine_threadsafe 返回到 MainThread - MainThread 将被阻止,直到 Future 完成,不是吗?
    • @IgorZ 不,不会,这就是Future 的重点。 (请注意不要将 asyncio Futures 与 concurrent.future Futures 混淆,尽管在这方面它们是相同的。)当您手头有一个未来对象时,您可以选择是否以及何时阻止等待其结果.
    【解决方案2】:

    我将添加从 asyncio 文档中找到的一种可能的解决方案。
    我不确定这是不是正确的方法,但它可以按预期工作(MainThread 不会被子线程的执行阻塞)

    Running Blocking Code
    不应直接调用阻塞(CPU 绑定)代码。例如,如果一个函数执行 1 秒的 CPU 密集型计算,则所有并发的 asyncio 任务和 IO 操作都会延迟 1 秒。
    执行器可用于在不同的线程甚至不同的进程中运行任务,以避免阻塞阻塞 OS 线程与事件循环。有关详细信息,请参阅loop.run_in_executor() 方法。

    应用于我的代码:

    import asyncio
    import threading
    import concurrent.futures
    import multiprocessing
    import time
    
    def init():
        print("Initializing Async...")
    
        global loop, thread_executor_pool
    
        thread_executor_pool = concurrent.futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count())
        loop = asyncio.get_event_loop()
    
        thread = threading.Thread(target=loop.run_forever)
        thread.start()
    
    def submit_task(task, *args):
        loop.run_in_executor(thread_executor_pool, task, *args)
    
    def stop():
        loop.call_soon_threadsafe(loop.stop)
        thread_executor_pool.shutdown()
    
    def blocked_task(msg1, msg2):
        print("3. task start msg: %s, %s, thread: %s" % (msg1, msg2, threading.current_thread().name))
        time.sleep(3)
        print("4. task is done -->")
    
    if __name__ == "__main__":
        init()
        print("1. --> submit task: %s" % threading.current_thread().name)
        submit_task(blocked_task, "a", "b")
    
        print("2. --> submit is done")
        stop()
    

    输出:

    Initializing Async...
    
    1. --> submit task: MainThread
    3. task start msg: a, b, thread: ThreadPoolExecutor-0_0
    2. --> submit is done
    4. task is done  -->
    

    如果仍有错误,请纠正我,或者可以用其他方式完成。

    【讨论】:

    • 这是不正确的,因为run_in_executor 返回一个您应该等待的未来,而您的代码永远不会这样做。如果您只想“在后台”运行常规函数(不阻塞当前线程),也许您根本不需要 asyncio。看看concurrent.futures,它允许您将函数提交到线程池并让它在无人协助的情况下执行。
    • 是的,我认为你是对的。它更像是在后台执行任务而不使用异步。感谢您的通知。
    • 我现在更新了我的答案,以涵盖根本不使用 asyncio 的选项。
    猜你喜欢
    • 2018-09-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-10-17
    • 2019-02-17
    • 2015-04-18
    • 2017-07-18
    • 2017-07-07
    相关资源
    最近更新 更多