【问题标题】:Vectorization to achieve performance矢量化以实现性能
【发布时间】:2018-06-11 20:44:20
【问题描述】:

我想避免在以下代码中使用 for 循环来实现性能。向量化适合这类问题吗?

a = np.array([[0,1,2,3,4],
              [5,6,7,8,9],
              [0,1,2,3,4],
              [5,6,7,8,9],
              [0,1,2,3,4]],dtype= np.float32)

temp_a = np.copy(a)

for i in range(1,a.shape[0]-1):
    for j in range(1,a.shape[1]-1):
        if a[i,j] > 3:
            temp_a[i+1,j] += a[i,j] / 5.
            temp_a[i-1,j] += a[i,j] / 5.
            temp_a[i,j+1] += a[i,j] / 5.
            temp_a[i,j-1] += a[i,j] / 5.
            temp_a[i,j]   -= a[i,j] * 4. / 5.
a = np.copy(temp_a)   

【问题讨论】:

  • 只要在所有字段上运行,最后generic_filter就会修改所有数据
  • 我没有遇到过这样的预定义过滤器。但是创建一个我们想要的并不难。因此,如果您给我们一个示例,您的数组元素必须如何被过滤器修改,我们可以构建一个。
  • @Saranns 我将准备一个示例并更新我的问题。
  • 就目前而言,结果似乎取决于元素更新的顺序。这是故意的吗?
  • @PaulPanzer 我使用临时数组更改了示例。我想如果我使用预定义的过滤器函数,结果将不依赖于元素更新的顺序,对吧?

标签: python arrays numpy scipy vectorization


【解决方案1】:

你基本上是在做卷积,对边框做了一些特殊处理。

尝试以下方法:

from scipy.signal import convolve2d


# define your filter
f = np.array([[0.0, 0.2, 0.0],
              [0.2,-0.8, 0.2],
              [0.0, 0.2, 0.0]])

# select parts of 'a' to be used for convolution
b = (a * (a > 3))[1:-1, 1:-1]

# convolve, padding with zeros ('same' mode)
c = convolve2d(b, f, mode='same')

# add the convolved result to 'a', excluding borders
a[1:-1, 1:-1] += c

# treat the special cases of the borders
a[0, 1:-1] += .2 * b[0, :]
a[-1, 1:-1] += .2 * b[-1, :]
a[1:-1, 0] += .2 * b[:, 0]
a[1:-1, -1] += .2 * b[:, -1]

它给出以下结果,这与嵌套循环相同。

[[  0.    2.2   3.4   4.6   4. ]
 [  6.2   2.6   4.2   3.   10.6]
 [  0.    3.4   4.8   6.2   4. ]
 [  6.2   2.6   4.2   3.   10.6]
 [  0.    2.2   3.4   4.6   4. ]]

【讨论】:

  • 这个答案使用更少的内存并且比我的答案略快。谢谢!
【解决方案2】:

我的跟踪使用 3 个过滤器,rot90np.wherenp.sumnp.multiply。我不确定哪种基准测试方法更合理。如果您考虑创建过滤器的时间,则速度大约快 4 倍。

# Each filter basically does what `op` tries to achieve in a loop

filter1 = np.array([[0, 1 ,0, 0, 0],
                  [1, -4, 1, 0, 0],
                  [0, 1, 0, 0, 0],
                  [0, 0, 0, 0, 0],
                  [0, 0, 0, 0, 0]]) /5.

filter2 = np.array([[0, 0 ,1, 0, 0],
                  [0, 1, -4, 1, 0],
                  [0, 0, 1, 0, 0],
                  [0, 0, 0, 0, 0],
                  [0, 0, 0, 0, 0]]) /5.
filter3 = np.array([[0, 0 ,0, 0, 0],
                  [0, 0, 1, 0, 0],
                  [0, 1, -4, 1, 0],
                  [0, 0, 1, 0, 0],
                  [0, 0, 0, 0, 0]]) /5.
# only loop over the center of the matrix, a
center = np.array([[0, 0, 0, 0, 0],
                   [0, 1, 1, 1, 0],
                   [0, 1, 1, 1, 0],
                   [0, 1, 1, 1, 0],
                   [0, 0, 0, 0, 0]])

filter1filter2 可以旋转以分别表示 4 个过滤器。

