【问题标题】:Using Multiprocessing in a Time and Memory Efficient Way以节省时间和内存的方式使用多处理
【发布时间】:2021-01-28 17:54:24
【问题描述】:

我有一个问题,我的想法已经不多了,所以我希望有人有更好的想法。

我正在尝试在 python 中对数百万个数据点进行优化。我有一个函数calculate(data),它接收一个巨大的数组data,并返回一个大小相同的数组results。计算相当简单,但需要使用data 中的多个条目对results 中的每个条目进行计算,不幸的是无法矢量化。由于维度原因,dataresults 的大小很大,无法缩小。 由于计算的数量庞大,这可能需要几天的时间来计算。

因此我开始使用multiprocessing,它显着提高了速度。我实现这一点的方法是将results 切割成块,为每个核心提供完整的(必需的)data 数组以及计算整个results 数组的单个块并返回它们以在之后合并它们的任务。 (使用pool.apply_async(func, (data,))

随着data 的大小进一步增加,我开始出现内存错误。根据我的分析,这是因为每个核心都有其单独的 data 数组,这意味着我的 RAM 中有 data 的原始 1 + # of cores 副本。为了减少这一点,我想我会尝试使用带有代理字典的管理器来处理data,然后每个核心都可以访问它。 (使用data_shared = manager.dict(data))不幸的是,这非常慢,这大概就是不推荐它的原因。

我错过了一个明显的解决方案吗?我非常感谢任何想法。

【问题讨论】:

    标签: python memory multiprocessing


    【解决方案1】:

    这是一个使用整数列表的示例。

    这段代码只是将一个包含 0、1、2、... 999 的 1000 个整数的数组分解为大小为 10 的块,最终得到 1000 / 10 = 100 个块,并从这些块中创建 100 个列表。这些块及其起始索引被提交给一个工作程序,该工作程序对列表的元素求和并返回起始索引和总和,然后用于计算总计。此处的起始索引未用于计算总计,但在另一种情况下它可能很有用。

    from multiprocessing import Pool, Array
    import itertools
    
    
    def init_pool(arr):
        global data
        data = arr
    
    
    def my_worker(tpl):
        index, results_chunk = tpl
        # sum items from results_chunk and data array:
        data_index = index
        the_sum = 0
        for item in results_chunk:
            the_sum += item + data[data_index]
            data_index += 1
        return index, the_sum
    
    
    def get_chunks(arr, size):
        index = 0
        it = iter(arr)
        while True:
            x = tuple(itertools.islice(it, size))
            if not x:
                return
            yield index, list(x)
            index += size
    
    
    # required for Windows and other platforms that do not have a fork() call:
    if __name__ == '__main__':
        data = [x for x in range(1000, 2000)]
        data_sum = sum(data)
        arr = Array('i', data, lock=False)
        results = [x for x in range(1000)]
        results_sum = sum(results)
        print('Excpected total sum =', data_sum + results_sum)
        with Pool(initializer=init_pool, initargs=(arr,)) as pool:
            total = 0
            for index, the_sum in pool.imap_unordered(my_worker, get_chunks(results, 10), 3):
                total += the_sum
            print('Actual total sum =', total)
    

    打印:

    Excpected total sum = 1999000
    Actual total sum = 1999000
    

    【讨论】:

    • 嘿,谢谢你的建议 :) 我有一些问题:get_chunks() 是传递给每个工人还是只传递给它的产量?如果它确实通过了,那是否也需要传递数据数组?如果它没有通过,这对我的例子有什么作用?我遇到的问题是我不知道在函数中计算的 calculate() 函数中将需要数据数组的哪些条目。
    • get_chunks() 是传递给每个工人还是只传递给它的产量? 只传递给它的产量,它们是较大列表的子列表。该代码适用于拆分并提交您的results 工作——我将其贴错标签data——对此感到抱歉。如果您不想在每个地址空间中复制您的data,那么使用托管结构(为什么不使用数组而不是字典?)是我知道的唯一方法。也许其他人不知道。
    • 如果您的计算(或大部分计算)可以使用 C 语言包(例如 numpy)来完成,那么您可以使用多线程而不是使用多线程,并且仍然可以获得显着的并行性,因为没有Global Interpreter Lock 的竞争将不再那么激烈,当然,一切都将在一个地址空间中。此外,您还可以通过使用 C 获得加速。
    • 在我发表最后一条评论后,我知道有些不对劲。如果您使用非托管多处理结构,例如multiprocessing.Arraymultiprocessing.dict,它是在共享内存中实现的,并且应该只有一个副本。请参阅更新的答案,其中我将results 作为分块实现为传递的普通列表和data,这是共享内存中的multiprocessing.Array,没有锁,因为它是只读的。分块前的resultsdata 大小相同,目标是将所有元素加在一起(以复杂的方式)。
    • 嗨,很抱歉这么晚才回复你。我实施了您的建议,在最终解决了一些错误后,我可以确认它运行良好。我没有计算每次访问数组的速度与我之前尝试过的相比有多快,但是额外的时间延迟并不明显,让我们这么说吧。非常感谢你的帮助,你真的救了我! :)
    猜你喜欢
    • 1970-01-01
    • 2018-01-20
    • 2015-11-15
    • 2012-07-14
    • 2019-02-08
    • 2018-01-27
    • 2013-10-30
    • 1970-01-01
    • 2017-12-13
    相关资源
    最近更新 更多