【问题标题】:How to run the same function multiple times simultaneously?如何同时多次运行相同的功能?
【发布时间】:2021-07-15 17:16:31
【问题描述】:

我有一个函数,它将数据帧作为输入并返回一个数据帧。喜欢:

def process(df):
    <all the code for processing>
    return df
# input df has 250K rows and 30 columns
# saving it in a variable
result = process(df)
# transform input df into 10,000K rows and over 50 columns

它做了很多处理,因此需要很长时间才能返回输出。我正在使用 jupyter 笔记本。

我想出了一个新函数,它根据原始 df 列上的某个类别过滤器将原始数据帧过滤成 5 个大小不相等但介于 30K 到 100K 之间的块,并将其作为 process(df1 )、进程(df2)...等。并将其保存为结果 1、结果 2 等,然后将结果合并为一个最终数据帧。

但我希望它们同时运行并自动组合结果。就像一个代码一起运行 5 个流程功能,一旦全部完成,它们就可以合并为一个,为我提供与之前相同的“结果”,但节省了大量运行时间。

如果我可以将原始数据帧分成相等的部分并使用 process(df) 函数同时运行每个部分,那就更好了,就像它将这 250 k 行随机拆分为 5 个大小为 50k 的数据帧并将它们作为输入发送到进程(df)五次并并行运行它们,并给我相同的最终输出,我现在会得到没有任何这种优化。

我阅读了很多关于多线程的内容,并在堆栈溢出方面找到了一些有用的答案,但我无法真正让它发挥作用。我对多线程这个概念很陌生。

【问题讨论】:

    标签: python multithreading dataframe function optimization


    【解决方案1】:

    您应该检查 dask (https://dask.org/),因为您似乎主要对数据帧进行操作。一个很大的优势是您不必担心手动拆分数据框的所有细节以及所有这些。

    【讨论】:

    • 嗨,我已经尝试过 dask,它很好并且可以创建块,但问题是数据必须被摄取到 ETL 管道中,并且系统目前无法识别 dask 数据帧。我想要一种更传统的软件工程方法。
    • 我明白了...我的意思是,当您使用 .compute()(如 pandas 数据框)时,您仍然可以获得不是 dask 类型的输出,但您的系统中可能有其他限制,我是不知道。
    【解决方案2】:

    您可以为此使用 multiprocessing 库,它允许您在 CPU 的不同内核上运行函数。

    下面是一个例子

    from multiprocessing import Pool
    
    def f(df):
        # Process dataframe
        return df
    
    if __name__ == '__main__':
        dataframes = [df1, df2, df3]
    
        with Pool(len(dataframes)) as p:
            proccessed_dfs = p.map(f, dataframes)
        
        print(processed_dfs)
    
        # You would join them here
    

    【讨论】:

    • 嗨,莱昂,谢谢。我理解代码,但它需要很长时间才能运行。它似乎进一步减慢它。就像我在少数情况下厌倦了它,它运行了太长时间并且仍在运行。
    • 这真的很奇怪。为了解决这个问题,我有一些问题:当您说“太长”时,您是指 2 秒还是 2 小时(创建所有进程通常需要一秒)?您将数据框分解为多少部分?你的 CPU 有多少个内核?计时时是否包括拆分和合并数据帧所需的时间?
    • 因此,正如我在问题中提到的那样,我用一小块原本很大的数据框对其进行了测试。只是为了测试我使用 100 条记录,串行方法在不到一分钟的时间内快速完成,而池继续运行 20 分钟,我不得不强制停止它。它不会引发错误。我没有计算拆分它的时间,我已经准备好了拆分。我将数据框分为 5 个类别,每个类别的记录接近 20 条。而且,我有 6 个 CPU 内核。
    • 我发现这篇关于堆栈溢出的文章 - stackoverflow.com/questions/20727375/…(查看 aelfinn 的答案)。我想知道这是否是我传递大量数据帧的原因,这些数据帧将进一步扩展到更多的行和列
    • 你的处理功能好像有问题。当多处理需要更长的时间时,它只是几秒钟,而不是 19 分钟。我认为您没有在数据处理功能中正确退出进程,导致它挂起。一个简单的测试方法是在函数的不同阶段放置一些打印语句以查看它挂起的位置(例如,一个在进程开始时,另一个在它开始处理数据之前,另一个在它完成处理数据之后,以及最后在函数完成之前)。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-09-03
    相关资源
    最近更新 更多