【Posted on】:2018-10-04 20:18:18
【Problem description】:
I am using the map() function from
from concurrent.futures import ProcessPoolExecutor
to do some simple data parallelism.
I have 400 files to process, and I call a processing function on each of them via map().
import glob
import os

infiles = glob.glob(os.path.join(input_path, '**/*.xls'), recursive=True) \
        + glob.glob(os.path.join(input_path, '**/*.xlsx'), recursive=True)
outfiles = [os.path.join(os.path.dirname(infile),
                         os.path.basename(infile).split('.')[0] + '.csv')
            for infile in infiles]

with ProcessPoolExecutor(max_workers=None) as executor:
    executor.map(excel2csv, infiles, outfiles)
So excel2csv() should be called once per file, receiving the input and output paths it needs. It processes each file independently, writes its result to disk, and returns nothing.
After roughly 100 files, the application throws an exception complaining that a queue is full.
Exception in thread Thread-1:
Traceback (most recent call last):
File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/concurrent/futures/process.py", line 295, in _queue_management_worker
shutdown_worker()
File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/concurrent/futures/process.py", line 253, in shutdown_worker
call_queue.put_nowait(None)
File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/multiprocessing/queues.py", line 129, in put_nowait
return self.put(obj, False)
File "/home/mapa17/miniconda3/envs/pygng/lib/python3.5/multiprocessing/queues.py", line 83, in put
raise Full
queue.Full
The most similar question I found is discussed here.
But in my case very little data is passed to the worker function (two strings per call), and I checked that the default queue size limit (from _multiprocessing.SemLock.SEM_VALUE_MAX) is far greater than 400.
Any ideas? Thanks.
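For reference, that limit can be inspected directly. This is a sketch that assumes the private _multiprocessing module is importable (it is a CPython implementation detail, not a public API):

```python
import _multiprocessing

# SEM_VALUE_MAX is the platform's maximum semaphore value, which bounds
# how large multiprocessing can make its internal queues.
print(_multiprocessing.SemLock.SEM_VALUE_MAX)
```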
【Discussion】:
Tags: python queue multiprocessing