【问题标题】:Python multiprocessing - sharing large datasetPython 多处理 - 共享大型数据集
【发布时间】:2022-11-12 21:42:49
【问题描述】:

我正在尝试加速受 CPU 限制的 Python 脚本(在 Windows11 上)。 Python 中的威胁似乎不在不同的 cpu(核心)上运行,所以我唯一的选择是多处理。

我有一个很大的字典数据结构(从文件加载后占用 11GB 内存),我正在检查计算值是否在该字典中。计算的输入也来自一个文件(大小为 100GB)。这个输入我可以分批池映射到进程,没问题。但是我不能将字典复制到所有进程,因为没有足够的内存。所以我需要找到一种方法让进程检查值(实际上是一个字符串)是否在字典中。

有什么建议吗?

伪程序流程:

--main--
- load dictionary structure from file   # 11GB memory footprint
- ...
- While not all chuncks loaded
-    Load chunk of calcdata from file   # (10.000 lines per chunk)
-    Distribute (map) calcdata-chunck to processes
-    Wait for processes to complete all chunks

--process--
- for each element in subchunk
-    perform calculation
-    check if calculation in dictionary  # here is my problem!
-    store result in file

编辑,在下面实现 cmets 之后,我现在在:

def ReadDictFromFile()
    cnt=0
    print("Reading dictionary from " + dictfilename)
    with open(dictfilename, encoding=("utf-8"), errors=("replace")) as f:
        next(f) #skip first line (header)
        for line in f:
            s = line.rstrip("\n")
            (key,keyvalue) = s.split()
            shared_dict[str(key)]=keyvalue
            cnt = cnt + 1
            if ((cnt % 1000000) == 0): #log each 1000000 where we are
                print(cnt)
                return #temp to speed up testing, not load whole dictionary atm
    print("Done loading dictionary")        


def checkqlist(qlist)
    try:
        checkvalue = calculations(qlist)
        
        (found, keyval) = InMem(checkvalue)
                
        if (found):
            print("FOUND!!! " + checkvalue + ' ' + keyvalue)            
    except Exception as e:
        print("(" + str(os.getpid()) + ")Error log: %s" % repr(e))
        time.sleep(15)


def InMem(checkvalue):
    if(checkvalue in shared_dict):
        return True, shared_dict[checkvalue]
    else:
        return False, ""


if __name__ == "__main__":
    start_time = time.time()

    global shared_dict 
    manager = Manager()
    shared_dict = manager.dict()

    ReadDictFromFile()
    with open(filetocheck, encoding=("utf-8"), errors=("replace")) as f:
        qlist = []
        for line in f:
            s = line.rstrip("\n")
            qlist.append(s)
            if (len(qlist) > 10000):
                try:
                    with multiprocessing.Pool() as pool:
                        pool.map(checkqlist, qlist)            
                except Exception as e:
                    print("error log: %s" % repr(e))
                    time.sleep(15)
    logit("Completed! " + datetime.datetime.now().strftime("%I:%M%p on %B %d, %Y"))
    print("--- %s seconds ---" % (time.time() - start_time))

【问题讨论】:

  • 如果“字典”有任何自然结构,那么您可以使用它来索引进程。您需要自己处理这些流程,但它可能会起作用。否则,也许使用 WSL,然后你可以使用基于 fork 的并行性,它可能会起作用吗?
  • 无论如何,使用分叉子进程的@SamMason 最终都会复制数据,因为在 Python 中仅触摸数据就是“写入”(由于引用计数)。如果您使用 array.arraynumpy.ndarray 之类的东西,在原始缓冲区上使用一些包装器,则有时这是可以避免的,因此只有包装器被复制,但这通常是脆弱的

标签: python python-3.x dictionary multiprocessing large-data


【解决方案1】:

你可以使用multiprocessing.Manager.dict,这是最快的IPC,你可以用来在python中的进程之间进行检查,对于内存大小,只需通过将所有值更改为None来使其更小,在我的电脑上它可以做到33k成员每秒检查一次……比普通字典慢约 400 倍。

manager = Manager()
shared_dict = manager.dict()
shared_dict.update({x:None for x in main_dictionary})
shared_dict["new_element"] = None  # to set another value
del shared_dict["new_element"]  # to delete a certain value

您还可以为此使用专用的内存数据库,例如 redis,它可以同时处理多个进程的轮询。

@Sam Mason 建议使用 WSL 和 fork 可能会更好,但这个是最便携的。

编辑:要将其存储在子全局范围内,您必须通过初始化程序传递它。

def define_global(var):
    global shared_dict
    shared_dict = var
...
if __name__ == "__main__":
...

    with multiprocessing.Pool(initializer=define_global, initargs=(shared_dict ,) as pool:

【讨论】:

  • 那很有趣。在我的情况下,字典填充一次,该过程只需检查它是否在字典中。 {if (x in shared_dict):}。所以没有更新。我在尝试实现时得到的奇怪的事情是进程中的“NameError shared_dict is not defined”。目前不知道为什么。
  • @Hasse您必须将其作为参数传递给被调用的函数,或者将其传递给初始化程序并将其存储在全局范围内,因为当您将其作为参数传递时,子进程不会从父进程“继承”全局变量对孩子来说,它不会被复制。
  • 是的,我读到这在 Windows 上再次有所不同(就像 fork 一样)。我只是不确定在使用地图时如何做到这一点。请参阅上面的代码更新。使用 multiprocessing.Pool() 作为池: pool.map(checkqlist, qlist) -> checkqlist 是进程函数,而 qlist 是一个应该映射到所有进程的列表。如果我添加 shared_dict 作为参数, map 会用它做什么?
  • @Hasse 我编辑了答案以显示如何将其转移到儿童的全球范围内。
猜你喜欢
  • 2021-06-23
  • 1970-01-01
  • 2020-02-18
  • 2011-01-06
  • 2023-03-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-06-12
相关资源
最近更新 更多