【问题标题】:Redistributing excess values in numpy 2D array重新分配 numpy 二维数组中的多余值
【发布时间】:2017-06-12 12:09:46
【问题描述】:

我有以下numpy 随机二维数组:

np.random.rand(30, 20)

我想遍历数组中的每个网格单元。如果网格单元格的值 > 0.6,那么我想将多余部分分配给其紧邻的 8 个相邻单元格(在角网格单元格的情况下,相邻单元格的数量会更少)。

应根据 2 个用户选择的规则之一重新分配超出部分:

  1. 平均在 8 个邻居中
  2. 与每个邻居中的值成正比,即具有更高值的邻居变得更高

有没有办法在 numpy 中做到这一点,而无需借助 for 循环?

【问题讨论】:

  • 把多余的分摊均匀?如果邻居超过 0.6 会怎样
  • 在这种 [0.1 0.1 // 0.7 0.7] 的情况下使用比例规则:例如对于右下角元素的多余部分,其他三个的相对权重应该是多少? 7:1:1 还是 6:1:1?换句话说,先称重,然后夹住,还是反过来?
  • 什么是“foll.”

标签: python numpy vectorization


【解决方案1】:

好的,这是我的看法:在每一步中,算法都会检测所有超阈值单元,并同时均匀或按比例更新所有这些单元及其所有邻居;这是完全矢量化的,有两种实现方式:

  • 通常更快的是基于线性卷积加上一些技巧来保存边缘和角落的质量;
  • 另一个将相同的运算符表示为稀疏矩阵,它通常较慢,但我将其保留,因为
  • 它可以处理稀疏参数,因此当超阈值单元的比例较低时速度更快

由于此过程通常不会一步收敛,因此它被放置在一个循环中,但是对于除最小网格之外的所有网格,它的开销应该是最小的,因为它的有效负载很大。循环将在用户提供的循环数后或没有剩余的超阈值单元时终止。可选地,它可以记录迭代之间的欧几里得增量。

关于算法的几句话:如果不是边界,even 扩展操作可以描述为减去重新分配的质量模式 p 和然后添加与环内核卷积的相同模式 k = [1 1 1; 1 0 1; 1 1 1] / 8;类似地,重新分配 proportional 到剩余质量 r 可以写成

(1) r (k * (p / (k * r em>)))

其中 * 是卷积运算符,乘法和除法是组件明智的。解析公式,我们看到 p 中的每个点首先通过其之前的 8 个邻居的剩余质量 r * k 的平均值进行归一化传播到所述邻居(另一个卷积)并用残差进行缩放。预归一化保证了质量守恒。特别是,它正确地规范了边界和角落。在此基础上,我们看到 even 规则的边界问题可以以类似的方式解决:使用 (1) 将 r 替换为一张。

有趣的旁注:使用 proportional 规则可以构建非收敛模式。这里有两个振荡器:

0.7  0  0.8  0  0.8  0             0   0   0   0
 0  0.6  0  0.6  0  0.6            0  1.0 0.6  0
0.8  0  1.0  0  1.0  0             0   0   0   0
 0  0.6  0  0.6  0  0.6

代码如下,恐怕相当冗长和技术性,但我试图解释至少主要(最快)分支;主函数名为level,还有一些简单的测试函数。

有一些 print 语句,但我认为这是唯一的 Python3 依赖项。

import numpy as np
try:
    from scipy import signal
    HAVE_SCIPY = True
except ImportError:
    HAVE_SCIPY = False
import time

SPARSE_THRESH = 0.05
USE_SCIPY = False # actually, numpy workaround is a bit faster

KERNEL = np.zeros((3, 3)) + 1/8
KERNEL[1, 1] = 0
def scipy_ring(a):
    """convolve 2d array a with kernel

    1/8 1/8 1/8
    1/8  0  1/8
    1/8 1/8 1/8
    """
    return signal.convolve2d(a, KERNEL, mode='same')

def numpy_ring(a):
    """convolve 2d array a with kernel

    1/8 1/8 1/8
    1/8  0  1/8
    1/8 1/8 1/8
    """
    tmp = a.copy()
    tmp[:, 1:] += a[:, :-1]
    tmp[:, :-1] += a[:, 1:]
    out = tmp.copy()
    out[1:, :] += tmp[:-1, :]
    out[:-1, :] += tmp[1:, :]
    return (out - a) / 8

if USE_SCIPY and HAVE_SCIPY:
    conv_with_ring = scipy_ring
else:
    conv_with_ring = numpy_ring


# next is yet another implementation of convolution including edge correction.
# what for? it is most of the time much slower than the above but can handle
# sparse inputs and consequently is faster if the fraction of above-threshold
# cells is small (~5% or less)

SPREAD_CACHE = {}
def precomp(sh):
    """precompute sparse representation of operator mapping ravelled 2d
    array of shape sh to boundary corrected convolution with ring kernel

    1/8 1/8 1/8   /   1/5  0  1/5               0  1/3                \
    1/8  0  1/8   |   1/5 1/5 1/5   at edges,  1/3 1/3   at corners   |
    1/8 1/8 1/8   \                                                   /

    stored are
    - a table of flat indices encoding neighbours of the
      cell whose flat index equals the row no in the table
    - two scaled copies of the appropriate weights which
      equal 1 / neighbour count
    """
    global SPREAD_CACHE
    m, n = sh
    # m*n grid points, each with up to 8 neighbours:
    # tedious but straighforward
    indices = np.empty((m*n, 8), dtype=int)
    indices[n-1:, 1] = np.arange(m*n - (n-1)) # NE
    indices[:-(n+1), 3] = np.arange(n+1, m*n) # SE
    indices[:-(n-1), 5] = np.arange(n-1, m*n) # SW
    indices[n+1:, 7] = np.arange(m*n - (n+1)) # NW
    indices[n:, 0] = np.arange((m-1)*n) # N
    indices[:n, [0, 1, 7]] = -1
    indices[:-1, 2] = np.arange(1, m*n) # E
    indices[n-1::n, 1:4] = -1
    indices[:-n, 4] = np.arange(n, m*n) # S
    indices[-n:, 3:6] = -1
    indices[1:, 6] = np.arange(m*n - 1) # W
    indices[::n, 5:] = -1
    # weights are constant along rows, therefore m*n vector suffices
    nei_count = (indices > -1).sum(axis=-1)
    weights = 1 / nei_count
    SPREAD_CACHE[sh] = indices, weights, 8 * weights.reshape(sh)
    return indices, weights, 8 * weights.reshape(sh)

def iadd_conv_ring(a, out):
    """add boundary corrected convolution of 2d array a with
    ring kernel

    1/8 1/8 1/8   /   1/5  0  1/5               0  1/3                \
    1/8  0  1/8   |   1/5 1/5 1/5   at edges,  1/3 1/3   at corners   |
    1/8 1/8 1/8   \                                                   /

    to out.

    IMPORTANT: out must be flat and have one spare field at its end
    which is used as a "/dev/NULL" by the indexing trickery

    if a is a tuple it is interpreted as a sparse representation of the
    form: (flat) indices, values, shape.
    """
    if isinstance(a, tuple): # sparse
        ind, val, sh = a
        k_ind, k_wgt, __ \
            = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
        np.add.at(out, k_ind[ind, :], k_wgt[ind, None]*val[:, None])
    else: # dense
        sh = a.shape
        k_ind, k_wgt, __ \
            = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
        np.add.at(out, k_ind, k_wgt[:, None]*a.ravel()[:, None])
    return out

# main function
def level(input, threshold=0.6, rule='proportional', maxiter=1,
          switch_to_sparse_at=SPARSE_THRESH, use_conv_matrix=False,
          track_Euclidean_deltas=False):
    """spread supra-threshold mass to neighbours until equilibrium reached

    updates are simultaneous, iterations are capped at maxiter
    'rule' can be 'proportional' or 'even'
    'switch_to_sparse_at' and 'use_conv_matrix' influence speed but not
    result

    returns updated grid, convergence flag, vector of numbers of supratheshold
    cells for each iteration, runtime, [vector of Euclidean deltas]
    """
    sh = input.shape
    m, n = sh
    nei_ind, rec_nc, rec_8 \
        = SPREAD_CACHE[sh] if sh in SPREAD_CACHE else precomp(sh)
    if rule == 'proportional':
        def iteration(state, state_f):
            em = state > threshold
            nnz = em.sum()
            if nnz == 0: # no change, send signal to quit
                return nnz
            elif nnz < em.size * switch_to_sparse_at: # use sparse code
                ei = np.where(em.flat)[0]
                excess = state_f[ei] - threshold
                state_f[-1] = 0
                exc_nei_sum = rec_nc[ei] * state_f[nei_ind[ei, :]].sum(axis=-1)
                exc_nei_ind = np.unique(nei_ind[ei, :])
                if exc_nei_ind[0] == -1:
                    exc_nei_ind = exc_nei_ind[1:]
                nm = exc_nei_sum != 0
                state_swap = state_f[exc_nei_ind]
                state_f[exc_nei_ind] = 1
                iadd_conv_ring((ei[nm], excess[nm] / exc_nei_sum[nm], sh),
                               state_f)
                state_f[exc_nei_ind] *= state_swap
                iadd_conv_ring((ei[~nm], excess[~nm], sh), state_f)
                state_f[ei] -= excess
            elif use_conv_matrix:
                excess = np.where(em, state - threshold, 0)
                state_f[-1] = 0
                nei_sum = (rec_nc * state_f[nei_ind].sum(axis=-1)).reshape(sh)
                nm = nei_sum != 0
                pm = em & nm
                exc_p = np.where(pm, excess, 0)
                exc_p[pm] /= nei_sum[pm]
                wei_nei_sum = iadd_conv_ring(exc_p, np.zeros_like(state_f))
                state += state * wei_nei_sum[:-1].reshape(sh)
                fm = em & ~nm
                exc_f = np.where(fm, excess, 0)
                iadd_conv_ring(exc_f, state_f)
                state -= excess
            else:
                excess = np.where(em, state - threshold, 0)
                nei_sum = conv_with_ring(state)
                # must special case the event of all neighbours being zero
                nm = nei_sum != 0
                # these can be distributed proportionally:
                pm = em & nm
                # select, prenormalise by sum of masses of neighbours,...
                exc_p = np.where(pm, excess, 0)
                exc_p[pm] /= nei_sum[pm]
                # ...spread to neighbours and scale
                spread_p = state * conv_with_ring(exc_p)
                # these can't be distributed proportionally (because all
                # neighbours are zero); therefore fall back to even:
                fm = em & ~nm
                exc_f = np.where(fm, excess * rec_8, 0)
                spread_f = conv_with_ring(exc_f)
                state += spread_p + spread_f - excess
            return nnz
    elif rule == 'even':
        def iteration(state, state_f):
            em = state > threshold
            nnz = em.sum()
            if nnz == 0: # no change, send signal to quit
                return nnz
            elif nnz < em.size * switch_to_sparse_at: # use sparse code
                ei = np.where(em.flat)[0]
                excess = state_f[ei] - threshold
                iadd_conv_ring((ei, excess, sh), state_f)
                state_f[ei] -= excess
            elif use_conv_matrix:
                excess = np.where(em, state - threshold, 0)
                iadd_conv_ring(excess, state_f)
                state -= excess
            else:
                excess = np.where(em, state - threshold, 0)
                # prenormalise by number of neighbours, and spread
                spread = conv_with_ring(excess * rec_8)
                state += spread - excess
            return nnz
    else:
        raise ValueError('unknown rule: ' + rule)

    # master loop
    t0 = time.time()
    out_f = np.empty((m*n + 1,))
    out = out_f[:m*n]
    out[:] = input.ravel()
    out.shape = sh
    nnz = []
    if track_Euclidean_deltas:
        last = input
        E = []
    for i in range(maxiter):
        nnz.append(iteration(out, out_f))
        if nnz[-1] == 0:
            if track_Euclidean_deltas:
                return out, True, nnz, time.time() - t0, E + [0]
            return out, True, nnz, time.time() - t0
        if track_Euclidean_deltas:
            E.append(np.sqrt(((last-out)**2).sum()))
            last = out.copy()
    if track_Euclidean_deltas:
        return out, False, nnz, time.time() - t0, E
    return out, False, nnz, time.time() - t0

# tests

def check_simple():
    A = np.zeros((6, 6))
    A[[0, 1, 1, 4, 4], [0, 3, 5, 1, 5]] = 1.08
    A[5, :] = 0.1 * np.arange(6)
    print('original')
    print(A)
    for rule in ('proportional', 'even'):
        print(rule)
        for lb, ucm, st in (('convolution', False, 0.001),
                            ('matrix', True, 0.001), ('sparse', True, 0.999)):
            print(lb)
            print(level(A, rule=rule, switch_to_sparse_at=st,
                        use_conv_matrix=ucm)[0])

def check_consistency(sh=(300, 400), n=20):
    print("""Running consistency checks with different solvers
{} trials each {} x {} cells

    """.format(n, *sh))
    data = np.random.random((n,) + sh)
    sums = data.sum(axis=(1, 2))
    for th, lb in ((0.975, 'sparse'), (0.6, 'dense'),
                   (0.975, 'sparse'), (0.6, 'dense'),
                   (0.975, 'sparse'), (0.6, 'dense')):
        times = np.zeros((2, 3))
        for d, s in zip (data, sums):
            for i, rule in enumerate(('proportional', 'even')):
                results = []
                for j, (ucm, st) in enumerate (
                        ((False, 0.001), (True, 0.001), (True, 0.999))):
                    res, conv, nnz, time = level(
                        d, rule=rule, switch_to_sparse_at=st,
                        use_conv_matrix=ucm, threshold=th)
                    results.append(res)
                    times[i, j] += time
                assert np.allclose(results[0], results[1])
                assert np.allclose(results[1], results[2])
                assert np.allclose(results[2], results[0])
                assert np.allclose(s, [r.sum() for r in results])
        print("""condition {} finished, no obvious errors; runtimes [sec]:
                 convolution   matrix         sparse solver
proportional  {:13.7f}  {:13.7f}  {:13.7f}
even          {:13.7f}  {:13.7f}  {:13.7f}

""".format(lb, *tuple(times.ravel())))

def check_convergence(sh=(300, 400), maxiter=100):
    data = np.random.random(sh)
    res, conv, nnz, time, Eucl = level(data, maxiter=maxiter,
                                       track_Euclidean_deltas=True)
    print('nnz:', nnz)
    print('delta:', Eucl)
    print('final length:', np.sqrt((res*res).sum()))
    print('ratio:', Eucl[-1] / np.sqrt((res*res).sum()))

【讨论】:

  • @user308827 不客气。享受代码的乐趣!写它当然很有趣。
【解决方案2】:

此解决方案找到要在八个方向中的每一个方向传播的值,然后将它们相加。

编辑:我更改了以下功能以包括比例甚至加权。这些被视为相同的计算,但在使用所有的权重而不是输入数组时甚至可以实现。

编辑:我更改了基准以匹配 @Paul's 测试用例。这比那里提到的几种情况要快,但如果您使用稀疏矩阵,则不是最快的。以下代码的一个明显优势是您不需要博士来理解和维护它:) 它主要使用numpy 数组切片直接执行请求的操作。

import numpy as np
import time
import sys

def main():

    # Define parameters
    numbers = np.random.rand(300, 400)
    threshold = 0.6
    max_iters = 1
    n = 20

    # Clock the evenly distributed total time for n calls
    t0 = time.time()
    for ctr in range(n):
        s=spread( numbers, threshold, max_iters, rule='even' )
    print('Evenly distributed: {:0.4f}s'.format(time.time()-t0))
    # Evenly distributed: 0.2007s

    # Clock the proportionally distributed total time for n calls
    t0 = time.time()
    for ctr in range(n):
        s=spread( numbers, threshold, max_iters, rule='proportional' )
    print('Proportionally distributed: {:0.4f}s'.format(time.time()-t0))
    # Proportionally distributed: 0.2234s

def spread(numbers,threshold,max_iters=10, rule='even', first_call=True):
    '''Spread the extra over a threshold among the adjacent values.'''

    # This recursive function may go over the Python recursion limit!
    if first_call==True :
        if max_iters > 20000:
            raise(ValueError('max_iters must be less than 20000, but got "{}"'.format(max_iters)))
        elif max_iters > 900:
            sys.setrecursionlimit(max_iters+1000)
        else:
            pass

    n_rows = numbers.shape[0]
    n_cols = numbers.shape[1]

    # Find excess over threshold of each point
    excess = np.maximum( numbers - threshold, np.zeros( numbers.shape ) )

    # Find the value to base the weighting on
    if rule == 'even':
        up = np.ones((n_rows-1,n_cols))
        down = np.ones((n_rows-1,n_cols))
        left = np.ones((n_rows,n_cols-1))
        right = np.ones((n_rows,n_cols-1))
        up_left = np.ones((n_rows-1,n_cols-1))
        up_right = np.ones((n_rows-1,n_cols-1))
        down_left = np.ones((n_rows-1,n_cols-1))
        down_right = np.ones((n_rows-1,n_cols-1))

    elif rule == 'proportional':
        up = numbers[1:,:]
        down = numbers[:-1,:]
        left = numbers[:,1:]
        right = numbers[:,:-1]
        up_left = numbers[1:,1:]
        up_right = numbers[1:,:-1]
        down_left = numbers[:-1,1:]
        down_right = numbers[:-1,:-1]

    else:
        raise(ValueError('Invalid rule "{}"'.format(rule)))

    # Find normalized weight in each direction
    num_up = np.concatenate( (up,np.zeros((1,n_cols))), axis=0) 
    num_down = np.concatenate( (np.zeros((1,n_cols)),down), axis=0)
    num_left = np.concatenate( (left,np.zeros((n_rows,1))), axis=1)
    num_right = np.concatenate( (np.zeros((n_rows,1)),right), axis=1)
    num_up_left = np.concatenate( (np.concatenate( (up_left,np.zeros((1,n_cols-1))), axis=0), np.zeros((n_rows,1))), axis=1)
    num_up_right = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (up_right,np.zeros((1,n_cols-1))), axis=0)), axis=1)
    num_down_left = np.concatenate( (np.concatenate( (np.zeros((1,n_cols-1)),down_left), axis=0), np.zeros((n_rows,1))), axis=1)
    num_down_right = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (np.zeros((1,n_cols-1)),down_right), axis=0)), axis=1)
    num_sum = num_up + num_down + num_left + num_right + num_up_left + num_up_right + num_down_left + num_down_right
    up_weight = num_up / num_sum
    down_weight = num_down / num_sum
    left_weight = num_left / num_sum
    right_weight = num_right / num_sum
    up_left_weight = num_up_left / num_sum
    up_right_weight = num_up_right / num_sum
    down_left_weight = num_down_left / num_sum
    down_right_weight = num_down_right / num_sum

    # Set NaN values to zero
    up_weight[np.isnan(up_weight)] = 0
    down_weight[np.isnan(down_weight)] = 0
    left_weight[np.isnan(left_weight)] = 0
    right_weight[np.isnan(right_weight)] = 0
    up_left_weight[np.isnan(up_left_weight)] = 0
    up_right_weight[np.isnan(up_right_weight)] = 0
    down_left_weight[np.isnan(down_left_weight)] = 0
    down_right_weight[np.isnan(down_right_weight)] = 0

    # Apply weight to the excess to find the contributions
    up = (excess * up_weight)[:-1,:]
    down = (excess * down_weight)[1:,:]
    left = (excess * left_weight)[:,:-1]
    right = (excess * right_weight)[:,1:]
    up_left = (excess * up_left_weight)[:-1,:-1]
    up_right = (excess * up_right_weight)[:-1,1:]
    down_left = (excess * down_left_weight)[1:,:-1]
    down_right = (excess * down_right_weight)[1:,1:]

    # Pad with zeros
    down = np.concatenate( (down,np.zeros((1,n_cols))), axis=0) 
    up = np.concatenate( (np.zeros((1,n_cols)),up), axis=0)
    right = np.concatenate( (right,np.zeros((n_rows,1))), axis=1)
    left = np.concatenate( (np.zeros((n_rows,1)),left), axis=1)
    down_right = np.concatenate( (np.concatenate( (down_right,np.zeros((1,n_cols-1))), axis=0), np.zeros((n_rows,1))), axis=1)
    down_left = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (down_left,np.zeros((1,n_cols-1))), axis=0)), axis=1)
    up_right = np.concatenate( (np.concatenate( (np.zeros((1,n_cols-1)),up_right), axis=0), np.zeros((n_rows,1))), axis=1)
    up_left = np.concatenate( (np.zeros((n_rows,1)), np.concatenate( (np.zeros((1,n_cols-1)),up_left), axis=0)), axis=1)

    # Add the contributions to find the result 
    result = numbers - excess + up + down + left + right + up_left + up_right + down_left + down_right

    if (np.amax(result) > threshold) and (max_iters > 1):
        return spread(numbers=result,threshold=threshold,max_iters=max_iters-1,rule=rule,first_call=False)
    else:
        return result

