【问题标题】:In-place shuffling of multidimensional arrays多维数组的就地改组
【发布时间】:2024-04-29 00:10:05
【问题描述】:

我正在尝试在 Cython 中实现一个 NaN 安全的洗牌程序,它可以沿着任意维度的多维矩阵的多个轴进行洗牌。

在一维矩阵的简单情况下,可以使用 Fisher-Yates 算法简单地对所有具有非 NaN 值的索引进行洗牌:

def shuffle1D(np.ndarray[double, ndim=1] x):
    cdef np.ndarray[long, ndim=1] idx = np.where(~np.isnan(x))[0]
    cdef unsigned int i,j,n,m

    randint = np.random.randint
    for i in xrange(len(idx)-1, 0, -1):
        j = randint(i+1)
        n,m = idx[i], idx[j]
        x[n], x[m] = x[m], x[n]

我想扩展此算法以处理大型多维数组而无需重塑(这会触发此处未考虑的更复杂情况的副本)。为此,我需要摆脱固定的输入维度,这对于 numpy 数组和 Cython 中的 memoryviews 似乎都是不可能的。有解决办法吗?

非常感谢!

【问题讨论】:

  • 那么问题只是具有任意数量的维度吗?
  • 当输入的维度未知时,你会使用多少个for循环?
  • @moarningsun 在一般情况下,可以使用数组步幅沿任意轴扫描内存...

标签: python arrays numpy multidimensional-array cython


【解决方案1】:

感谢@Veedrac 的 cmets,此答案使用了更多 Cython 功能。

  • 指针数组存储值的内存地址沿axis
  • 您的算法与修改 that checks for nan values 一起使用,防止它们被排序
  • 它不会为C 有序数组创建副本。对于Fortran 有序数组,ravel() 命令将返回一个副本。这可以通过创建另一个双指针数组来携带x 的值来改进,可能会带来一些缓存损失......

基于切片,此代码至少比其他代码快一个数量级。

from libc.stdlib cimport malloc, free

cimport numpy as np
import numpy as np
from numpy.random import randint

cdef extern from "numpy/npy_math.h":
    bint npy_isnan(double x)

def shuffleND(x, int axis=-1):
    cdef np.ndarray[double, ndim=1] v # view of x
    cdef np.ndarray[int, ndim=1] strides
    cdef int i, j
    cdef int num_axis, pos, stride
    cdef double tmp
    cdef double **v_axis

    if axis==-1:
        axis = x.ndim-1

    shape = list(x.shape)
    num_axis = shape.pop(axis)

    v_axis = <double **>malloc(num_axis*sizeof(double *))
    for i in range(num_axis):
        v_axis[i] = <double *>malloc(1*sizeof(double))

    try:
        tmp_strides = [s//x.itemsize for s in x.strides]
        stride = tmp_strides.pop(axis)
        strides = np.array(tmp_strides, dtype=np.int32)
        v = x.ravel()
        for indices in np.ndindex(*shape):
            pos = (strides*indices).sum()
            for i in range(num_axis):
                v_axis[i] = &v[pos + i*stride]
            for i in range(num_axis-1, 0, -1):
                j = randint(i+1)
                if npy_isnan(v_axis[i][0]) or npy_isnan(v_axis[j][0]):
                    continue
                tmp = v_axis[i][0]
                v_axis[i][0] = v_axis[j][0]
                v_axis[j][0] = tmp
    finally:
        free(v_axis)

    return x

【讨论】:

  • 值得将free 放在finally 块中,但这看起来很整洁。我根本不懂算法,所以我相信这是对的。
  • 请注意,1:ravel可以复制,2:我认为(strides*indices).sum()可能不足以满足所有情况。考虑v[::2].strides
  • @Veedrac 我用几个棘手的输入尝试了(strides*indices).sum(),它似乎有效,我添加了一个注释,如果数组是 Fortran 对齐的,ravel() 将复制...
  • numpy.array(10)[::2] 没有副本可以工作吗?
  • @Veedrac 实际上不是,看来数组必须是C 连续的,以避免得到flatten() 的副本...
【解决方案2】:

以下算法基于切片,不进行任何复制,它应该适用于任何np.ndarray。主要步骤是:

  • np.ndindex() 用于遍历不同的多维索引,不包括属于您要洗牌的轴的索引
  • 应用您已经为一维情况开发的 shuffle。

代码:

def shuffleND(np.ndarray x, axis=-1):
    cdef np.ndarray[long long, ndim=1] idx
    cdef unsigned int i, j, n, m
    if axis==-1:
        axis = x.ndim-1
    all_shape = list(np.shape(x))
    shape = all_shape[:]
    shape.pop(axis)
    for slices in np.ndindex(*shape):
        slices = list(slices)
        axis_slice = slices[:]
        axis_slice.insert(axis, slice(None))
        idx = np.where(~np.isnan(x[tuple(axis_slice)]))[0]
        for i in range(idx.shape[0]-1, 0, -1):
            j = randint(i+1)
            n, m = idx[i], idx[j]
            slice1 = slices[:]
            slice1.insert(axis, n)
            slice2 = slices[:]
            slice2.insert(axis, m)
            slice1 = tuple(slice1)
            slice2 = tuple(slice2)
            x[slice1], x[slice2] = x[slice2], x[slice1]
    return x

【讨论】:

  • 在我看来,这种方法已经抵消了使用 Cython 的任何好处。也许对 user45893 来说已经足够了,但我不知道。
  • @Veedrac 感谢您的评论...我使用数组步幅寻找另一种替代方案并得出了另一个答案...我计时至少比基于解决方案的解决方案快 10 倍切片...