【发布时间】:2018-12-22 22:35:37
【问题描述】:
我正在使用 apply_async() 对我的 python 脚本进行多重处理,如下所示:
def my_proc(df, id):
# do something
return df
df = pd.read_csv(myfile, sep='\t', header=0, dtype=object)
p = multiprocessing.Pool(50)
ids = df['id'].tolist()
for i in range(len(ids))
result[id] = p.apply_async(my_proc, [df, ids[i]])
我遇到的问题是,如果数据帧变得非常大(200K 行,75 列),在任何给定时间只有一个进程运行,而所有其他进程都在睡眠模式下被阻塞。
如果我将数据帧保存到 csv 文件中并将 csv 文件名作为参数传递并打开进程并读取 csv,我看到现在所有进程都在运行,但性能变得无法接受,因为所有进程(其中 50 个) ) 竞争打开同一个大的 csv 文件。
任何人都可以告诉我如何找出这些进程被阻止的原因和位置。对替代的高性能解决方法有什么建议吗?
编辑:
我正在使用 Linux 服务器。 我试图在下面的队列中传递 df,但结果相同。我还返回一个 None 并将我的进程计数更改为 3 以隔离问题:
def my_proc(q, id):
df = q.get()
# do something
return **None**
p = multiprocessing.Pool(**3**)
m = multiprocessing.Manager()
q = m.Queue()
df = pd.read_csv(report_file_dups, sep='\t', header=0, dtype=object)
q.put(df)
ids = df['id'].tolist()
for i in range(len(ids))
result[id] = p.apply_async(my_proc, [q, ids[i]])
我是否按预期使用队列?
【问题讨论】:
-
您在 Windows 上吗?如果你是你需要把你的主要代码放在
if __name__=="__main__":块中。否则多处理将无法正常工作。 -
您是否尝试过将
map与chunksize设置一起使用?看起来有点像我最近回复的一个问题:stackoverflow.com/a/53797655/1358308 -
它也可能与在该地方发送如此多的数据有关。每个调用的参数都被独立“腌制”,结果在发回之前被挑选出来。即你可能想做一些事情,这样你就不会在每次调用时都腌制一个大数据框