【问题标题】:Start async task now, await later现在开始异步任务,稍后等待
【发布时间】:2019-10-08 06:35:04
【问题描述】:

C# 程序员试图学习一些 Python。我正在尝试运行 CPU 密集型计算,同时让 IO 绑定异步方法在后台悄悄地消失。在 C# 中,我通常会设置 awaitable,然后启动 CPU 密集型代码,然后等待 IO 任务,然后合并结果。

这是我在 C# 中的做法

static async Task DoStuff() {
    var ioBoundTask = DoIoBoundWorkAsync();
    int cpuBoundResult = DoCpuIntensizeCalc();
    int ioBoundResult = await ioBoundTask.ConfigureAwait(false);

    Console.WriteLine($"The result is {cpuBoundResult + ioBoundResult}");
}

static async Task<int> DoIoBoundWorkAsync() {
    Console.WriteLine("Make API call...");
    await Task.Delay(2500).ConfigureAwait(false); // non-blocking async call
    Console.WriteLine("Data back.");
    return 1;
}

static int DoCpuIntensizeCalc() {
    Console.WriteLine("Do smart calc...");
    Thread.Sleep(2000);  // blocking call. e.g. a spinning loop
    Console.WriteLine("Calc finished.");
    return 2;
}

这是python中的等效代码

import time
import asyncio

async def do_stuff():
    ioBoundTask = do_iobound_work_async()
    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await ioBoundTask
    print(f"The result is {cpuBoundResult + ioBoundResult}")

async def do_iobound_work_async(): 
    print("Make API call...")
    await asyncio.sleep(2.5)  # non-blocking async call
    print("Data back.")
    return 1

def do_cpu_intensive_calc():
    print("Do smart calc...")
    time.sleep(2)  # blocking call. e.g. a spinning loop
    print("Calc finished.")
    return 2

await do_stuff()

重要的是,请注意,CPU 密集型任务由无法等待的阻塞睡眠表示,IO 绑定任务由可等待的非阻塞睡眠表示。

在 C# 中运行需要 2.5 秒,在 Python 中运行需要 4.5 秒。不同之处在于 C# 立即运行异步方法,而 python 仅在遇到等待时才启动该方法。下面的输出证实了这一点。我怎样才能达到预期的结果。如果可能的话,将不胜感激能在 Jupyter Notebook 中运行的代码。

--- C# ---
Make API call...
Do smart calc...
Calc finished.
Data back.
The result is 3
--- Python ---
Do smart calc...
Calc finished.
Make API call...
Data back.
The result is 3

更新 1

受 knh190 的回答启发,我似乎可以使用asyncio.create_task(...) 到达那里。这达到了预期的结果(2.5 秒):首先,异步代码设置为运行;接下来,阻塞CPU代码同步运行;第三,等待异步代码;最后合并结果。为了让异步调用真正开始运行,我必须输入一个await asyncio.sleep(0),这感觉就像一个可怕的黑客攻击。我们可以在不这样做的情况下设置任务运行吗?一定有更好的办法……

async def do_stuff():
    task = asyncio.create_task(do_iobound_work_async())
    await asyncio.sleep(0)  #   <~~~~~~~~~ This hacky line sets the task running

    cpuBoundResult = do_cpu_intensive_calc()
    ioBoundResult = await task

    print(f"The result is {cpuBoundResult + ioBoundResult}")

