【问题标题】:matplotlib and python multithread / multiprocessing file processingmatplotlib和python多线程/多处理文件处理
【发布时间】:2012-12-12 23:01:22
【问题描述】:

我有大量文件要处理。我编写了一个脚本来获取、排序和绘制我想要的数据。到现在为止还挺好。我已经对其进行了测试,它给出了预期的结果。

然后我想使用多线程来做到这一点。我查看了互联网上的文档和示例,并且在我的程序中使用一个线程可以正常工作。但是当我使用更多时,有时会出现随机 matplotlib 错误,并且我怀疑那里存在一些冲突,即使我使用了一个带有绘图名称的函数,我也看不出问题出在哪里。

这是整个脚本,如果您需要更多评论,我会添加它们。谢谢。

#!/usr/bin/python
import matplotlib
matplotlib.use('GTKAgg')
import numpy as np
from scipy.interpolate import griddata

import matplotlib.pyplot as plt
import matplotlib.colors as mcl
from matplotlib import rc #for latex

import time as tm
import sys
import threading
import Queue #queue in 3.2 and Queue in 2.7 !

import pdb #the debugger

rc('text', usetex=True)#for latex

map=0 #initialize the map index. It will be use to index the array like     this: array[map,[x,y]]
time=np.zeros(1) #an array to store the time
middle_h=np.zeros((0,3)) #x phi c

#for the middle of the box
current_file=open("single_void_cyl_periodic_phi_c_middle_h_out",'r')
for line in current_file:
    if line.startswith('# ===  time'):
        map+=1
        np.append(time,[float(line.strip('# ===  time  '))])
    elif line.startswith('#'):
        pass
    else:
        v=np.fromstring(line,dtype=float,sep=' ')
        middle_h=np.vstack( (middle_h,v[[1,3,4]]) ) 
current_file.close()
middle_h=middle_h.reshape((map,-1,3)) #3d array: map, x, phi,c 

#####
def load_and_plot(): #will load a map file, and plot it along with the     corresponding profile loaded before
    while not exit_flag:
        print("fecthing work ...")
        #try:
        if not tasks_queue.empty():
            map_index=tasks_queue.get()
            print("----> working on map: %s" %map_index)
            x,y,zp=np.loadtxt("single_void_cyl_growth_periodic_post_map_"+str(map_index),unpack=True, usecols=[1, 2,3])
            for i,el in enumerate(zp):
                if el<0.:
                    zp[i]=0.
            xv=np.unique(x)
            yv=np.unique(y)
            X,Y= np.meshgrid(xv,yv)
            Z = griddata((x, y), zp, (X, Y),method='nearest')

            figure=plt.figure(num=map_index,figsize=(14, 8))
            ax1=plt.subplot2grid((2,2),(0,0))
            ax1.plot(middle_h[map_index,:,0],middle_h[map_index,:,1],'*b')
            ax1.grid(True)
            ax1.axis([-15, 15, 0, 1])
            ax1.set_title('Profiles')
            ax1.set_ylabel(r'$\phi$')
            ax1.set_xlabel('x')

            ax2=plt.subplot2grid((2,2),(1,0))
            ax2.plot(middle_h[map_index,:,0],middle_h[map_index,:,2],'*r')
            ax2.grid(True)
            ax2.axis([-15, 15, 0, 1])
            ax2.set_ylabel('c')
            ax2.set_xlabel('x')

            ax3=plt.subplot2grid((2,2),(0,1),rowspan=2,aspect='equal')
            sub_contour=ax3.contourf(X,Y,Z,np.linspace(0,1,11),vmin=0.)
            figure.colorbar(sub_contour,ax=ax3)
            figure.savefig('single_void_cyl_'+str(map_index)+'.png')
            plt.close(map_index)
            tasks_queue.task_done()
        else:
            print("nothing left to do, other threads finishing,sleeping 2 seconds...")
            tm.sleep(2)
   # except:
   #     print("failed this time: %s" %map_index+". Sleeping 2 seconds")
   #     tm.sleep(2)
#####
exit_flag=0
nb_threads=2
tasks_queue=Queue.Queue()
threads_list=[]

jobs=list(range(map)) #each job is composed of a map
print("inserting jobs in the queue...")
for job in jobs:
    tasks_queue.put(job)
print("done")

#launch the threads
for i in range(nb_threads):
    working_bee=threading.Thread(target=load_and_plot)
    working_bee.daemon=True
    print("starting thread "+str(i)+' ...')
    threads_list.append(working_bee)
working_bee.start()


#wait for all tasks to be treated
tasks_queue.join()

#flip the flag, so the threads know it's time to stop
exit_flag=1

for t in threads_list:
    print("waiting for threads %s to stop..."%t)
    t.join()

print("all threads stopped")