if __name__ == '__main__':
    main()        

【讨论】:

    【解决方案3】:
    def disperse_peaks(a,thr,iter=10,prop=False):
        n=a.shape
        i=np.arange(n[0])
        j=np.arange(n[1])
        m=np.fmax(np.abs(i[:,None,None,None]-i[None,None,:,None]),np.abs(j[None,:,None,None]-j[None,None,None,:]))==1
        if prop:
            transf=a*m/np.sum(a*m,axis=(2,3))[:,:,None,None]
        else:
            transf=m/np.sum(m,axis=(2,3))[:,:,None,None]
        b=a.copy()
        idx=0
        while np.max(b)>thr:
            idx+=1
            resid=np.where(b>thr,thr,b)
            diff=b-resid
            b=resid+np.einsum('ijkl,ij->kl',transf,diff)
            if idx==iter:
                break
        return b
    

    这是做什么的:

    • 获取 4D 变换数组的索引
    • 通过比较索引创建布尔邻接矩阵(最大差为 1)
    • 将布尔矩阵除以每个切片中的“真”数以获得权重(成比例或其他)
    • 通过获取差值、转换它并添加到残差来更新(最多 iter 次)矩阵

    进行了一些测试,无论迭代多少次,都不能保证准确达到 0.6。不知道为什么(可能是浮点比较错误),但它似乎可以正常工作。总和保持不变,max(disperse_peaks(a)) 趋向于thr

    至于第二个选项,我需要更多关于给周围数字赋予何种加权类型的信息。我现在已经线性完成了,但几乎任何分布都是可能的。

    【讨论】:

    • "做了一些测试,无论迭代多少次,都不能保证准确到 0.6。" - 如果起始配置有任何一对高于 0.6 的邻居,它们将保持彼此高于 0.6;边距可能会缩小,但永远不会变为零。
    • 谢谢,第一次没有通读您的答案,只是想获得高效的代码。
    猜你喜欢
    • 1970-01-01
    • 2013-02-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-30
    • 2021-03-11
    • 1970-01-01
    相关资源
    最近更新 更多