TLDR:
You can use the multiprocessing library to run your var function in parallel. However, as written, you likely aren't making enough calls to var for multiprocessing to have a performance benefit given its overhead. If all you need to do is run those two calls, running them serially is probably the fastest you'll get. But if you need to make a large number of calls, multiprocessing can help you.
We'll need to use a process pool to run this in parallel; threads won't work here because Python's global interpreter lock prevents us from achieving true parallelism with CPU-bound work. The drawback of a process pool is that processes are heavyweight: in an example that only runs two calls to var, the time to create the pool exceeds the time spent running var itself.
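To see why threads fall short here, consider a minimal sketch (my own illustration, not your code) that times a pure-Python CPU-bound function run serially and then in a thread pool; because of the GIL, the threaded version is not meaningfully faster:

    import time
    from concurrent.futures import ThreadPoolExecutor

    def cpu_bound(n):
        # A pure-Python loop holds the GIL, so threads cannot run it in parallel.
        return sum(i * i for i in range(n))

    if __name__ == "__main__":
        n = 2_000_000

        serial_start = time.time()
        serial_results = [cpu_bound(n) for _ in range(4)]
        print(f'Serial took {time.time() - serial_start:.2f}s')

        thread_start = time.time()
        with ThreadPoolExecutor() as pool:
            thread_results = list(pool.map(cpu_bound, [n] * 4))
        # Expect roughly the same wall time as serial, since only one thread
        # can execute Python bytecode at a time.
        print(f'Threads took {time.time() - thread_start:.2f}s')
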
To illustrate this, let's use a process pool with asyncio to run our calls to var in parallel, and compare it to just running the two calls sequentially. Note that when running this example I used an image from the Pysheds library https://github.com/mdbartos/pysheds/tree/master/data - if your image is much larger, the results below may not hold.
import functools
import time
from concurrent.futures.process import ProcessPoolExecutor
import asyncio

# assumes the var function from your question is defined here

a = 'diem.tif'
xs = 10, 20, 30, 40, 50
ys = 10, 20, 30, 40, 50

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    with ProcessPoolExecutor() as pool:
        task_one = loop.run_in_executor(pool, functools.partial(var, a))
        task_two = loop.run_in_executor(pool, functools.partial(var, a))
        results = await asyncio.gather(task_one, task_two)
        pool_end = time.time()
        print(f'Process pool took {pool_end - pool_start}')

    serial_start = time.time()
    result_one = var(a)
    result_two = var(a)
    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')

if __name__ == "__main__":
    asyncio.run(main())
Running the above on my machine (a 2.4 GHz 8-core Intel Core i9), I get the following output:
Process pool took 1.7581260204315186
Running in serial took 0.32335805892944336
In this example, the process pool is over five times slower! This is due to the overhead of creating and managing multiple processes. That said, if you need to call var more than a few times, a process pool may make more sense. Let's adapt this to run var 100 times and compare the results:
async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(loop.run_in_executor(pool, functools.partial(var, a)))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end - pool_start}')

    serial_start = time.time()
    for _ in range(100):
        result = var(a)
    serial_end = time.time()
    print(f'Running in serial took {serial_end - serial_start}')
Running var 100 times, I get the following output:
Process pool took 3.442288875579834
Running in serial took 13.769982099533081
In this case, running in the process pool is roughly four times faster. You may also want to try running each iteration of your loop concurrently. You can do this by creating a function that processes one x,y coordinate at a time and then running each point you want to examine in the process pool:
# Grid and np come from pysheds and numpy, as in your original code
import numpy as np
from pysheds.grid import Grid

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch')
    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x, y) in zip(xs, ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))
    return await asyncio.gather(*tasks)
async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end - pool_start}')
In this case, I get Process pool took 3.2950568199157715 - so no faster than our first version with one process per call to var. This is likely because the limiting factor at this point is how many cores we have available on our CPU; splitting our work into smaller increments does not add much value.
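One quick sanity check (a sketch of mine, using a trivial picklable builtin instead of var) is to compare your task count against the number of logical cores, which is what ProcessPoolExecutor uses for its worker count by default; once every worker is busy, splitting tasks more finely mostly adds pickling and scheduling overhead:

    import os
    from concurrent.futures.process import ProcessPoolExecutor

    if __name__ == "__main__":
        cores = os.cpu_count()
        print(f'Logical cores: {cores}')

        # With at least `cores` pending calls the pool is already saturated;
        # extra tasks just wait in the queue for a free worker.
        with ProcessPoolExecutor(max_workers=cores) as pool:
            futures = [pool.submit(pow, 2, 10) for _ in range(cores * 4)]
            print([f.result() for f in futures][:4])  # [1024, 1024, 1024, 1024]
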
That said, if you have 1000 x and y coordinates to examine across two images, this last approach may yield a performance gain.
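For scale, that fan-out would look something like the following sketch. Note that process_poi here is a hypothetical pure-Python stand-in (so the snippet runs on its own), and 'other.tif' is a made-up second image path; in your code you would use the pysheds-based process_poi defined earlier:

    import asyncio
    import functools
    import time
    from concurrent.futures.process import ProcessPoolExecutor

    # Hypothetical stand-in so this sketch is self-contained; substitute the
    # pysheds-based process_poi from above in real use.
    def process_poi(interest, x, y):
        return sum(i * i for i in range(50_000)) + x + y

    images = ['diem.tif', 'other.tif']  # 'other.tif' is a made-up path
    points = [(i, i) for i in range(1000)]

    async def main():
        loop = asyncio.get_event_loop()
        start = time.time()
        with ProcessPoolExecutor() as pool:
            tasks = [
                loop.run_in_executor(pool, functools.partial(process_poi, image, x, y))
                for image in images
                for (x, y) in points
            ]
            results = await asyncio.gather(*tasks)
        print(f'{len(results)} points took {time.time() - start:.2f}s')

    if __name__ == "__main__":
        asyncio.run(main())

One process per (image, point) pair keeps every core busy for the whole run, which is where a pool pays for its startup cost.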