【问题讨论】:

  • 我建议使用multiprocessing 而不是线程。我用它成功地实现了平行图形绘制。
  • 谢谢,实现起来似乎更复杂,但我会试一试。
  • 你只启动最后一个线程;在循环内移动working_bee.start()
  • 创建多处理版本,将全局代码移至main()函数,在if __name__ == "__main__":块中调用,用multiprocessing.Process替换threading.Thread,用multiprocessing.Queue替换Queue.Queue。顺便说一句,queue.empty() 可能不可靠,您可以改用标记值,例如,在主线程结束时:for i in range(len(threads_list)): queue.put(None),在每个线程中:for map_index in iter(queue.get, None): ...

标签: python multithreading matplotlib multiprocessing python-multithreading


【解决方案1】:

按照大卫的建议,我在多处理中做到了。使用 8 个处理器,我的速度提高了 5 倍。我相信剩下的就是在我的脚本开始时对单进程工作进行处理。 编辑:但是有时脚本会在最后一张地图上“挂起”,即使它生成了正确的地图,也会出现以下错误:

文件“single_void_cyl_plot_mprocess.py”,第 90 行,在 tasks_queue.join() 中

文件“/usr/local/epd-7.0-2-rh5-x86_64/lib/python2.7/multiprocessing/queues.py”,第 316 行,加入 self._cond.wait()

文件“/usr/local/epd-7.0-2-rh5-x86_64/lib/python2.7/multiprocessing/synchronize.py”,第 220 行,等待 self._wait_semaphore.acquire(True, timeout)

import numpy as np
from scipy.interpolate import griddata

import matplotlib.pyplot as plt
from matplotlib import rc #for latex

from multiprocessing import Process, JoinableQueue

import pdb #the debugger

rc('text', usetex=True)#for latex

map=0 #initialize the map index. It will be use to index the array     like this: array[map,x,y,...]
time=np.zeros(1) #an array to store the time
middle_h=np.zeros((0,3)) #x phi c

#for the middle of the box
current_file=open("single_void_cyl_periodic_phi_c_middle_h_out",'r')
for line in current_file.readlines():
    if line.startswith('# ===  time'):
        map+=1
        np.append(time,[float(line.strip('# ===  time  '))])
    elif line.startswith('#'):
        pass
    else:
        v=np.fromstring(line,dtype=float,sep=' ')
        middle_h=np.vstack( (middle_h,v[[1,3,4]]) ) 
current_file.close()
middle_h=middle_h.reshape((map,-1,3)) #3d array: map, x, phi,c 

#######
def load_and_plot(): #will load a map file, and plot it along with     the corresponding profile loaded before
    while tasks_queue.empty()==False:
        print("fecthing work ...")
        try:
            map_index=tasks_queue.get() #get some work to do from     the queue
            print("----> working on map: %s" %map_index)
                 x,y,zp=np.loadtxt("single_void_cyl_growth_periodic_post_map_"+str(map_index),\
                unpack=True, usecols=[1, 2,3])
            for i,el in enumerate(zp):
                if el<0.:
                    zp[i]=0.
            xv=np.unique(x)
            yv=np.unique(y)
            X,Y= np.meshgrid(xv,yv)
            Z = griddata((x, y), zp, (X, Y),method='nearest')

            figure=plt.figure(num=map_index,figsize=(14, 8))
            ax1=plt.subplot2grid((2,2),(0,0))
                ax1.plot(middle_h[map_index,:,0],middle_h[map_index,:,1],'*b')
            ax1.grid(True)
            ax1.axis([-15, 15, 0, 1])
            ax1.set_title('Profiles')
            ax1.set_ylabel(r'$\phi$')
            ax1.set_xlabel('x')

            ax2=plt.subplot2grid((2,2),(1,0))
                ax2.plot(middle_h[map_index,:,0],middle_h[map_index,:,2],'*r')
            ax2.grid(True)
            ax2.axis([-15, 15, 0, 1])
            ax2.set_ylabel('c')
            ax2.set_xlabel('x')

            ax3=plt.subplot2grid((2,2),    (0,1),rowspan=2,aspect='equal')
                sub_contour=ax3.contourf(X,Y,Z,np.linspace(0,1,11),vmin=0.)
            figure.colorbar(sub_contour,ax=ax3)
                figure.savefig('single_void_cyl_'+str(map_index)+'.png')
            plt.close(map_index)
            tasks_queue.task_done() #work for this item finished
        except:
            print("failed this time: %s" %map_index)
#######

nb_proc=8 #number of processes
tasks_queue=JoinableQueue() #a queue to pile up the work to do

jobs=list(range(map)) #each job is composed of a map
print("inserting jobs in the queue...")
for job in jobs:
    tasks_queue.put(job)
print("done")

#launch the processes
for i in range(nb_proc):
    current_process=Process(target=load_and_plot)
    current_process.start()

#wait for all tasks to be treated
tasks_queue.join()

【讨论】:

  • 你也可以试试pool = multiprocessing.Pool()for result in pool.imap_unordered(process_job, jobs): pass
  • 我可以尝试一下,似乎在加入的某个地方有一个错误......它不会每次都发生,但它让我感到困惑。
猜你喜欢
  • 2013-04-03
  • 2017-06-18
  • 2021-02-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-03-17
  • 1970-01-01
相关资源
最近更新 更多