【发布时间】:2026-02-22 05:45:02
【问题描述】:
我有一个 scipy.sparse.csr_matrix 格式的大型稀疏矩阵 X,我想利用并行性将它乘以一个 numpy 数组 W。经过一些研究,我发现我需要在多处理中使用 Array 以避免在进程之间复制 X 和 W(例如来自这里:How to combine Pool.map with Array (shared memory) in Python multiprocessing? 和 Is shared readonly data copied to different processes for Python multiprocessing?)。这是我最近的尝试
import multiprocessing
import numpy
import scipy.sparse
import time
def initProcess(data, indices, indptr, shape, Warr, Wshp):
global XData
global XIndices
global XIntptr
global Xshape
XData = data
XIndices = indices
XIntptr = indptr
Xshape = shape
global WArray
global WShape
WArray = Warr
WShape = Wshp
def dot2(args):
rowInds, i = args
global XData
global XIndices
global XIntptr
global Xshape
data = numpy.frombuffer(XData, dtype=numpy.float)
indices = numpy.frombuffer(XIndices, dtype=numpy.int32)
indptr = numpy.frombuffer(XIntptr, dtype=numpy.int32)
Xr = scipy.sparse.csr_matrix((data, indices, indptr), shape=Xshape)
global WArray
global WShape
W = numpy.frombuffer(WArray, dtype=numpy.float).reshape(WShape)
return Xr[rowInds[i]:rowInds[i+1], :].dot(W)
def getMatmat(X):
numJobs = multiprocessing.cpu_count()
rowInds = numpy.array(numpy.linspace(0, X.shape[0], numJobs+1), numpy.int)
#Store the data in X as RawArray objects so we can share it amoung processes
XData = multiprocessing.RawArray("d", X.data)
XIndices = multiprocessing.RawArray("i", X.indices)
XIndptr = multiprocessing.RawArray("i", X.indptr)
def matmat(W):
WArray = multiprocessing.RawArray("d", W.flatten())
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(), initializer=initProcess, initargs=(XData, XIndices, XIndptr, X.shape, WArray, W.shape))
params = []
for i in range(numJobs):
params.append((rowInds, i))
iterator = pool.map(dot2, params)
P = numpy.zeros((X.shape[0], W.shape[1]))
for i in range(numJobs):
P[rowInds[i]:rowInds[i+1], :] = iterator[i]
return P
return matmat
if __name__ == '__main__':
#Create a random sparse matrix X and a random dense one W
X = scipy.sparse.rand(10000, 8000, 0.1)
X = X.tocsr()
W = numpy.random.rand(8000, 20)
startTime = time.time()
A = getMatmat(X)(W)
parallelTime = time.time()-startTime
startTime = time.time()
B = X.dot(W)
nonParallelTime = time.time()-startTime
print(parallelTime, nonParallelTime)
但是输出类似于:(4.431, 0.165) 表示并行版本比非并行乘法慢得多。
我相信当一个人将大数据复制到进程时,在类似的情况下可能会导致减速,但这里不是这种情况,因为我使用 Array 来存储共享变量(除非它发生在 numpy.frombuffer 或创建时csr_matrix,但后来我找不到直接共享 csr_matrix 的方法)。速度慢的另一个可能原因是为每个进程返回每个矩阵乘法的大结果,但是我不确定解决这个问题的方法。
有人能看出我哪里出错了吗? 谢谢你的帮助!
更新:我不能确定,但我认为在进程之间共享大量数据并没有那么高效,理想情况下我应该使用多线程(尽管全局解释器锁 (GIL) 使得这非常困难)。解决此问题的一种方法是使用 Cython 发布 GIL(请参阅http://docs.cython.org/src/userguide/parallelism.html),尽管许多 numpy 函数需要通过 GIL。
【问题讨论】:
-
您是否将 numpy/scipy 链接到优化的多线程 ATLAS 构建?如果你这样做,你应该在使用 np.dot 时免费获得并行矩阵乘法。
-
我正在使用链接到 numpy/scipy 的多线程 BLAS 库 (OpenBLAS) 但我测试了 X.dot(W) 和 numpy.dot(X, W) (后者不适用于稀疏X) 这不是并行的。
标签: python parallel-processing scipy sparse-matrix