更新
如果你不想炸毁内存,你需要做 3 件事:
- 您需要有一个 iterable 来生成您的值,并传递给
dummy_func 以增量方式生成值。 itertools.product 实际上在产生第一个值之前会在内存中生成所有值,因此无论您做什么,它都会炸毁内存。
- 您必须使用一个函数来逐个处理 iterable,并为每个结果将结果附加到使用合适的非零 maxlen 参数初始化的
deque。您当前的代码正在用 map 函数的完整输出初始化 deque,该输出的长度与传入的 iterable 相同。这会耗尽内存。
- 即使您为工作函数
dummy_func 增量地生成值并使用 imap,您提交任务的速度也可能比产生结果的速度快,这样池的输入队列会不断增长,同样会耗尽内存。
为了克服第 1 点描述的问题,我使用了 permutations 生成器函数。
为了克服第 2 点描述的问题,我用 maxlen=10 初始化了一个空双端队列。每当 dummy_func 返回一个值,我就将其追加到该双端队列中。
为了克服第 3 点描述的问题,您需要使用 BoundedQueueProcessPool 或 BoundedQueueThreadPool 类。它使用 imap 方法提交带有回调函数的新任务来处理结果。它与标准池的不同之处在于:一旦输入队列大小达到池中的进程数或线程数(视情况而定),它默认会阻止主线程提交更多任务;您也可以通过 max_waiting_tasks 参数手动指定最大等待任务数:
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps, partial
# Identifier for this bounded-pool recipe.
name = 'bounded_pool'
class ImapResult():
    """Iterator wrapper around a pool ``imap``/``imap_unordered`` result.

    Releases one semaphore permit per consumed value, so the submitting
    side (which acquires one permit per task) is unblocked as results
    are drained.
    """

    def __init__(self, semaphore, result):
        self.semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            value = next(self.it)
        except StopIteration:
            # Iteration is exhausted: no task was consumed here, so
            # there is no permit to return.
            raise
        except BaseException:
            # The underlying task raised; its permit must still be
            # returned before propagating the error.
            self.semaphore.release()
            raise
        self.semaphore.release()
        return value
class BoundedQueuePool:
    """Mixin that bounds a pool's input queue with a semaphore.

    One permit is acquired per submitted task and released when the
    task's result is consumed, so submission blocks once the queue is
    full instead of growing without bound.
    """

    def __init__(self, semaphore):
        # semaphore: a (Bounded)Semaphore sized to pool size + max waiting tasks.
        self.semaphore = semaphore

    def release(self, result, callback=None):
        """Return one permit, then forward *result* to *callback* if given."""
        self.semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds=None, callback=None, error_callback=None):
        """Like ``Pool.apply_async`` but blocks while the input queue is full.

        Fix: the original used a mutable default ``kwds={}``; use None as
        the sentinel and pass a fresh dict through to the underlying pool.
        """
        self.semaphore.acquire()
        callback_fn = self.release if callback is None else partial(self.release, callback=callback)
        error_callback_fn = self.release if error_callback is None else partial(self.release, callback=error_callback)
        return super().apply_async(func, args, {} if kwds is None else kwds,
                                   callback=callback_fn, error_callback=error_callback_fn)

    def _throttled(self, iterable):
        # Yield elements one at a time, acquiring a permit per task so the
        # pool's input queue cannot grow without bound.
        for elem in iterable:
            self.semaphore.acquire()
            yield elem

    def imap(self, func, iterable, chunksize=1):
        """Ordered lazy map with a bounded input queue."""
        result = super().imap(func, self._throttled(iterable), chunksize)
        return ImapResult(self.semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        """Unordered lazy map with a bounded input queue."""
        result = super().imap_unordered(func, self._throttled(iterable), chunksize)
        return ImapResult(self.semaphore, result)
class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    """Process pool whose task-submission queue is bounded.

    max_waiting_tasks: maximum tasks allowed to wait beyond the ones in
    flight; defaults to the number of worker processes.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        # Fix: validate BEFORE creating the pool -- the original raised
        # after Pool.__init__, leaking live worker processes on error.
        if max_waiting_tasks is not None and max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        # Permits cover in-flight tasks plus the allowed waiting backlog.
        BoundedQueuePool.__init__(self, multiprocessing.BoundedSemaphore(self._processes + max_waiting_tasks))
class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    """Thread pool whose task-submission queue is bounded.

    max_waiting_tasks: maximum tasks allowed to wait beyond the ones in
    flight; defaults to the number of worker threads.
    """

    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        # Fix: validate BEFORE creating the pool -- the original raised
        # after ThreadPool.__init__, leaking live worker threads on error.
        if max_waiting_tasks is not None and max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        # Permits cover in-flight tasks plus the allowed waiting backlog.
        BoundedQueuePool.__init__(self, threading.BoundedSemaphore(self._processes + max_waiting_tasks))
def threadpool(pool):
    """Decorator factory: run the decorated function asynchronously on *pool*.

    The wrapped function returns the pool's AsyncResult; call ``.get()``
    on it to obtain the function's return value.
    """
    def decorate(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return pool.apply_async(func, args, kwargs)
        return wrapper
    return decorate
def processpool(pool):
    """Decorator factory: submit calls of the decorated function to *pool*.

    The wrapped call returns the pool's AsyncResult; ``.get()`` yields
    the function's result.
    """
    def decorate(f):
        @wraps(f)
        def submit(*call_args, **call_kwargs):
            return pool.apply_async(f, call_args, call_kwargs)
        return submit
    return decorate
##################################################################
import queue
from itertools import permutations
def dummy_func(values, keys):
    """Stand-in worker: pair each key with its value and return the dict."""
    # Real per-task work (e.g. running one simulation) would go here.
    return dict(zip(keys, values))
def main():
    """Run dummy_func over the generated tuples on a bounded thread pool.

    Returns a deque holding at most the 10 most recent results, so
    memory stays flat regardless of how many tasks run.
    """
    num_threads = multiprocessing.cpu_count()
    # Full-size parameter set (kept for reference; replaced below).
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  }
    # A more reasonably sized parameters:
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  }
    keys = list(parameters)
    # Single-process equivalent, for comparison:
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)
    # NOTE(review): permutations() yields orderings of the value *lists*
    # themselves, not combinations of individual values; if every parameter
    # combination is wanted, itertools.product(*parameters.values()) is
    # probably the intent. Preserved as written -- confirm with the author.
    # NOTE(review): queue.deque is an undocumented alias of collections.deque;
    # importing it from collections would be the canonical spelling.
    q = queue.deque(maxlen=10)  # bounded: keeps only the 10 newest results
    pool = BoundedQueueThreadPool(num_threads)
    for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
        q.append(v)
    # Fix: the original never shut the pool down, leaking worker threads.
    pool.close()
    pool.join()
    return q
if __name__ == '__main__':
    import time

    # Time the whole run and show the retained results.
    started = time.time()
    results = main()
    print(results)
    print(time.time() - started)