【问题标题】:Numpy huge matrix dot product while multiprocessing多处理时 Numpy 巨大的矩阵点积
【发布时间】:2019-03-13 22:19:39
【问题描述】:

我正在实现一个特殊的 EM-GMM。

X 是形状为 [1000000, 900] 的数据矩阵,是一个 numpy mmap 对象
Q是一个形状为[900, 900]的精度矩阵,是一个ndarray

我还使用 Multiprocessing 库在 40 个内核上同时处理超过 200 个 Q 矩阵,使用相同的数据矩阵 (X)。

它适用于较小的尺寸,例如 [1mil, 196], [1mil, 400],
但是当我尝试在进程的某个时刻运行 [1mil, 900] 时会引发异常:

OSError: [Errno 12] 无法分配内存

我猜这个问题是因为我有 2 个大计算,这可能分配了大矩阵。

作为 E 步骤的一部分,我需要计算:
np.sum(X.dot(Q) * X, axis=1)

作为 M 步的一部分,我需要计算(W 是 [1mil, 1] 权重向量):
(X.T * W).dot(X)

将来我必须对更大尺寸的数据(形状 [2mil, 2500] 甚至 [2mil, 10k])运行这个 EM-GMM
我该怎么做才能使这些计算更节省内存?

编辑:

我注意到worker初始化使用pickle,所以X-matrix变成了ndarray并且worker不共享它(这意味着X-matrix被所有worker复制并填满了我的RAM)

我知道如何解决它,如果解决了会更新。
但如果有人知道如何处理它,我将不胜感激。

【问题讨论】:

  • np.einsum('ij, jk, ik -> i', X, Q, X)np.einsum('ji, ij, jk -> ik', X, W, X) 有帮助吗?也许在np.einsum 中有一个optimize = True 参数?不是memmap 的专家,但这应该会阻止一些中间矩阵的形成(希望如此)。
  • * 乘法是逐元素乘法,而不是矩阵点积,所以我认为 einsum 不会有帮助
  • np.einsum('ij, ij -> ij', A, B) 元素乘法。这只是einsum 允许线性代数一步完成。 (即ij, jk, ik -> ik, ik -> ik -> iji, ij, jk -> ij, jk -> ik)。这就是爱因斯坦求和符号的好处——您可以在一个框架内进行最常见的线性代数运算。
  • 哦,我不知道。我会试试的,谢谢:)
  • 您是否仅限于 numpy-memmap 或者您还可以使用更复杂的存储系统,例如 HDF5? Numpy memmap 仅在从磁盘连续读取数据时(沿 C 有序数组中的最后一个轴)提供良好的性能。您的所有计算都可以分成几部分,基本上即使在您的上一个示例中 [2mil, 10k],小于 2GB 的 RAM 也应该足够了(甚至更少,但磁盘 IO 将远高于高)。您在工人之间共享 X 矩阵,而不是将其复制给每个工人,对吗?您的 RAM 限制是多少?

标签: python numpy


【解决方案1】:

原来有 2 个不相关的问题导致 RAM 过度使用。

首先,当为多处理工作人员腌制时,memmap 对象已从磁盘完全读取。
这种数据复制为每个工作人员分配了 6.7GB 的额外 RAM。
为了解决这个问题,我创建了一个共享的RawArray 并将数据加载到其中,并在每个工作人员上使用了np.frombuffer

其次,X.dot(Q)(X.T * W) 都导致 numpy 分配另一个 X 形矩阵,即另一个 6.7GB RAM
我从该线程创建了答案的变体:https://stackoverflow.com/a/21096605/5572523
由于我的矩阵又瘦又高,所以我对行进行了切片:

def _block_slices(dim_size, block_size):
    count = 0
    while True:
        yield slice(count, count + block_size, 1)
        count += block_size
        if count >= dim_size:
            raise StopIteration

现在我可以迭代成批的数据(在处理 weight=0 时也增加了一些额外的加速)

我设置了max_elements = 2 ** 27,因为我使用的是float64,所以这会产生一个1GB的矩阵(如果我没记错的话)。

所以(X.T * W).dot(X) 转向:

def weighted_outer_prod(X, W):
    n, d = X.shape

    max_rows = max(1, int(max_elements / d))
    sigma = np.zeros([d, d])
    for mm in _block_slices(n, max_rows):
        sigma += batch_weighted_outer_prod(X[mm, :], W[mm])
    return sigma

def batch_weighted_outer_prod(batch, W):
    nz = W > 0
    buff = np.empty(batch[nz].shape)
    np.multiply(batch[nz], W[nz, np.newaxis], out=buff)
    sigma = buff.T.dot(batch[nz])
    del(buff)
    return sigma

np.sum(X.dot(Q) * X, axis=1) 转为:(不要介意函数名)

def calc_l_k(X, Q):
    max_rows = max(1, int(max_elements / d))
    l_k = np.empty(n)
    for mm in _block_slices(n, max_rows):
        l_k[mm] = batch_l_k(X[mm, :], Q)

    return l_k


def batch_l_k(batch, Q):
    buff = np.empty(batch.shape)
    np.dot(batch, Q, out=buff)
    np.multiply(buff, batch, out=buff)
    l_k = -0.5 * np.sum(buff, axis=1)
    del(buff)
    return l_k

现在它运行在形状为 [1mil, 900] 的 X 上,我希望它仍然可以在更高的维度上运行。

【讨论】:

    猜你喜欢
    • 2019-06-28
    • 2015-03-31
    • 2017-07-30
    • 1970-01-01
    • 2017-02-26
    • 1970-01-01
    • 2019-10-08
    • 1970-01-01
    • 2019-05-10
    相关资源
    最近更新 更多