我想我会安排它与你正在做的相反。也就是说,我将创建一个特定大小的线程池来负责产生结果。提交到此池的任务将作为参数传递给处理器池,工作线程可以使用该处理器池来提交受 CPU 限制的工作部分。换句话说,线程池工作人员将主要执行所有与磁盘相关的操作,并将任何 CPU 密集型工作移交给处理器池。
处理器池的大小应该就是您环境中的处理器数量。很难给线程池一个精确的大小;这取决于在收益递减定律发挥作用之前它可以处理多少并发磁盘操作。它还取决于您的内存:池越大,占用的内存资源就越大,尤其是在必须将整个文件读入内存进行处理的情况下。因此,您可能必须尝试使用此值。下面的代码概述了这些想法。您从线程池中获得的 I/O 操作的重叠比您仅使用小型处理器池所能达到的效果要大:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
def cpu_bound_function(arg1, arg2):
...
return some_result
def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'r') as f:
# Do disk related operations:
. . . # code omitted
# Now we have to do a CPU-intensive operation:
future = process_pool_executor.submit(cpu_bound_function, arg1, arg2)
result = future.result() # get result
return result
file_list = [file_1, file_2, file_n]
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have
with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list)
重要提示:
另一种更简单的方法是只使用一个处理器池,其大小大于您拥有的 CPU 处理器数量,例如 25 个。工作进程将同时执行 I/O和 CPU 操作。即使您的进程多于 CPU,许多进程仍将处于等待 I/O 完成的等待状态,从而允许 CPU 密集型工作运行。
这种方法的缺点是创建 N 个进程的开销远大于创建 N 个线程 + 少量进程的开销。但是,随着提交到池的任务的运行时间变得越来越大,这种增加的开销在总运行时间中所占的百分比越来越小。因此,如果您的任务不是微不足道的,那么这可能是一个相当高效的简化。
更新:两种方法的基准
我针对这两种处理 24 个大小约为 10,000KB 的文件的方法做了一些基准测试(实际上,这只是 3 个不同的文件,每个文件处理了 8 次,因此可能已经做了一些缓存):
方法一(线程池+处理器池)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import partial
import os
from math import sqrt
import timeit
def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum
def io_bound_function(process_pool_executor, file_name):
with open(file_name, 'rb') as f:
b = f.read()
future = process_pool_executor.submit(cpu_bound_function, b)
result = future.result() # get result
return result
def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_THREADS = 50 # depends on your configuration on how well the I/O can be overlapped
N_THREADS = min(N_FILES, MAX_THREADS) # no point in creating more threds than required
N_PROCESSES = os.cpu_count() # use the number of processors you have
with ThreadPoolExecutor(N_THREADS) as thread_pool_executor:
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(thread_pool_executor.map(partial(io_bound_function, process_pool_executor), file_list))
print(results)
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))
方法 2(仅限处理器池)
from concurrent.futures import ProcessPoolExecutor
from math import sqrt
import timeit
def cpu_bound_function(b):
sum = 0.0
for x in b:
sum += sqrt(float(x))
return sum
def io_bound_function(file_name):
with open(file_name, 'rb') as f:
b = f.read()
result = cpu_bound_function(b)
return result
def main():
file_list = ['/download/httpd-2.4.16-win32-VC14.zip'] * 8 + ['/download/curlmanager-1.0.6-x64.exe'] * 8 + ['/download/Element_v2.8.0_UserManual_RevA.pdf'] * 8
N_FILES = len(file_list)
MAX_PROCESSES = 50 # depends on your configuration on how well the I/O can be overlapped
N_PROCESSES = min(N_FILES, MAX_PROCESSES) # no point in creating more threds than required
with ProcessPoolExecutor(N_PROCESSES) as process_pool_executor:
results = list(process_pool_executor.map(io_bound_function, file_list))
print(results)
if __name__ == '__main__':
print(timeit.timeit(stmt='main()', number=1, globals=globals()))
结果:
(我有 8 个核心)
线程池 + 处理器池:13.5 秒
仅处理器池:13.3 秒
结论:我会首先尝试更简单的方法,即对所有内容都使用处理器池。现在棘手的一点是确定要创建的最大进程数,这是您最初问题的一部分,当它所做的只是 CPU 密集型计算时,它有一个简单的答案。如果您正在阅读的文件数量不是太多,那么这一点是没有意义的;每个文件可以有一个进程。但是,如果您有数百个文件,您将不希望在池中拥有数百个进程(您可以创建多少个进程也有上限,并且还有那些令人讨厌的内存限制)。我无法给你一个确切的数字。如果您确实有大量文件,请从较小的池大小开始并不断增加,直到您没有进一步的好处(当然,您可能不希望处理超过这些测试的最大数量的文件,否则您将永远运行,只是为实际运行确定一个合适的池大小)。