【问题标题】:Python processes stay blocked with large argumentsPython 进程被大参数阻塞
【发布时间】:2018-12-22 22:35:37
【问题描述】:

我正在使用 apply_async() 对我的 python 脚本进行多重处理,如下所示:

def my_proc(df, id):
   # do something
   return df

df = pd.read_csv(myfile, sep='\t', header=0, dtype=object)
p = multiprocessing.Pool(50)
ids = df['id'].tolist()
for i in range(len(ids))
    result[id] = p.apply_async(my_proc, [df, ids[i]])

我遇到的问题是,如果数据帧变得非常大(200K 行,75 列),在任何给定时间只有一个进程运行,而所有其他进程都在睡眠模式下被阻塞。

如果我将数据帧保存到 csv 文件中并将 csv 文件名作为参数传递并打开进程并读取 csv,我看到现在所有进程都在运行,但性能变得无法接受,因为所有进程(其中 50 个) ) 竞争打开同一个大的 csv 文件。

任何人都可以告诉我如何找出这些进程被阻止的原因和位置。对替代的高性能解决方法有什么建议吗?

编辑:

我正在使用 Linux 服务器。 我试图在下面的队列中传递 df,但结果相同。我还返回一个 None 并将我的进程计数更改为 3 以隔离问题:

def my_proc(q, id):
    df = q.get()
    # do something
    return **None**

p = multiprocessing.Pool(**3**)
m = multiprocessing.Manager()
q = m.Queue()
df = pd.read_csv(report_file_dups, sep='\t', header=0, dtype=object)
q.put(df)
ids = df['id'].tolist()
for i in range(len(ids))
    result[id] = p.apply_async(my_proc, [q, ids[i]])

我是否按预期使用队列?

【问题讨论】:

  • 您在 Windows 上吗?如果你是你需要把你的主要代码放在if __name__=="__main__": 块中。否则多处理将无法正常工作。
  • 您是否尝试过将mapchunksize 设置一起使用?看起来有点像我最近回复的一个问题:stackoverflow.com/a/53797655/1358308
  • 它也可能与在该地方发送如此多的数据有关。每个调用的参数都被独立“腌制”,结果在发回之前被挑选出来。即你可能想做一些事情,这样你就不会在每次调用时都腌制一个大数据框

标签: python multiprocessing


【解决方案1】:

如何将文件放入队列,逐行读取,并让工作人员从队列中消费数据?

【讨论】:

  • 我尝试使用编辑中指示的队列,但同样的问题仍然存在。
  • 您不会将数据拆分为几个块以由工作人员并行处理。我不知道熊猫,但看起来您将整个数据框作为队列中的一项提供。所以当然只有一名工人可以get()它。你不能把它分成几行,或者可能是用循环送入队列的块吗?另外,如果我是您,我会在您填满队列之前启动工作人员,因此您无需在开始运行之前将所有内容加载到 ram 中。
  • 我无法破坏数据框。每个工作人员都需要整个数据帧来生成输出。每个工作人员都有一个 id 并对整个数据帧进行操作。这里的想法是为每个工作人员的不同 id 并行化这些计算。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-05
  • 2016-07-06
  • 1970-01-01
  • 2012-01-25
  • 1970-01-01
  • 2013-05-24
相关资源
最近更新 更多