【发布时间】:2015-04-07 17:56:32
【问题描述】:
我正在处理一些非常大的文件。
我可以使用 collections.Counter 集合计算特定字符串的出现次数,该集合使用 multiprocessing.BaseManager 子类在进程之间共享。
虽然我可以共享计数器并且看似腌制,但似乎没有正确腌制。我可以将字典复制到可以腌制的新字典中。
我正在尝试了解如何在选择共享计数器之前避免“复制”它。
这是我的(伪代码):
from multiprocessing.managers import BaseManager
from collections import Counter
class MyManager(BaseManager):
pass
MyManager.register('Counter', Counter)
def main(glob_pattern):
# function that processes files
def worker_process(files_split_to_allow_naive_parallelization, mycounterdict):
# code that loops through files
for line in file:
# code that processes line
my_line_items = line.split()
index_for_read = (my_line_items[0],my_line_items[6])
mycounterdict.update((index_for_read,))
manager = MyManager()
manager.start()
mycounterdict = manager.Counter()
# code to get glob files , split them with unix shell split and then chunk then
for i in range(NUM_PROCS):
p = multiprocessing.Process(target=worker_process , args = (all_index_file_tuples[chunksize * i:chunksize * (i + 1)],mycounterdict))
procs.append(p)
p.start()
# Now we "join" the processes
for p in procs:
p.join()
# This is the part I have trouble with
# This yields a pickled file that fails with an error
pickle.dump(mycounterdict,open("Combined_count_gives_error.p","wb"))
# This however works
# How can I avoid doing it this way?
mycopydict = Counter()
mydictcopy.update(mycounterdict.items())
pickle.dump(mycopydict,open("Combined_count_that_works.p","wb"))
当我尝试加载总是较小的固定大小的“腌制”错误文件时,我收到一个没有意义的错误。
如何在不通过上面伪代码中创建新字典的情况下腌制共享字典。
>>> p = pickle.load(open("Combined_count_gives_error.p"))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1378, in load
return Unpickler(file).load()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 880, in load_eof
raise EOFError
EOFError
【问题讨论】:
-
您的代码中存在语法错误:
mycounterdict.update((index_for_read,)。 -
您的方法很复杂。我不会使用共享的 Counter 对象,因为通信和同步开销实际上可能会限制单个工作进程的文件 I/O 性能。替代方法:让单个工作人员各自处理一个文件(或文件部分)(不要将原始数据发送给他们,让他们从文件系统读取数据),让每个工作人员创建一个 Counter 对象,将所有“子”Counter 对象发送到您的主进程(通过队列或管道),然后合并它们。事实上,我最近在这里提供了一个工作示例:stackoverflow.com/a/28097807/145400
-
@Jan-PhilipGehrcke 方法是您应该说服的方法——在进行多处理时切勿尝试拥有全局状态——保持共享数据对每个线程不可变。为了补充 Jan 的建议,我建议你先运行所有子进程,然后让你的主线程最后将它们合并在一起。
标签: python multiprocessing pickle python-multiprocessing