好的,假设你有这两个任务:
import time
def cpu_operation(n):
print('Start CPU', n)
for x in range(100):
time.sleep(0.01)
print('End CPU', n)
return n
def expensive_gpu_operation(n):
print('Start GPU', n)
time.sleep(0.3)
print('Stop GPU', n)
return n
现在运行它们的方法如下:
def slow():
results = []
for task in range(5):
cpu_result = cpu_operation(task)
gpu_result = expensive_gpu_operation(cpu_result)
results.append(gpu_result)
return results
我们按顺序运行这些 - CPU、GPU、CPU、GPU...输出如下:
Start CPU 0
End CPU 0
Start GPU 0
Stop GPU 0
Start CPU 1
End CPU 1
Start GPU 1
Stop GPU 1
Start CPU 2
End CPU 2
Start GPU 2
Stop GPU 2
Start CPU 3
End CPU 3
Start GPU 3
Stop GPU 3
Start CPU 4
End CPU 4
Start GPU 4
Stop GPU 4
假设我们可以通过在 GPU 任务 X 完成之前启动 CPU 任务 X+1 来节省一些时间,以便 CPU X+1 和 GPU X 并行运行,对吗?
(我们不能并行运行 CPU X 和 GPU X,因为 GPU X 需要来自 CPU X 输出的输入,因此 +1。)
让我们使用线程!基本上我们想做这样的事情:
- 启动 CPU N,等待它完成
- 等待 GPU N-1 完成,在后台启动 GPU N
所以我们得到了一些并行性。实现它的最简单方法是具有 1 个线程的线程池 - 它可以像队列一样工作。在每个循环中,我们只安排一个任务并存储async_result。完成后,我们将能够检索所有结果。
Incidentally, Python has a thread pool implementation in the multiprocessing module.
from multiprocessing.pool import ThreadPool
def quick():
pool = ThreadPool(processes=1)
results = []
for task in range(5):
cpu_result = cpu_operation(task)
# schedule next GPU operation in background,
# store the async_result instance for this operation
async_result = pool.apply_async(expensive_gpu_operation, (cpu_result, ))
results.append(async_result)
# The results are ready! (Well, the last one probably isn't yet,
# but get() will wait for it
return [x.get() for x in results]
现在输出变成:
Start CPU 0
End CPU 0
Start CPU 1
Start GPU 0
Stop GPU 0
End CPU 1
Start CPU 2
Start GPU 1
Stop GPU 1
End CPU 2
Start CPU 3
Start GPU 2
Stop GPU 2
End CPU 3
Start CPU 4
Start GPU 3
Stop GPU 3
End CPU 4
Start GPU 4
Stop GPU 4
我们可以观察到并行性!
请注意,当expensive_gpu_operation 被调度时,它实际上直到time.sleep 在下一个CPU 操作中才会运行。这是由于全局解释器锁定 - 主线程必须在工作线程有机会做某事之前放弃 GIL,这发生在 time.sleep(),在你的情况下,我希望它会在你做的时候发生一些 i/o - 开始读取下一批图像。