【问题标题】:Python 多处理池没有创建足够的进程
【发布时间】:2017-04-05 04:04:40
【问题描述】:

我正在对 40GB 的数据进行计算。每个文件都是一个压缩的 gzip 文件,其中包含 json 行。每个文件最多有 500,000 行,或大约 500MB。我有一个运行 128 cpu 和 1952 GB 内存的亚马逊实例。我要做的是尽快处理每个文件。

我正在使用这样的多处理池:

def initializeLock(l):

    global lock
    lock = l

if __name__ == '__main__':
    directory = '/home/ubuntu/[directory_containing_files]/*.gz'
    file_names = glob.glob(directory)

    lock = Lock()
    pool = Pool(initializer=initializeLock, initargs=(lock,))
    pool.map(do_analysis, file_names)
    pool.close()
    pool.join()

我期望发生的是创建大量进程,每个进程处理一个文件。实际发生的是最初创建了 100 多个进程。在这一点上,我使用了大约 85% 的内存,这太棒了!然后每个都完成。最终,运行的进程数量减少到大约 10 个。此时我只使用了 5% 的内存。定期启动额外的进程,但它永远不会回到运行 100 左右。所以我有这么大的 CPU 和所有这些空闲内存,但我大部分时间最多运行 10 个进程。

关于如何让它继续运行 100 个进程直到所有文件都完成的任何想法?

编辑:

我在应用程序中添加了一些日志记录。最初它加载了 127 个进程,我认为这是因为我有 128 个 CPU,其中一个在加载进程时正在使用中。一些进程成功完成,结果被保存。然后在某个时候,除了少数正在运行的进程之外,所有进程都会结束。当我检查有多少文件完成时,127 个文件中只有 22 个完成。然后它只使用 5-10 个进程运行,所有这些都成功完成。我在想也许它会耗尽内存并崩溃。但为什么?我有这么多内存和这么多 CPU。

编辑2:

所以我找到了问题所在。问题是我在 do_analysis 方法中设置了一个锁,所有进程都在同一时间完成并等待锁被释放。这些过程没有停止,它们正在休眠。所以这给我带来了另一个问题:我的主要目标是获取具有许多 json 行的每个文件,从 json 行获取 ID 属性,然后将其附加到包含具有相同 id 的其他行的文件中。如果文件不存在,我创建它。我所做的是在访问文件时设置一个锁,以避免它被另一个进程访问。这是我的代码。

for key, value in dataframe.iteritems():
    if os.path.isfile(file_name):
        lock.acquire()
        value.to_csv(filename), mode='a', header=False, encoding='utf-8')
        lock.release()
    else:
        value.to_csv(filename), header=True, encoding='utf-8')

所以现在我正在尝试一种创造性的方式来附加到文件,但不会阻止所有其他进程。我正在处理大量数据,需要同时访问两个文件的可能性很低,但它仍然会发生。所以我需要确保在附加文件时,另一个进程不会尝试打开该文件。

【问题讨论】:

  • 尝试将进程数添加到 Pool(initializer=initializeLock, processes=100, initargs=(lock,)) 的参数中
  • 您是否考虑过使用pool.imap_unordered 而不是pool.map
  • @SeregaLuchko 我试过了。但同样的事情发生了。
  • @Jean-FrançoisFabre 我试过了,它似乎确实可以让更多的进程保持更长时间,但最终它又回到了运行 10 个左右。我刚刚阅读了有关 chunksize 参数的信息。也许它正在向一个进程提交多个文件?我正在尝试 imap_unordered 再次将块大小设置为 1。
  • @Gabriel 尝试使用 celery,你有这么多 CPU,而且似乎也很容易使用 docs.celeryproject.org/en/master/getting-started/…

标签: python amazon-web-services pandas multiprocessing pool


【解决方案1】:

感谢大家的意见。这是我目前对该问题的解决方案,我计划在接下来的一周内提高效率。我接受了 Martin 的建议,并在完成所有文件后将它们粘合在一起,但是,我想努力实施 daphtdazz 解决方案,即在我生成更多文件的同时,让一个流程与队列进行粘合。

def do_analyis(file):
    # To keep the file names unique, I append the process id to the end
    process_id = multiprocessing.current_process().pid

    # doing analysis work...

    for key, value in dataframe.iteritems():
        if os.path.isfile(filename):
            value.to_csv(filename), mode='a', header=False, encoding='utf-8')
        else:
            value.to_csv(filename), header=True, encoding='utf-8')

def merge_files(base_file_name):
    write_directory = 'write_directory'
    all_files = glob.glob('{0}*'.format(base_file_name))

    is_file_created = False

    for file in all_files:
        if is_file_created:
            print 'File already exists, appending'
            dataframe = pandas.read_csv(file, index_col=0)
            dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), mode='a', header=False, encoding='utf-8')
        else:
            print 'File does not exist, creating.'
            dataframe = pandas.read_csv(file, index_col=0)
            dataframe.to_csv('{0}{1}.csv'.format(write_directory, os.path.basename(base_file_name)), header=True, encoding='utf-8')
            is_file_created = True


if __name__ == '__main__':
    # Run the code to do analysis and group files by the id in the json lines
    directory = 'directory'
    file_names = glob.glob(directory)
    pool = Pool()
    pool.imap_unordered(do_analysis, file_names, 1)
    pool.close()
    pool.join()

    # Merge all of the files together
    base_list = get_unique_base_file_names('file_directory')
    pool = Pool()
    pool.imap_unordered(merge_files, base_list, 100)
    pool.close()
    pool.join()

这会保存每个文件,并在文件末尾附加一个唯一的进程 id,然后返回并通过 json 文件中的 id 获取所有文件并将它们合并在一起。创建文件时,cpu 使用率在 60-70% 之间。那是体面的。合并文件时,cpu 使用率约为 8%。这是因为文件合并得如此之快,以至于我不需要我拥有的所有 cpu 处理能力。此解决方案有效。但它可能更有效。我打算同时做这两件事。欢迎任何建议。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-12-12
    • 2013-12-01
    • 2022-01-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-11-30
    • 2015-01-13
    相关资源
    最近更新 更多