【发布时间】:2022-11-12 21:42:49
【问题描述】:
我正在尝试加速受 CPU 限制的 Python 脚本(在 Windows11 上)。 Python 中的威胁似乎不在不同的 cpu(核心)上运行,所以我唯一的选择是多处理。
我有一个很大的字典数据结构(从文件加载后占用 11GB 内存),我正在检查计算值是否在该字典中。计算的输入也来自一个文件(大小为 100GB)。这个输入我可以分批池映射到进程,没问题。但是我不能将字典复制到所有进程,因为没有足够的内存。所以我需要找到一种方法让进程检查值(实际上是一个字符串)是否在字典中。
有什么建议吗?
伪程序流程:
--main--
- load dictionary structure from file # 11GB memory footprint
- ...
- While not all chuncks loaded
- Load chunk of calcdata from file # (10.000 lines per chunk)
- Distribute (map) calcdata-chunck to processes
- Wait for processes to complete all chunks
--process--
- for each element in subchunk
- perform calculation
- check if calculation in dictionary # here is my problem!
- store result in file
编辑,在下面实现 cmets 之后,我现在在:
def ReadDictFromFile()
cnt=0
print("Reading dictionary from " + dictfilename)
with open(dictfilename, encoding=("utf-8"), errors=("replace")) as f:
next(f) #skip first line (header)
for line in f:
s = line.rstrip("\n")
(key,keyvalue) = s.split()
shared_dict[str(key)]=keyvalue
cnt = cnt + 1
if ((cnt % 1000000) == 0): #log each 1000000 where we are
print(cnt)
return #temp to speed up testing, not load whole dictionary atm
print("Done loading dictionary")
def checkqlist(qlist)
try:
checkvalue = calculations(qlist)
(found, keyval) = InMem(checkvalue)
if (found):
print("FOUND!!! " + checkvalue + ' ' + keyvalue)
except Exception as e:
print("(" + str(os.getpid()) + ")Error log: %s" % repr(e))
time.sleep(15)
def InMem(checkvalue):
if(checkvalue in shared_dict):
return True, shared_dict[checkvalue]
else:
return False, ""
if __name__ == "__main__":
start_time = time.time()
global shared_dict
manager = Manager()
shared_dict = manager.dict()
ReadDictFromFile()
with open(filetocheck, encoding=("utf-8"), errors=("replace")) as f:
qlist = []
for line in f:
s = line.rstrip("\n")
qlist.append(s)
if (len(qlist) > 10000):
try:
with multiprocessing.Pool() as pool:
pool.map(checkqlist, qlist)
except Exception as e:
print("error log: %s" % repr(e))
time.sleep(15)
logit("Completed! " + datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y"))
print("--- %s seconds ---" % (time.time() - start_time))
【问题讨论】:
-
如果“字典”有任何自然结构,那么您可以使用它来索引进程。您需要自己处理这些流程,但它可能会起作用。否则,也许使用 WSL,然后你可以使用基于 fork 的并行性,它可能会起作用吗?
-
无论如何,使用分叉子进程的@SamMason 最终都会复制数据,因为在 Python 中仅触摸数据就是“写入”(由于引用计数)。如果您使用
array.array或numpy.ndarray之类的东西,在原始缓冲区上使用一些包装器,则有时这是可以避免的,因此只有包装器被复制,但这通常是脆弱的
标签: python python-3.x dictionary multiprocessing large-data