【问题标题】:Parallel python iteration并行python迭代
【发布时间】:2015-08-24 19:16:59
【问题描述】:

我想根据pandas.DataFrame 中的值创建一个类的多个实例。这个我记下来了。

import itertools
import multiprocessing as mp
import pandas as pd

class Toy:
    id_iter = itertools.count(1)

    def __init__(self, row):
        self.id = self.id_iter.next()
        self.type = row['type']

if __name__ == "__main__":

    table = pd.DataFrame({
        'type': ['a', 'b', 'c'],
        'number': [5000, 4000, 30000]
        })

    for index, row in table.iterrows():
        [Toy(row) for _ in range(row['number'])]

多处理尝试

我已经能够通过添加以下内容(在某种程度上)并行化:

pool = mp.Pool(processes=mp.cpu_count())
m = mp.Manager()
q = m.Queue()

for index, row in table.iterrows():
    pool.apply_async([Toy(row) for _ in range(row['number'])])

如果row['number'] 中的数字明显长于table 的长度,这似乎会更快。但在我的实际情况下,table 有数千行长,每个row['number'] 都比较小。

尝试将table 分解为cpu_count() 块并在表中迭代似乎更聪明。但现在我们正处于我的 Python 技能的边缘。

我已经尝试过 python 解释器对我大喊大叫的事情,例如:

pool.apply_async(
        for index, row in table.iterrows(): 
        [Toy(row) for _ in range(row['number'])]
        )

还有“不能腌制”的东西

Parallel(n_jobs=4)(
    delayed(Toy)([row for _ in range(row['number'])]) \
            for index, row in table.iterrows()
)

编辑

这可能让我更接近一点,但仍然没有。我在单独的函数中创建类实例,

def create_toys(row):
    [Toy(row) for _ in range(row['number'])]

....

Parallel(n_jobs=4, backend="threading")(
    (create_toys)(row) for i, row in table.iterrows()
)

但有人告诉我“NoneType”对象不可迭代。

【问题讨论】:

  • 你看到这个问题了吗? stackoverflow.com/questions/26784164/…
  • 不,我没有;现在正在看。
  • 我可以看到它是如何应用的,但我不能完全强迫它解决我的问题。
  • 您创建了许多Toy 实例,但看起来您只是将它们丢弃。目前尚不清楚您为什么要这样做,因此很难提出更好的方法。
  • 在我的真实案例中,该类调用了一个 write 方法,该方法将实例写入 xml 树。这是一个完全不同的问题......

标签: python pandas python-multiprocessing


【解决方案1】:

我有点不清楚您期望的输出是什么。你只是想要一个表格的大列表

[Toy(row_1) ... Toy(row_n)]

每个Toy(row_i) 以多重row_i.number 出现在哪里?

根据@JD Long 提到的答案,我认为您可以这样做:

def process(df):
    L = []
    for index, row in table.iterrows():
        L += [Toy(row) for _ in range(row['number'])]
    return L

table = pd.DataFrame({
    'type': ['a', 'b', 'c']*10,
    'number': [5000, 4000, 30000]*10
    })

p = mp.Pool(processes=8)
split_dfs = np.array_split(table,8)    
pool_results = p.map(process, split_dfs)
p.close()
p.join()

# merging parts processed by different processes
result = [a for L in pool_results for a in L]

【讨论】:

  • 这正是我所需要的,尽管最后一行花了我很长时间才弄明白。在看到你已经涵盖了我需要的东西之前,我最终选择了this question
  • 不错,实际上我很不喜欢这种语法,我觉得它很难读,而且我永远记不起循环的运行顺序。(虽然不知道我会怎么做)
  • 你能看看这个问题吗:- stackoverflow.com/questions/53561794/…
猜你喜欢
  • 2018-11-15
  • 1970-01-01
  • 1970-01-01
  • 2013-06-20
  • 2019-03-29
  • 1970-01-01
  • 2018-10-18
  • 2019-05-02
  • 1970-01-01
相关资源
最近更新 更多