【问题标题】:How to reduce time for multiprocessing in python如何减少python中多处理的时间
【发布时间】:2016-11-22 17:06:02
【问题描述】:

我正在尝试在 python 中构建多处理以降低计算速度,但似乎在多处理之后,整体计算速度显着下降。我创建了 4 个不同的进程并将数据帧拆分为 4 个不同的数据帧,这将是每个进程的输入。在对每个流程进行计时后,似乎间接费用成本很高,想知道是否有办法降低这些间接费用。

我使用的是 windows7,python 3.5,我的机器有 8 个内核。

def doSomething(args, dataPassed,):

    processing data, and calculating outputs

def parallelize_dataframe(df, nestedApply):
    df_split = np.array_split(df, 4)
    pool = multiprocessing.Pool(4)
    df = pool.map(nestedApply, df_split)
    print ('finished with Simulation')
    time = float((dt.datetime.now() - startTime).total_seconds())

    pool.close()
    pool.join()

def nestedApply(df):

    func2 = partial(doSomething, args=())
    res = df.apply(func2, axis=1)
    res =  [output Tables]
    return res

if __name__ == '__main__':

data = pd.read_sql_query(query, conn)

parallelize_dataframe(data, nestedApply)

【问题讨论】:

  • 你能列出单线程与多线程相比需要多长时间吗?
  • 您有多少 CPU/内核(真实的,不是超线程)?这看起来像是 CPU 密集型工作,因此拆分为多于内核数量只会减慢速度。另外,数据帧有多大,doSomething 的成本是多少?要将数据帧发送到每个子进程,必须对其进行序列化(通过pickle)和反序列化,因此如果帧很大并且doSomething 很便宜,您确实会看到大部分时间都花在了开销上。
  • @Fruitspunchsamurai 单线程运行耗时 26 分钟,仅运行映射函数耗时 33 分钟,整体耗时 71 分钟。
  • @Oliver 我有 8 个真正的内核,是的,doSomething 函数非常占用 CPU 资源且成本高昂(它将所有结果存储在内存中,直到命中所有数据)。我运行了大约 25,000 行 DataFrame,每行大约有 20 列。

标签: python multiprocessing overhead-minimization


【解决方案1】:

我建议使用队列,而不是将 DataFrame 作为块提供。您需要大量资源来复制每个块,并且需要相当长的时间。如果您的 DataFrame 真的很大,您可能会耗尽内存。使用队列,您可以从 pandas 中的快速迭代器中受益。 这是我的方法。开销随着工作人员的复杂性而降低。不幸的是,我的工作人员很难真正证明这一点,但 sleep 有点模拟复杂性。

import pandas as pd
import multiprocessing as mp
import numpy as np
import time


def worker(in_queue, out_queue):
    for row in iter(in_queue.get, 'STOP'):
        value = (row[1] * row[2] / row[3]) + row[4]
        time.sleep(0.1)
        out_queue.put((row[0], value))

if __name__ == "__main__":
    # fill a DataFrame
    df = pd.DataFrame(np.random.randn(1e5, 4), columns=list('ABCD'))

    in_queue = mp.Queue()
    out_queue = mp.Queue()

    # setup workers
    numProc = 2
    process = [mp.Process(target=worker,
                          args=(in_queue, out_queue)) for x in range(numProc)]

    # run processes
    for p in process:
        p.start()

    # iterator over rows
    it = df.itertuples()

    # fill queue and get data
    # code fills the queue until a new element is available in the output
    # fill blocks if no slot is available in the in_queue
    for i in range(len(df)):
        while out_queue.empty():
            # fill the queue
            try:
                row = next(it)
                in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True)  # row = (index, A, B, C, D) tuple
            except StopIteration:
                break
        row_data = out_queue.get()
        df.loc[row_data[0], "Result"] = row_data[1]

    # signals for processes stop
    for p in process:
        in_queue.put('STOP')

    # wait for processes to finish
    for p in process:
        p.join()

使用numProc = 2,每个循环需要50秒,使用numProc = 4,速度是原来的两倍。

【讨论】:

    猜你喜欢
    • 2017-07-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-11
    • 1970-01-01
    • 2023-03-20
    • 2021-12-23
    • 1970-01-01
    相关资源
    最近更新 更多