【问题标题】:Python Asyncio/Trio for Asynchronous Computing/Fetching用于异步计算/获取的 Python Asyncio/Trio
【发布时间】:2021-12-22 19:08:29
【问题描述】:

我正在寻找一种方法来有效地从磁盘中获取一大块值,然后对该块执行计算/计算。我的想法是一个 for 循环,它首先运行磁盘获取任务,然后对获取的数据运行计算。我想让我的程序在运行计算时获取下一批,这样我就不必在每次计算完成时等待另一个数据获取。我预计计算将比从磁盘获取数据花费更长的时间,并且由于单个计算任务已经将 cpu 使用率固定在接近 100%,因此可能无法真正并行完成。

我在 python 中使用 trio 提供了一些代码(但也可以与 asyncio 一起使用以达到相同的效果)来说明我在使用异步编程执行此操作时的最佳尝试:

import trio
import numpy as np
from datetime import datetime as dt
import time

testiters=10
dim = 6000


def generateMat(arrlen):
    for _ in range(30):
        retval= np.random.rand(arrlen, arrlen)
    # print("matrix generated")
    return retval

def computeOpertion(matrix):
    return np.linalg.inv(matrix)


def runSync():
    for _ in range(testiters):
        mat=generateMat(dim)
        result=computeOpertion(mat)
    return result

async def matGenerator_Async(count):
    for _ in range(count):
        yield generateMat(dim)

async def computeOpertion_Async(matrix):
    return computeOpertion(matrix)

async def runAsync():
    async with trio.open_nursery() as nursery:
        async for value in matGenerator_Async(testiters): 
            nursery.start_soon(computeOpertion_Async,value)
            #await computeOpertion_Async(value)

            

print("Sync:")
start=dt.now()
runSync()
print(dt.now()-start)

print("Async:")
start=dt.now()
trio.run(runAsync)
print(dt.now()-start)

此代码将通过生成 30 个随机矩阵来模拟从磁盘获取数据,这会使用少量 cpu。然后它将对生成的矩阵执行矩阵求逆,该矩阵使用 100% cpu(在 numpy 中使用 openblas/mkl 配置)。我通过计时同步和异步操作来比较运行任务所花费的时间。

据我所知,两个作业完成的时间完全相同,这意味着异步操作并没有加快执行速度。观察每次计算的行为,顺序操作按顺序运行提取和计算,异步操作先运行所有提取,然后再运行所有计算。

有没有办法使用异步获取和计算?也许有期货或类似收集()的东西? Asyncio 具有这些功能,而 trio 将它们放在单独的包 trio_future 中。我也对通过其他方法(线程和多处理)的解决方案持开放态度。

我相信可能存在一种多处理解决方案,可以使磁盘读取操作在单独的进程中运行。但是,进程间通信和阻塞就变得很麻烦,因为由于内存限制,我需要某种信号量来控制一次可以生成多少块,并且多处理往往非常繁重和缓慢。

编辑

感谢 VPfB 的回答。我无法在操作中sleep(0),但我认为即使我这样做了,它也必然会阻止计算以执行磁盘操作。我认为这可能是 python 线程和 asyncio 的硬限制,它一次只能执行 1 个线程。如果两个不同的进程都需要等待一些外部资源从你的 CPU 响应,那么同时运行两个不同的进程是不可能的。

也许有一种方法可以使用 executor 来实现多处理池。我在下面添加了以下代码:

import asyncio
import concurrent.futures

async def asynciorunAsync():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:    
         async for value in matGenerator_Async(testiters):              
            result = await loop.run_in_executor(pool, computeOpertion,value)


print("Async with PoolExecutor:")
start=dt.now()
asyncio.run(asynciorunAsync())
print(dt.now()-start)

尽管计时,它仍然需要与同步示例相同的时间。我认为我将不得不采用更复杂的解决方案,因为似乎 async 和 await 是一种过于粗糙的工具,无法正确执行此类任务切换。

