【问题标题】:Vectorized 2D interpolation矢量化 2D 插值
【发布时间】:2020-02-11 16:12:32
【问题描述】:

我正在使用 SciPy 的 2D 插值函数 (scipy.interpolate.interp2d)。我有存储为Z = [x, y, p1, p2] 的字段。我想使用二维矩阵[x, y, :, :] 在每个xy 点进行插值。 Z 的形状是(40, 40, 6, 6)

我能做到这一点的唯一方法是使用 for 循环并遍历每个点。下面是我使用的代码。 ab 是具有 (6,)(6,) 形状的数组。

for i in range(40):
    for j in range(40):
        ff = interpolate.interp2d(a, b, z[i,j,:,:]) 
        zi[i,j] = ff(atest,btest)

有没有办法不使用 for 循环来做到这一点?

【问题讨论】:

  • 通常在我熟悉的 2D 插值问题中,您将拥有一个 3D 数据集(具有 x、y 位置和一些属性 z),以及一些 x_new、y_new 位置,可能在网格中,你想知道 z_hat 在。但是你有……更多的维度。您能否发布更多信息,或者发布您的数据(或看起来像您的数据的内容)?

标签: python numpy scipy vectorization interpolation


【解决方案1】:

由于您本质上是在相同的 2d 点上插值一堆不同的函数,因此没有内置方法可以做到这一点是合理的。虽然可以省去一些计算,因为对于每个函数,插值下都有相同的 2d 网格,所以期望也不是不可能的。在任何情况下,我都找不到不循环的内置方法。

幸运的是,我们可以使用一些肘部润滑脂来实现bilinear interpolator(就像循环的一样),它使用矢量化同时处理所有点。这是一个实现这个的类:

import numpy as np 

class VectorizedBilinearInterpolator: 
    def __init__(self, x0, y0, z0): 
        """Create a bilinear interpolator for gridded input data 

        Inputs: 
            x0: shape (ngridx,)
            y0: shape (ngridy,)
            z0: shape batch_shape + (ngridy, ngridx) (viewed as batches) 
        """

        if z0.shape[-2:] != y0.shape + x0.shape: 
            raise ValueError("The last two dimensions of z0 must match that of y0 and x0, respectively!") 

        ind_x = np.argsort(x0) 
        self.x0 = x0[ind_x] 

        ind_y = np.argsort(y0) 
        self.y0 = y0[ind_y] 

        self.batch_shape = z0.shape[:-2] 
        indexer = np.ix_(ind_y, ind_x) 
        self.z0 = z0[..., indexer[0], indexer[1]].reshape(-1, y0.size, x0.size)  # shape (nbatch, ngridy, ngridx)

        # compute auxiliary coefficients for interpolation 
        # we have ngridx-1 boxes along x and ngridy-1 boxes along y
        # for each box we need 4 coefficients for bilinear interpolation
        # see e.g. https://en.wikipedia.org/wiki/Bilinear_interpolation#Alternative_algorithm
        # construct a batch of matrices with size (ngridy-1, ngridx-1, 4, 4) to invert
        x1 = self.x0[:-1]
        x2 = self.x0[1:]
        y1 = self.y0[:-1, None]
        y2 = self.y0[1:, None]
        x1,x2,y1,y2,one = np.broadcast_arrays(x1, x2, y1, y2, 1)  # all shaped (ngridy-1, ngridx-1)

        M = np.array([[one, x1, y1, x1*y1], [one, x1, y2, x1*y2],
                      [one, x2, y1, x2*y1], [one, x2, y2, x2*y2]]).transpose(2, 3, 0, 1)  # shape (ngridy-1, ngridx-1, 4, 4)
        zvec = np.array([self.z0[:, :-1, :-1], self.z0[:, 1:, :-1], self.z0[:, :-1, 1:], self.z0[:, 1:, 1:]])  # shape (4, nbatch, ngridy-1, ngridx-1)

        self.coeffs = np.einsum('yxab,bnyx -> nyxa', np.linalg.inv(M), zvec)  # shape (nbatch, ngridy-1, ngridx-1, 4) for "a0,a1,a2,a3" coefficients
        # for a given box (i,j) the interpolated value is given by self.coeffs[:,i,j,:] @ [1, x, y, x*y]

    def __call__(self, x, y): 
        """Evaluate the interpolator at the given coordinates 

        Inputs: 
            x: shape (noutx,) 
            y: shape (nouty,) 

        Output: 
            z: shape batch_shape + (nouty, noutx) (see __init__) 
        """

        # identify outliers (and mask at the end)
        out_x = (x < self.x0[0]) | (self.x0[-1] < x)
        out_y = (y < self.y0[0]) | (self.y0[-1] < y)

        # clip outliers, mask later
        xbox = (self.x0.searchsorted(x) - 1).clip(0, self.x0.size - 2)  # shape (noutx,) indices
        ybox = (self.y0.searchsorted(y) - 1).clip(0, self.y0.size - 2)  # shape (nouty,) indices
        indexer = np.ix_(ybox, xbox)

        xgrid,ygrid = np.meshgrid(x, y)  # both shape (nouty, noutx)

        coeffs_now = self.coeffs[:, indexer[0], indexer[1], :]  # shape (nbatch, nouty, noutx, 4)
        poly = np.array([np.ones_like(xgrid), xgrid, ygrid, xgrid*ygrid])  # shape (4, nouty, noutx)
        values =  np.einsum('nyxa,ayx -> nyx', coeffs_now, poly)  # shape (nbatch, nouty, noutx)

        # reshape final result and mask outliers
        z = values.reshape(self.batch_shape + xgrid.shape)
        z[..., out_y, :] = np.nan
        z[..., :, out_x] = np.nan

        return z