【问题讨论】:

    标签: python async-await jupyter-notebook


    【解决方案1】:

    我认为您的测试几乎是不言自明的。 Python 中awaitasync 的前身是生成器(in Python 2)。 Python 只会创建一个协程,但在你显式调用它之前不会启动它。

    所以如果你想像 C# 一样立即触发协程,你需要将 await 线向前移动。

    async def do_stuff():
        ioBoundTask = do_iobound_work_async() # created a coroutine
        ioBoundResult = await ioBoundTask     # start the coroutine
        cpuBoundResult = do_cpu_intensive_calc()
        print(f"The result is {cpuBoundResult + ioBoundResult}")
    

    这相当于:

    def do_stuff():
        # create a generator based coroutine
        # cannot mix syntax of asyncio
        ioBoundTask = do_iobound_work_async()
        ioBoundResult = yield from ioBoundTask
        # whatever
    

    另请参阅此帖子:In practice, what are the main uses for the new "yield from" syntax in Python 3.3?


    我注意到您的 C# 和 Python 并非严格等效。 Python 中只有 asyncio.Task 是并发的:

    async def do_cpu_intensive_calc():
        print("Do smart calc...")
        await asyncio.sleep(2)
        print("Calc finished.")
        return 2
    
    # 2.5s
    async def do_stuff():
        task1 = asyncio.create_task(do_iobound_work_async())
        task2 = asyncio.create_task(do_cpu_intensive_calc())
    
        ioBoundResult = await task1
        cpuBoundResult = await task2
        print(f"The result is {cpuBoundResult + ioBoundResult}")
    

    现在执行时间应该是一样的。

    【讨论】:

    • stackoverflow.com/questions/36342899/… 表示应该使用asyncio.ensure_future
    • @DanD。它准确地说create_task 是有利的。官方文档也使用了create_task
    • 感谢您的回复,但这不是我想要做的 - 抱歉。 Python 和 C# 在 IO 代码中都有非阻塞睡眠,在 CPU 密集型调用中有阻塞睡眠。任何一种语言都不能等待 CPU 任务。 sleep 的目的是表示类似于积极嵌套循环的东西。如果它提高了清晰度,请将time.sleep(2) 替换为max(i % 3 for i in range(10000000))
    • @BigAL 阻塞调用自然不能与异步调用一起使用,据我所知。这就是为什么我们除了著名的requests(阻塞库)之外还有异步库aiohttpThis post 建议您可以使用 executor 调用阻塞函数,但我认为如果您的代码库很复杂,则无法保证并发。
    【解决方案2】:

    因此,通过更多研究,这似乎是可能的,但不像在 C# 中那么容易。 do_stuff() 的代码变为:

    async def do_stuff():
        task = asyncio.create_task(do_iobound_work_async())  # add task to event loop
        await asyncio.sleep(0)                               # return control to loop so task can start
        cpuBoundResult = do_cpu_intensive_calc()             # run blocking code synchronously
        ioBoundResult = await task                           # at last, we can await our async code
    
        print(f"The result is {cpuBoundResult + ioBoundResult}")
    

    相对于 C#,两个不同点是:

    1. asyncio.create_task(...) 需要将任务添加到正在运行的事件循环中
    2. await asyncio.sleep(0) 暂时将控制权返回给事件循环,以便它可以启动任务。

    完整的代码示例如下:

    import time
    import asyncio
    
    async def do_stuff():
        task = asyncio.create_task(do_iobound_work_async())  # add task to event loop
        await asyncio.sleep(0)                               # return control to loop so task can start
        cpuBoundResult = do_cpu_intensive_calc()             # run blocking code synchronously
        ioBoundResult = await task                           # at last, we can await our async code
    
        print(f"The result is {cpuBoundResult + ioBoundResult}")
    
    async def do_iobound_work_async(): 
        print("Make API call...")
        await asyncio.sleep(2.5)  # non-blocking async call. Hence the use of asyncio
        print("Data back.")
        return 1
    
    def do_cpu_intensive_calc():
        print("Do smart calc...")
        time.sleep(2)  # long blocking code that cannot be awaited. e.g. a spinning loop
        print("Calc finished.")
        return 2
    
    await do_stuff()
    

    我不太喜欢必须记住添加额外的await asyncio.sleep(0) 才能开始任务。拥有像begin_task(...) 这样的可等待函数可能会更简洁,它可以自动启动任务运行,以便可以在稍后阶段等待它。例如,如下所示:

    async def begin_task(coro):
        """Awaitable function that adds a coroutine to the event loop and sets it running."""
        task = asyncio.create_task(coro)
        await asyncio.sleep(0)
        return task
    
    async def do_stuff():
        io_task = await begin_task(do_iobound_work_async())
        cpuBoundResult = do_cpu_intensive_calc()
        ioBoundResult = await io_task
        print(f"The result is {cpuBoundResult + ioBoundResult}")
    

    【讨论】:

      猜你喜欢
      • 2020-03-15
      • 1970-01-01
      • 2014-09-06
      • 1970-01-01
      • 2014-03-18
      • 1970-01-01
      • 2014-06-28
      • 2013-02-10
      • 2017-07-22
      相关资源
      最近更新 更多