【问题标题】:Python multiprocessing and shared variablePython多处理和共享变量
【发布时间】:2013-04-09 13:51:23
【问题描述】:

我不是 python 专家,但我设法编写了一个多处理代码,该代码使用了我 PC 中的所有 cpu 和内核。我的代码加载了一个非常大的数组,大约 1.6 GB,我需要在每个进程中更新数组。幸运的是,更新包括在图像中添加一些人造星,并且每个过程都有一组不同的图像位置来添加人造星。

图像太大,每次调用进程时我都无法创建一个新图像。我的解决方案是在共享内存中创建一个变量,这样可以节省大量内存。出于某种原因,它适用于 90% 的图像,但有些区域是我的代码在我之前发送到进程的某些位置添加随机数。它与我创建共享变量的方式有关吗?在我的代码执行过程中,进程是否相互干扰?

奇怪的是,当使用单 CPU 和单核时,图像是 100% 完美的,并且图像中没有添加随机数。您是否建议我在多个进程之间共享一个大型数组?这是我的代码的相关部分。请在我定义变量 im_data 时阅读该行。

import warnings
warnings.filterwarnings("ignore")

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
import matplotlib.pyplot as plt
import sys,os
import subprocess
import numpy as np
import time
import cv2 as cv
import pyfits
from pyfits import getheader
import multiprocessing, Queue
import ctypes

class Worker(multiprocessing.Process):


def __init__(self, work_queue, result_queue):

    # base class initialization
    multiprocessing.Process.__init__(self)

    # job management stuff
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.kill_received = False

def run(self):
    while not self.kill_received:

        # get a task
        try:
            i_range, psf_file = self.work_queue.get_nowait()
        except Queue.Empty:
            break

        # the actual processing
        print "Adding artificial stars - index range=", i_range

        radius=16
        x_c,y_c=( (psf_size[1]-1)/2, (psf_size[2]-1)/2 )
        x,y=np.meshgrid(np.arange(psf_size[1])-x_c,np.arange(psf_size[2])-y_c)
        distance = np.sqrt(x**2 + y**2)

        for i in range(i_range[0],i_range[1]):
            psf_xy=np.zeros(psf_size[1:3], dtype=float)
            j=0
            for i_order in range(psf_order+1):
                j_order=0
                while (i_order+j_order < psf_order+1):
                    psf_xy += psf_data[j,:,:] * ((mock_y[i]-psf_offset[1])/psf_scale[1])**i_order * ((mock_x[i]-psf_offset[0])/psf_scale[0])**j_order
                    j_order+=1
                    j+=1


            psf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(psf_xy)
            psf_xy *= psf_factor

            npsf_xy=cv.resize(psf_xy,(npsf_size[0],npsf_size[1]),interpolation=cv.INTER_LANCZOS4)
            npsf_factor=10.**( (30.-mock_mag[i])/2.5)/np.sum(npsf_xy)
            npsf_xy *= npsf_factor

            im_rangex=[max(mock_x[i]-npsf_size[1]/2,0), min(mock_x[i]-npsf_size[1]/2+npsf_size[1], im_size[1])]
            im_rangey=[max(mock_y[i]-npsf_size[0]/2,0), min(mock_y[i]-npsf_size[0]/2+npsf_size[0], im_size[0])]
            npsf_rangex=[max(-1*(mock_x[i]-npsf_size[1]/2),0), min(-1*(mock_x[i]-npsf_size[1]/2-im_size[1]),npsf_size[1])]
            npsf_rangey=[max(-1*(mock_y[i]-npsf_size[0]/2),0), min(-1*(mock_y[i]-npsf_size[0]/2-im_size[0]),npsf_size[0])]

            im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10.


        self.result_queue.put(id)

if __name__ == "__main__":

  n_cpu=2
  n_core=6
  n_processes=n_cpu*n_core*1
  input_mock_file=sys.argv[1]

  print "Reading file ", im_file[i]
  hdu=pyfits.open(im_file[i])
  data=hdu[0].data
  im_size=data.shape

  im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
  im_data = np.ctypeslib.as_array(im_data_base.get_obj())
  im_data = im_data.reshape(im_size[0], im_size[1])
  im_data[:] = data
  data=0
  assert im_data.base.base is im_data_base.get_obj()

  # run
  # load up work queue
  tic=time.time()
  j_step=np.int(np.ceil( mock_n*1./n_processes ))
  j_range=range(0,mock_n,j_step)
  j_range.append(mock_n)


  work_queue = multiprocessing.Queue()
  for j in range(np.size(j_range)-1):
    if work_queue.full():
      print "Oh no! Queue is full after only %d iterations" % j
    work_queue.put( (j_range[j:j+2], psf_file[i]) )

  # create a queue to pass to workers to store the results
  result_queue = multiprocessing.Queue()

  # spawn workers
  for j in range(n_processes):
    worker = Worker(work_queue, result_queue)
    worker.start()

  # collect the results off the queue
  while not work_queue.empty():
    result_queue.get()

  print "Writing file ", mock_im_file[i]
  hdu[0].data=im_data
  hdu.writeto(mock_im_file[i])
  print "%f s for parallel computation." % (time.time() - tic)

【问题讨论】:

  • 不是共享大数组,你不能把它分成更小的子数组并将这些子数组发送到子进程吗?然后将结果组合回原始数组。
  • 还可以考虑使用与 Python 不同的东西来处理如此巨大的图像(C 插件?)。

标签: python multiprocessing


【解决方案1】:

我认为问题(正如您在问题中所建议的那样)来自这样一个事实,即您在多个线程的同一数组中写入

im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1])
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data

虽然我很确定您可以以“进程安全”的方式写入im_data_base(python 使用隐式锁来同步对数组的访问),但我不确定您是否可以写入@987654323 @ 以过程安全的方式。

因此,我会(即使我不确定我是否会解决您的问题)建议您在 im_data

周围创建一个显式锁定
# Disable python implicit lock, we are going to use our own
im_data_base = multiprocessing.Array(ctypes.c_float, im_size[0]*im_size[1], 
    lock=False)
im_data = np.ctypeslib.as_array(im_data_base.get_obj())
im_data = im_data.reshape(im_size[0], im_size[1])
im_data[:] = data
# Create our own lock
im_data_lock = Lock()

然后在进程中,每次需要修改im_data时获取锁

self.im_data_lock.acquire()
im_data[im_rangey[0]:im_rangey[1], im_rangex[0]:im_rangex[1]] = 10
self.im_data_lock.release()

为了简洁起见,我省略了将锁传递给进程的构造函数并将其存储为成员字段 (self.im_data_lock) 的代码。您还应该将 im_data 数组传递给进程的构造函数并将其存储为成员字段。

【讨论】:

    【解决方案2】:

    当多个线程写入图像/数组中的重叠区域时,您的示例中会出现问题。因此,实际上您要么必须为每个图像放置一个锁,要么为每个图像部分创建一组锁(以减少锁争用)。

    或者您可以在一组进程中生成图像修改,并在单独的单个线程中对图像进行实际修改。

    【讨论】:

      猜你喜欢
      • 2020-06-12
      • 2013-06-26
      • 1970-01-01
      • 2019-12-11
      • 2015-06-08
      • 1970-01-01
      • 2017-06-18
      • 1970-01-01
      • 2015-03-15
      相关资源
      最近更新 更多