这里有一些测试:

from scipy import interpolate  # only needed for the loopy comparison

# input points
x0 = np.linspace(0, 1, 6)
y0 = np.linspace(0, 1, 7)
z0 = np.random.rand(40, 41, y0.size, x0.size)

# output points
xtest = np.linspace(0, 1, 20)
ytest = np.linspace(0, 1, 21)

# the original loopy version
def loopy():
    zi = np.empty(z0.shape[:2] + (ytest.size, xtest.size))
    for i in range(z0.shape[0]):
        for j in range(z0.shape[1]):
            ff = interpolate.interp2d(x0, y0, z0[i,j,:,:])
            zi[i,j] = ff(xtest, ytest)
    return zi

# the new, vectorized version
def vector():
    f = VectorizedBilinearInterpolator(x0, y0, z0)
    zi = f(xtest, ytest)
    return zi

首先,我们应该确保两个插值器做同样的事情:

>>> np.allclose(loopy(), vector())
True

其次,由于懒惰,我们可以使用 ipython 的 %timeit 魔法来计时这个虚拟示例:

>>> %timeit loopy()
... %timeit vector()
216 ms ± 2.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
25.9 ms ± 1.08 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

所以至少在这个小例子中,矢量化版本快了 8 倍。实际加速很大程度上取决于实际示例的大小。尽管我已经尝试正确实现异常值(即将异常值设置为np.nan,而不是推断废话),但我没有彻底检查所有边缘情况,因此请确保您测试实际数据的实现(通过将循环与几个典型案例的新插值器)。


如果您的问题包含真正的问题,即形状为(40,40,6,6) 的最终数组,那么这些工作都不值得您花费任何时间(考虑到非常短的运行时间),您应该只使用循环。

【讨论】:

    猜你喜欢
    • 2012-09-12
    • 1970-01-01
    • 2016-04-25
    • 1970-01-01
    • 2022-01-22
    • 2021-05-08
    • 2014-05-24
    • 2019-04-13
    • 1970-01-01
    相关资源
    最近更新 更多