【问题标题】:Calculating distances between vectors of two numpy arrays计算两个numpy数组的向量之间的距离
【发布时间】:2020-02-29 01:27:26
【问题描述】:

我有两个尺寸为 S x F 的 numpy 数组 R 和尺寸为 N x M x FW >。具体让我们分配以下值N = 5M = 7F = 3S = 4

数组 R 包含一组样本 S = 4F = 3 特征。每行代表一个样本,每一行代表一个特征。因此R[0] 是第一个样本,R[1] 是第二个样本,然后继续。每个R[i-th] 条目,都包含F 元素,例如R[0] = np.array([1, 4, -2])

这是一个小 sn-p 来初始化所有这些值,并考虑到 MWE

import numpy as np

# Size of Map (rows, columns)
N, M = 5, 7

# Number of features
F = 3

# Sample size
S = 4

np.random.seed(13)
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))

我们还可以看到 numpy 数组 W的给定“深度线”,作为一个向量,也与数组的每一行具有相同的维度 R(查看两个数组的最后一个维度的大小很容易注意到这一点)。这样我就可以访问W[2, 3] 并获得np.array([ 2, 2, -1 ])(这里的值只是示例)。

我创建了一个简单的函数来计算给定向量 r 到矩阵 W 的每个 “深度线” 的距离以及返回W深度线距离r

最近的元素的位置
def nearest_vector_matrix_naive(r, W):
    delta = np.zeros((N,M), dtype=int)
    for i in range(N):
        for j in range(M):
            norm = 0
            for k in range(F):
                norm += (r[k] - W[i,j,k])**2
            delta[i,j] = norm
            norm = 0
    win_idx = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
    return win_idx

当然这是一种非常幼稚的方法,我可以进一步优化下面的代码,获得巨大的性能提升。

def nearest_vector_matrix(r, W):
    delta = np.sum((W[:,:] - r)**2, axis=2)
    return np.unravel_index(np.argmin(delta, axis=None), delta.shape)

我可以像使用这个函数一样简单

nearest_idx = nearest_vector_matrix(R[0], W)
# Returns the nearest vector in W to R[0]
W[nearest_idx]

由于我的数组 R 包含一堆样本,因此我使用以下 sn-p 来计算最接近样本数组的向量:

def nearest_samples_matrix(R, W):
    DELTA = np.zeros((R.shape[0],2))
    for idx, r in enumerate(R):
        delta = np.sum((W[:,:] - r)**2, axis=2)
        DELTA[idx] = np.unravel_index(np.argmin(delta, axis=None), delta.shape)
    return DELTA

此函数返回一个数组,其中包含 S 行(S 是样本数)的二维索引。那就是 DELTA 具有 (S, 2) 形状(始终)。

我想知道如何替换nearest_samples_matrix 中的for 循环(例如用于广播)以进一步提高代码执行性能?

我不知道该怎么做。 (除了我在第一种情况下能够做到)

【问题讨论】:

  • 数组的实际大小是多少?例如。你的参数F?如果 F 非常小,Kdtree 方法将比所有这些蛮力算法快得多。docs.scipy.org/doc/scipy/reference/generated/…
  • 是的。 F远小于S、N或M。实际上讲量级:S >> N = M >> F,
  • 但是 F 可以大到 100。阅读文档 F 大于 20 并不比蛮力好。
  • 如果 K-Dtree 至少和蛮力方法一样快,我可以在全局范围内使用它,并在 F 维度较小时获得一些好处。

标签: numpy array-broadcasting


【解决方案1】:

最佳解决方案取决于数组的输入大小

对于低维问题 dima kdtree approach 通常是要走的路。关于这个主题有很多答案,例如。 one我几周前写的。

如果问题的维度太高,您可以切换到蛮力算法。以下两种算法都比您的优化方法快得多,但在更大的输入大小和低维问题上,比 kdtree 方法慢得多 O(log(n)) 而不是 O(n^2)。

蛮力1

以下示例使用here 描述的算法。它在大维问题上非常快,因为大部分计算都是在高度优化的矩阵-矩阵乘法算法中完成的。 缺点是高内存使用(所有距离都在一个函数调用中计算)和精度问题,因为更容易出错的计算方法。

import numpy as np
from sklearn.metrics.pairwise import euclidean_distances

def nearest_samples_matrix_2(R,W):
    R_Temp=R
    W_Temp=W.reshape(-1,W.shape[2])
    dist=euclidean_distances(R_Temp, W_Temp)
    ind_1,ind_2=np.unravel_index(np.argmin(dist,axis=1),shape=(W.shape[0],W.shape[1]))
    return np.vstack((ind_1,ind_2)).T

蛮力2

这与您的幼稚方法非常相似,但使用 JIT 编译器 (Numba) 来获得良好的性能。临时数组不是必需的,精度应该很好(只要不发生溢出)。在更大的输入尺寸上还有进一步优化(循环平铺)的空间。

import numpy as np
import numba as nb

#parallelization is only beneficial on larger input data
@nb.njit(fastmath=True,parallel=True,cache=True)
def nearest_samples_matrix_3(r, W):
    ind_i=0
    ind_j=0
    out=np.empty((r.shape[0],2),dtype=np.int64)
    for x in nb.prange(r.shape[0]):
        delta=0
        for k in range(W.shape[2]):
            delta += (r[x,k] - W[0,0,k])**2

        for i in range(W.shape[0]):
            for j in range(W.shape[1]):
                norm = 0
                for k in range(W.shape[2]):
                    norm += (r[x,k] - W[i,j,k])**2
                if norm < delta:
                    delta=norm
                    ind_i=i
                    ind_j=j
        out[x,0]=ind_i
        out[x,1]=ind_j
    return out

时间

#small Arrays
N, M = 100, 200
F = 30
S = 50
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))

#your function
%timeit nearest_samples_matrix(R,W)
#268 ms ± 2.94 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_2(R,W)
#5.62 ms ± 22.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit nearest_samples_matrix_3(R,W)
#3.68 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

#larger arrays
N, M = 1_000, 2_000
F = 50
S = 100
R = np.random.randint(0, 10, size=(S, F))
W = np.random.randint(-4, 5, size=(N, M, F))

#%timeit nearest_samples_matrix_1(R,W)
#too slow
%timeit nearest_samples_matrix_2(R,W)
#2.76 s ± 17.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit nearest_samples_matrix_3(R,W)
#1.42 s ± 402 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

【讨论】:

  • 我从 numba 收到这条消息:NumbaWarning: Cannot cache compiled function "nearest_samples_matrix_3" 因为它使用动态全局变量(例如 ctypes 指针和大型全局数组)
  • @Lin 您使用的是哪个 Numba 版本 nb.__version__ ?至少在 0.48 上它应该可以工作。但无论如何,如果该函数确实对性能至关重要并且经常被调用,那么在第一次调用时只有大约 800 毫秒(parallel=True)的开销,如果关闭并行化,则更少。
猜你喜欢
  • 2015-02-25
  • 2017-04-21
  • 1970-01-01
  • 1970-01-01
  • 2018-09-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-29
相关资源
最近更新 更多