【问题标题】:pool processing use async memory leak池处理使用异步内存泄漏
【发布时间】:2021-09-29 05:44:27
【问题描述】:

在我的代码方面需要一些帮助,我正在尝试使用 pool 在我的列表中运行一些函数。

它是一个包含大量数据的文件。比如 71GB 的文本。

我正在尝试尽可能快地越线。 但是由于某种原因在运行过程中,我被内存耗尽了

我认为这是因为我的 pool.close 和 pool.join 在我的代码末尾。

但我不确定如何修复它以避免内存泄漏。我认为我的进程永远不会死并保存数据。

注意:api.delete_object(item) 返回大量数据。也许它会以某种方式卡在内存中?

这是我的代码:

import pandas as pd
import boto3
from multiprocessing.pool import ThreadPool as Pool

pool_size = 8  # my "parallelness"


def worker(item):
    try:
        result = api.delete_object(item)
    except:
        print('error with item')

pool = Pool(pool_size)        
i=0
for chunk in pd.read_csv("/folder/files_to_delete",chunksize=1000,header=None):
            i+=1000
            for ind in chunk.index:
              if "something.txt" not in chunk[0][ind]:
                    pool.apply_async(worker, (chunk[0][ind],))
                    print("{}".format(i), end='\r')
pool.close()
pool.join()

【问题讨论】:

  • 两件事:即使在使用 fork 时,使用if __name__ == "__main__": 来保护执行也是一个好习惯。您可以在池构造函数中设置maxtasksperchild 关键字,以定期重新启动工作进程,以收集管理不善的资源。
  • 我应该设置多少? maxtasksperchild = 1?对内存泄漏有帮助吗?
  • maxtasksperchild 不能在 TheardPool 上使用,我应该使用不同的处理方法吗? @亚伦
  • 我错过了 ThreadPool 我的错误。也许可以尝试 tracemalloc 或其他一些第三方库来分析内存使用情况以找到泄漏?

标签: python pandas multithreading multiprocessing threadpool


【解决方案1】:

虽然您已使用multiprocessing 标记您的问题,但您使用的是多线程。但在我看来,使用多处理可能会更好,因为我不清楚使用线程执行 api.delete_object(item) 可以实现多少“并行性”。

无论如何,您可以尝试重新排列代码以使用函数imap_unordered,然后迭代它返回的iterable。虽然您的工作函数worker 的返回值不是特别有趣(发布的工作函数只返回None,但也许这是一种简化),检索这些值应该释放用于保存它们的内存.

import pandas as pd
import boto3
from multiprocessing.pool import ThreadPool as Pool

def generate_arguments():
    i = 0
    with pd.read_csv("/folder/files_to_delete",chunksize=1000,header=None) as rdr:
        for chunk in rdr:
            i += 1000
            for ind in chunk.index:
                if "something.txt" not in chunk[0][ind]:
                    yield chunk[0][ind]
                    print("{}".format(i), end='\r')
        

def worker(item):
    try:
        result = api.delete_object(item)
    except:
        print('error with item')

def main():
    pool_size = 8  # my "parallelness"
    pool = Pool(pool_size)
    results = pool.imap_unordered(worker, generate_arguments())
    # You can iterate results to get return values, which are None, from worker function:
    for result in results:
        # result is return value from worker
        pass
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

如果您确实切换到多处理,您应该在调用imap_unordered 时指定一个chunksize 参数。我建议您估计将提交的任务数除以 4 * 池大小并将结果用作该值。例如,如果您估计要提交 200,000 个任务并且池大小为 8,则使用 chunksize200_000 / (4 * 8) = 6250

请注意,imap_unordered 可以按任意顺序返回结果,但比imap 更有效。如果您实际上是从worker 返回的不是None 并且需要按任务提交顺序排列结果,则使用方法imap 或让imap_unordered 额外返回其传递的参数。

【讨论】:

  • 哇,谢谢。我尝试了解决方案并得到:AttributeError: __enter__ 知道为什么吗?
  • 仍然存在内存泄漏 :( 知道为什么吗?
  • 我相信AttributeError 如果您没有运行至少pandas 1.2。所以不要使用with 上下文管理器,而只是明确地做一个close
  • 就您持续存在的内存问题而言,我不知道其余代码可能会导致泄漏,所以我的回答很明显可以尝试。
猜你喜欢
  • 2018-12-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-06-14
  • 1970-01-01
  • 2017-08-11
  • 1970-01-01
相关资源
最近更新 更多