filter1_90_rot = np.rot90(filter1, k=1)
filter1_180_rot = np.rot90(filter1, k=2)
filter1_270_rot = np.rot90(filter1, k=3)
filter2_90_rot = np.rot90(filter2, k=1)
filter2_180_rot = np.rot90(filter2, k=2)
filter2_270_rot = np.rot90(filter2, k=3)

# Based on different index from `a` return different filter

filter_dict = {
             (1,1): filter1,
             (3,1): filter1_90_rot,
             (3,3): filter1_180_rot,
             (1,3): filter1_270_rot,
             (1,2): filter2,
             (2,1): filter2_90_rot,
             (3,2): filter2_180_rot,
             (2,3): filter2_270_rot,
             (2,2): filter3
            }

主要功能

def get_new_a(a):
    x, y = np.where(((a > 3) * center) > 0) # find pairs that match the condition
    return a + np.sum(np.multiply(filter_dict[i, j], a[i,j])
                      for (i, j) in zip(x,y))

注意:似乎存在一些数字错误,例如 np.equal() 在我的结果和 OP 之间大多会返回 False,而 np.close() 会返回 true。

计时结果

def op():
    temp_a = np.copy(a)

    for i in range(1,a.shape[0]-1):
        for j in range(1,a.shape[1]-1):
            if a[i,j] > 3:
                temp_a[i+1,j] += a[i,j] / 5.
                temp_a[i-1,j] += a[i,j] / 5.
                temp_a[i,j+1] += a[i,j] / 5.
                temp_a[i,j-1] += a[i,j] / 5.
                temp_a[i,j]   -= a[i,j] * 4. / 5.
    a2 = np.copy(temp_a)   

%timeit op()
167 µs ± 2.72 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

%timeit get_new_a(a):
37.2 µs ± 2.68 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each) 

再次注意,我们忽略了创建过滤器的时间,因为我认为这将是一次性的事情。 如果您确实想包括创建过滤器的时间,它大约快两倍。您可能认为这不公平,因为 op 的方法包含两个 np.copy。我认为 op 方法的瓶颈是 for 循环。

参考:

numpy.multiply 在两个矩阵之间进行元素乘法。
np.rot90 为我们进行旋转。 k 是一个参数,您可以决定旋转多少次。 np.isclose 可以使用这个函数来检查两个矩阵是否在你可以定义的某个误差范围内接近。

【讨论】:

  • 这适用于具有不同形状的较大数组吗?
  • @BehzadJamali 我没试过。我现在只是用你的例子。如果您有其他示例,我可以尝试对其进行试验。
  • 我认为你只需要定义正确的过滤器来处理不同的形状。
  • 此答案不适用于具有不同形状的数组(您可以通过在数组a 中添加另一列来测试它)。我的真实数组是 4000*4000 矩阵(比如说 a = np.random.uniform(low=0, high=6, size=(4000,4000)) )。是否可以将您的答案与我在此处为大随机数组提供的答案进行比较?
  • 现在我明白你所说的不同形状是什么意思了。我现在不确定如何使用 (4000, 4000) 矩阵来做到这一点。我认为这属于另一个问题。我的 2 美分是你需要注意重复并利用它。
【解决方案3】:

我想出了这个解决方案:

a = np.array([[0,0,0,0,0],
              [0,6,2,8,0],
              [0,1,5,3,0],
              [0,6,7,8,0],
              [0,0,0,0,0]],dtype= np.float32)

up= np.zeros_like(a)
down= np.zeros_like(a)
right= np.zeros_like(a)
left = np.zeros_like(a)

def new_a(a,u,r,d,l):

    c = np.copy(a)
    c[c <= 3] = 0

    up[:-2, 1:-1] += c[1:-1,1:-1] / 5.
    down[2:, 1:-1] += c[1:-1,1:-1] / 5.
    left[1:-1, :-2] += c[1:-1,1:-1]/ 5.
    right[1:-1, 2:] += c[1:-1,1:-1] / 5.
    a[1:-1,1:-1] -= c[1:-1,1:-1] * 4. / 5.
    a += up + down + left + right

    return a

【讨论】:

    猜你喜欢
    • 2021-03-13
    • 2018-03-14
    • 1970-01-01
    • 2013-10-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多