【问题讨论】:

    标签: python asynchronous multiprocessing python-asyncio python-trio


    【解决方案1】:

    我不使用三重奏,我的回答是基于异步的。

    在这种情况下,我看到的提高异步性能的唯一方法是将计算分成更小的部分,并在它们之间插入await sleep(0)。这将允许数据获取任务运行。

    Asyncio 使用协作调度。一个同步的 CPU 绑定例程不合作,它在运行时会阻塞其他所有内容。

    sleep() 总是暂停当前任务,允许其他任务运行。

    将延迟设置为 0 可提供优化路径以允许其他任务 跑步。长时间运行的函数可以使用它来避免阻塞 整个函数调用期间的事件循环。

    (引自:asyncio.sleep


    如果不可能,请尝试在executor 中运行计算。这为原本纯 asyncio 代码添加了一些多线程功能。

    【讨论】:

      【解决方案2】:

      异步 ​​I/O 的意义在于,在网络 I/O 很多但实际计算(或磁盘 I/O)很少的情况下,可以轻松编写程序。这适用于任何异步库(Trio 或 asyncio)甚至不同的语言(例如 C++ 中的 ASIO)。因此,理想情况下,您的程序不适合异步 I/O!您将需要使用多个线程(或进程)。不过,公平地说,包括 Trio 在内的异步 I/O 对于协调线程上的工作很有用,而且这在您的情况下可能会很好用。

      正如 VPfB 的回答所说,如果您使用的是 asyncio,那么您可以使用执行程序,特别是传递给 loop.run_in_executor()ThreadPoolExecutor。对于 Trio,等效为 trio.to_thread.run_sync()(另请参阅 Trio 文档中的 Threads (if you must)),它更易于使用。在这两种情况下,您都可以await 结果,因此该函数在单独的线程中运行,而主 Trio 线程可以继续运行您的异步代码。您的代码最终会看起来像这样:

      async def matGenerator_Async(count):
          for _ in range(count):
              yield await trio.to_thread.run_sync(generateMat, dim)
      
      async def my_trio_main()
          async with trio.open_nursery() as nursery:
              async for matrix in matGenerator_Async(testiters):
                   nursery.start_soon(trio.to_thread.run_sync, computeOperation, matrix)
      
      trio.run(my_trio_main)
      

      计算函数(generateMatcomputeOperation)不需要异步。事实上,如果它们是有问题的,因为您不能再在单独的线程中运行它们。一般来说,只有在需要 await 某些东西时才创建一个函数 async 或使用 async withasync for

      从上面的例子可以看出如何将数据传递给另一个线程中运行的函数:只需将它们作为参数传递给trio.to_thread.run_sync(),它们就会作为参数传递给函数。从generateMat() 获取结果也很简单——另一个线程中调用的函数的返回值是从await trio.to_thread.run_sync() 返回的。获取computeOperation() 的结果比较棘手,因为它是在nursery 中调用的,所以它的返回值被丢弃了。您需要将可变参数传递给它(如dict)并将结果存储在其中。但是要注意线程安全;最简单的方法是向每个协程传递一个新对象,并且仅在 Nursery 完成后检查它们。

      一些你可能会忽略的最后脚注:

      • 需要明确的是,上面代码中的yield await 并不是某种特殊语法。只是await foo(),它在foo() 完成后返回一个值,然后是该值的yield
      • 您可以通过传递CapacityLimiter object 或通过查找默认值并设置计数来更改Trio 用于调用to_thread.run_sync() 的线程数。看起来默认值目前是 40,因此您可能希望将其调低一点,但这可能不是太重要。
      • 有一个普遍的说法是 Python 不支持线程,或者至少不能同时在多个线程中进行计算,因为它有一个全局锁(全局解释器锁,或 GIL)。这意味着您需要使用多个进程而不是线程,以便您的程序真正并行计算事物。确实,Python 中有一个 GIL,但只要您使用 numpy 之类的东西进行计算,那么它就不会阻止多线程有效地工作。
      • Trio 实际上对async file I/O 有很大的支持。但我认为这对您的情况没有帮助。

      【讨论】:

        【解决方案3】:

        为了补充我的其他答案(它使用 Trio,就像你问的那样),这里是如何做到这一点,只使用没有任何异步库的线程。最简单的方法是使用Future objectsThreadPoolExecutor

        futures = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for matrix in matGenerator(testiters):
                futures.append(executor.submit(computeOperation, matrix))
        results = [f.result() for f in futures]
        

        代码实际上与异步代码非常相似,但如果有的话,它更简单。如果你不需要做网络 I/O,你最好用这种方法。

        【讨论】:

          猜你喜欢
          • 2023-01-22
          • 2018-12-23
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2020-04-02
          • 2019-05-31
          相关资源
          最近更新 更多