【问题标题】:Optimize computation time in nested for loops?优化嵌套 for 循环中的计算时间?
【发布时间】:2021-04-17 13:16:45
【问题描述】:

我有这个代码:

import numpy as np
from skimage.util import img_as_ubyte
from skimage.feature import canny
import math

image = img_as_ubyte(sf_img)
edges = np.flipud(canny(image, sigma=3, low_threshold=10, high_threshold=25))
non_zeros = np.nonzero(edges)
true_rows = non_zeros[0]
true_col = non_zeros[1]
plt.imshow(edges)
plt.show()
N_im = 256
x0 = 0
y0 = -0.25
Npx = 129
Npy = 60
delta_py = 0.025
delta_px = 0.031
Nr = 9
delta_r = 0.5
rho = 0.063
epsilon = 0.75
r_k = np.zeros((1, Nr))
r_min = 0.5

for k in range(0, Nr):
   r_k[0, k] = k * delta_r + r_min

a = np.zeros((Npy, Npx, Nr))

#FOR LOOP TO BE TIME OPTIMIZED
for i in range(0, np.size(true_col, 0)):     #true_col and true_rows has the same size so it doesn't matter
   for m in range(0, Npy):
       for l in range(0, Npx):
           d = math.sqrt(math.pow(
               (((true_col[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                    l * delta_px - (Npx * delta_px / 2) + x0)),
            2) + math.pow(
            (((true_rows[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                    m * delta_py - (Npy * delta_py / 2) + y0)),
            2))
           min_idx = np.argmin(np.abs(d - r_k))
           rk_hat = r_k[0, min_idx]
           if np.abs(d - rk_hat) < rho:
               a[m, l, min_idx] = a[m, l, min_idx] + 1

#ANOTHER LOOP TO BE OPTIMIZED
# for m in range(0, Npy):
#     for l in range(0, Npx):                                #ORIGINAL
#         for k in range(0, Nr):
#             if a[m, l, k] < epsilon * np.max(a):
#                 a[m, l, k] = 0

a[np.where(a[:, :, :] < epsilon * np.max(a))] = 0          #SUBSTITUTED

a_prime = np.sum(a, axis=2)

acc_x = np.zeros((Npx, 1))
acc_y = np.zeros((Npy, 1))

for l in range(0, Npx):
   acc_x[l, 0] = l * delta_px - (Npx * delta_px / 2) + x0

for m in range(0, Npy):
   acc_y[m, 0] = m * delta_py - (Npy * delta_py / 2) + y0

prod = 0
for m in range(0, Npy):
   for l in range(0, Npx):
       prod = prod + (np.array([acc_x[l, 0], acc_y[m, 0]]) * a_prime[m, l])

points = prod / np.sum(a_prime)

基于对an answer的评论:

true_rows = np.random.randint(0,256,10)
true_col = np.random.randint(0,256,10)

简单地说,它会扫描之前通过 Canny 边缘检测处理过的 256x256 图像。 For 循环因此必须扫描结果图像的每个像素,并且还必须计算 2 个嵌套的 for 循环,这些循环根据 'a' 矩阵的 l 和 m 索引的值执行一些操作。

由于边缘检测返回一个带有 0 和 1 的图像(与边缘相对应),并且由于内部操作必须仅针对单值点进行,所以我使用了

non_zeros = np.nonzero(edges)

只获取我感兴趣的索引。确实,以前的代码是这样的

for i in range(0, N_im):
    for j in range(0, N_im):
        if edges[i, j] == 1:
            for m in range(0, Npy):
                for l in range(0, Npx):
                    d = math.sqrt(math.pow(
                        (((i - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                                    l * delta_px - (Npx * delta_px / 2) + x0)),
                        2) + math.pow(
                        (((j - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                                    m * delta_py - (Npy * delta_py / 2) + y0)),
                        2))
                    min_idx = np.argmin(np.abs(d - r_k))
                    rk_hat = r_k[0, min_idx]
                    if np.abs(d - rk_hat) < rho:
                        a[m, l, min_idx] = a[m, l, min_idx] + 1

似乎我设法优化了前两个循环,但我的脚本需要比这更快。 运行大约需要 6~7 分钟,我需要执行 1000 次。你能帮我进一步优化这个脚本的 for 循环吗?谢谢!

【问题讨论】:

  • 您的minimal reproducible example 是否包含for 循环操作的数据的代表性示例?我们可以直接从您的问题中复制和粘贴吗?如果您的问题中有任何不相关的代码(优化 for 循环),您可以将其配对到最低限度吗?
  • 是的,当然。我唯一能做的就是通过这个链接给你“sf_img.npy”文件:wetransfer.com/downloads/…。你下载它,你通过np.load()加载它,然后你可以复制粘贴所有代码
  • @wwii 我也添加了导入的库
  • The only thing I can do is - untrue ,您可以包含 for 循环使用的数组的最小示例。第一个需要true_coltrue_rowsminimal 示例——看起来您已经提供了所有常量。我们不必去异地资源来回答您的问题。这些数组的示例不需要具有相同的维度,(4,4)数组可以满足(256,256)数组。
  • 为什么r_k的形状是(1,9)而不是(9,)?

标签: python performance numpy


【解决方案1】:

您可以使用 Numba JIT 来加快计算速度(因为默认的 CPython interpreter 对于此类计算非常不利)。此外,您可以重新设计循环,以便代码可以并行运行。

这是生成的代码:

import numba as nb

# Assume you work with 64-bits integer, 
# feel free to change it to 32-bit integers if this is not the case.
# If you encounter type issue, let Numba choose with: @nb.njit(parallel=True) 
# However, note that the first run will be slower if you let Numba choose.
@nb.njit('int64[:,:,::1](bool_[:,:], float64[:,:], int64, int64, int64, int64, float64, float64, float64, float64, float64)', parallel=True)
def fasterImpl(edges, r_k, Npy, Npx, Nr, N_im, delta_px, delta_py, rho, x0, y0):
    a = np.zeros((Npy, Npx, Nr), dtype=nb.int64)
    # Find all the position where edges[i,j]==1
    validEdgePos = np.where(edges == 1)
    for m in nb.prange(0, Npy):
        for l in range(0, Npx):
            # Iterate over the i,j value where edges[i,j]==1
            for i, j in zip(validEdgePos[0], validEdgePos[1]):
                d = math.sqrt(math.pow(
                    (((i - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                                l * delta_px - (Npx * delta_px / 2) + x0)),
                    2) + math.pow(
                    (((j - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                                m * delta_py - (Npy * delta_py / 2) + y0)),
                    2))
                min_idx = np.argmin(np.abs(d - r_k))
                rk_hat = r_k[0, min_idx]
                if np.abs(d - rk_hat) < rho:
                    a[m, l, min_idx] += 1
    return a

在我的机器上,使用您的问题中描述的输入(包括提供的sf_img),此代码快了 616 倍

Reference time: 109.680 s
Optimized time:   0.178 s

请注意,结果与参考实现完全相同。

【讨论】:

  • 嗨,在内部循环中而不是在外部循环中迭代边缘是否相同,就像我在问题代码中所做的那样?会影响速度吗?
  • 嗨。在您的情况下,在外循环中有一个 if edges[i, j] == 1 非常好,但它会阻止任何有效的并行化。我颠倒了循环顺序并使用np.where(edges == 1) 方法主要是为了启用并行化。没有 Numba JIT 的顺序代码应该与您的解决方案大致一样快。但是使用 JIT 和并行执行,这要快得多。两者都提供相同的结果(因为只有 a 在循环中被修改,并且操作既是关联的又是可交换的)。在这里使用 Numba 的另一个好处是您不必完全重写循环。
【解决方案2】:

根据您的脚本,您通常对 numpy 几乎没有经验。 Numpy 使用 SIMD 指令进行了优化,您的代码有点击败它。我建议你复习一下如何编写 numpy 代码的基础知识

请查看此备忘单。 https://s3.amazonaws.com/assets.datacamp.com/blog_assets/Numpy_Python_Cheat_Sheet.pdf

例如,这段代码可以从

r_k = np.zeros((1, Nr))
for k in range(0, Nr):
   r_k[0, k] = k * delta_r + r_min

### to a simple np.arange assignment
r_k = np.zeros((1, Nr))
r_k[0,:] = np.arange(Nr) * delta_r + r_min

### or you can do everything in one line
r_k = np.expand_dims(np.arange(Nr) * delta_r + r_min,axis=0)

这段代码有点尴尬,因为您在循环每个元素时创建了一个 np.array。您也可以简化此代码。您是在此处将数据类型从 int 更改为 np.array 吗?

prod = 0
for m in range(0, Npy):
   for l in range(0, Npx):
       prod = prod + (np.array([acc_x[l, 0], acc_y[m, 0]]) * a_prime[m, l])

对于这个循环,你可以慢慢分离出依赖和独立的元素。

#FOR LOOP TO BE TIME OPTIMIZED
for i in range(0, np.size(true_col, 0)):     #true_col and true_rows has the same size so it doesn't matter
   for m in range(0, Npy):
       for l in range(0, Npx):
           d = math.sqrt(math.pow(
               (((true_col[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                    l * delta_px - (Npx * delta_px / 2) + x0)),
            2) + math.pow(
            (((true_rows[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                    m * delta_py - (Npy * delta_py / 2) + y0)),
            2))
           min_idx = np.argmin(np.abs(d - r_k))
           rk_hat = r_k[0, min_idx]
           if np.abs(d - rk_hat) < rho:
               a[m, l, min_idx] = a[m, l, min_idx] + 1

外循环for i in range(0, np.size(true_col, 0))就可以了

您不需要循环来计算它。对于索引乘法,您可以分配一个额外的矩阵数组,以便获得所需的 1:1 格式。

   for m in range(0, Npy):
       for l in range(0, Npx):
           d = math.sqrt(math.pow(
               (((true_col[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                    l * delta_px - (Npx * delta_px / 2) + x0)),
            2) + math.pow(
            (((true_rows[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (
                    m * delta_py - (Npy * delta_py / 2) + y0)),
            2))

要模拟 m 和 l 行为,您可以创建一个 Npx by Npy 索引矩阵。尽管这种模式可能看起来很奇怪,但 Numpy 继承了 MATLAB 生态系统的技巧,因为 MATLAB/numpy 的目标是简化代码并允许您花费更多时间来修复您的逻辑。

## l matrix
[[0,1,2,3,4,5,6,7,8....Npx],
[0,1,2,3,4,5,6,7,8....Npx],
.....
[0,1,2,3,4,5,6,7,8....Npx]]

##m matrix
[[0,0,0,0,0,0,0,0,0,0,0,0],
 [1,1,1,1,,1,1,1,1,1,1,1,1],
  .....
 [Npx,Npx,Npx.....,Npx]]
## You can create both with one command
l_mat, m_mat = np.meshgrid(np.arange(Npx), np.arange(Npy))

>>> l_mat
array([[  0,   1,   2, ..., 147, 148, 149],
       [  0,   1,   2, ..., 147, 148, 149],
       [  0,   1,   2, ..., 147, 148, 149],
       ...,
       [  0,   1,   2, ..., 147, 148, 149],
       [  0,   1,   2, ..., 147, 148, 149],
       [  0,   1,   2, ..., 147, 148, 149]])
>>> m_mat
array([[ 0,  0,  0, ...,  0,  0,  0],
       [ 1,  1,  1, ...,  1,  1,  1],
       [ 2,  2,  2, ...,  2,  2,  2],
       ...,
       [97, 97, 97, ..., 97, 97, 97],
       [98, 98, 98, ..., 98, 98, 98],
       [99, 99, 99, ..., 99, 99, 99]])

使用这两个矩阵,您可以将其相乘以创建结果。

d = np.sqrt(np.pow( true_col[i] - np.floor((N_im + 1)/2)) / (N_im + l_mat).....

这两行代码,你好像是在设置一个argmin矩阵。

   min_idx = np.argmin(np.abs(d - r_k))
   rk_hat = r_k[0, min_idx]

https://numpy.org/doc/stable/reference/generated/numpy.vectorize.html

   vfunc = np.vectorize(lambda x: np.argmin(np.abs(x - r_k))
   min_idx = vfunc(d)
   vfunc2 = np.vectorize(lambda x: r_k[0, x])
   rk_hat = vfunc2(min_idx)
         

对于最后两行,d 和 rk_hat 应该是 Npx by Npy 矩阵。您可以使用矩阵切片或 np.where 来创建矩阵掩码。

       if np.abs(d - rk_hat) < rho:
            

       points = np.where( np.abs(d-rk_hat) < rho )

https://numpy.org/doc/stable/reference/generated/numpy.where.html

我放弃最后一行,如果你把它放在一个循环中可能没关系

      a[m, l, min_idx] = a[m, l, min_idx] + 1


      for xy in points:
         a[xy[0],xy[1], min_idx[xy[0],xy[1]]] += 1
      

【讨论】:

  • 在最后两个代码中,我不需要循环来按顺序遍历每个索引吗?如果没有它,我怎么能增加每对索引?
  • 您的代码也不需要索引。您可以创建一个布尔掩码来指定您想要的索引。您将上述技巧与 a[np.where(a[:, :, :] numpy.org/doc/stable/reference/arrays.indexing.html 我建议你可以让代码更简洁、更容易编写、更快,而且比你想象的要少得多
  • 我尝试创建 d 矩阵,但是当我进入下一步时 min_idx = np.argmin(np.abs(d - r_k)) 我收到错误 ValueError: operands could not be broadcast together with shapes (60,129) (1,9)
  • 另外,将你的 d 公式放入外部 for 循环,我最终只获得最后一个索引 d 矩阵:for i in range(0, np.size(true_col, 0)): d = np.sqrt(np.power((((true_col[i] - np.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (l_mat * delta_px - (Npx * delta_px / 2) + x0)),2) + np.power((((true_rows[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (m_mat * delta_py - (Npy * delta_py / 2) + y0)),2))
  • 我猜你需要更多帮助。我扩展了我的答案以包含其余代码
【解决方案3】:

优化嵌套循环的新答案,

....
     for i in range(0, np.size(true_col, 0)):     #true_col and true_rows has the same size so it doesn't matter
        for m in range(0, Npy):
            for l in range(0, Npx):

处理时间有显着改善。对于 2500 的 true_coltrue_rows 长度,在我的机器上大约需要 3 秒。它位于用于测试目的的函数中。

def new():
    a = np.zeros((Npy, Npx, Nr),dtype=int)

    # tease out and separate some of the terms
    # used in the calculation of the distance - d
    bb = N_im + 1
    cc = (Npx * delta_px / 2)
    dd = (Npy * delta_py / 2)

    l, m = np.meshgrid(np.arange(Npx), np.arange(Npy))

    q = (true_col - math.floor(bb / 2)) / bb / 2             # shape (true_col length,)
    r = l * delta_px - cc + x0                               # shape(Npy,Npx)
    s = np.square(q - r[...,None])                           # shape(Npy,Npx,true_col length)
                                                             # - last dimension is the outer loop of the original

    t = (true_rows - math.floor(bb / 2)) / bb / 2            # shape (len(true_rows),)
    u = m * delta_py - dd + y0                               # shape(60,129) ... (Npx,Npy)
    v = np.square(t - u[...,None])                           # shape(Npy,Npx,true_col length)

    d = np.sqrt(s + v)                                       # shape(Npy,Npx,true_col length)

    e1 = np.abs(d[...,None] - r_k.squeeze())                 # shape(Npy,Npx,true_col length,len(r_k[0,:]))
    min_idx =  np.argmin(e1,-1)                              # shape(Npy,Npx,true_col length)
    rk_hat = r_k[0,min_idx]                                  # shape(Npy,Npx,true_col length)
    zz = np.abs(d-rk_hat)                                    # shape(Npy,Npx,true_col length)
    condition = zz < rho                                     # shape(Npy,Npx,true_col length)

    # seemingly unavoidable for loop needed to perform 
    # a bincount along the last dimension (filtered)
    # while retaining the 2d position info
    # this will be pretty fast though,
    # nothing really going on other than indexing and assignment
    for iii in range(Npy*Npx):
        row,col = divmod(iii,Npx)
        filter = condition[row,col]
        one_d = min_idx[row,col]
        counts = np.bincount(one_d[filter])
        a[row,col,:counts.size] = counts

    return a

我不知道如何使用 Numpy 方法来摆脱过滤少于 rho 的最终循环并执行 bincount - 如果我弄清楚了,我会更新


来自您的问题和 cmets 的数据

import math
import numpy as np

np.random.seed(5)
n_ = 2500
true_col = np.random.randint(0,256,n_)
true_rows = np.random.randint(0,256,n_)

N_im = 256
x0 = 0
y0 = -0.25
Npx = 129
Npy = 60
# Npx = 8
# Npy = 4
delta_py = 0.025
delta_px = 0.031
Nr = 9
delta_r = 0.5
rho = 0.063
epsilon = 0.75
r_min = 0.5
r_k = np.arange(Nr) * delta_r + r_min
r_k = r_k.reshape(1,Nr)

您在函数中的原始嵌套循环 - 添加了一些诊断功能。

def original(writer=None):
    '''writer should be a csv.Writer object.'''

    a = np.zeros((Npy, Npx, Nr),dtype=int)
    for i in range(0, np.size(true_col, 0)):     #true_col and true_rows has the same size so it doesn't matter
        for m in range(0, Npy):
            for l in range(0, Npx):
                d = math.sqrt(math.pow((((true_col[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (l * delta_px - (Npx * delta_px / 2) + x0)),2) +
                            math.pow((((true_rows[i] - math.floor((N_im + 1) / 2)) / (N_im + 1) / 2) - (m * delta_py - (Npy * delta_py / 2) + y0)),2))
                min_idx = np.argmin(np.abs(d - r_k))    # scalar
                rk_hat = r_k[0, min_idx]    # scalar
                if np.abs(d - rk_hat) < rho:
                    # if (m,l) == (0,0):
                    if writer:
                        writer.writerow([i,m,l,d,min_idx,rk_hat,a[m, l, min_idx] + 1])
                    # print(f'condition satisfied: i:{i} a[{m},{l},{min_idx}] = {a[m, l, min_idx]} + 1')
                    a[m, l, min_idx] = a[m, l, min_idx] + 1
    return a

【讨论】:

  • true_col 和 true_rows 都是 (2230,) ndarray,两者都可以取 0 到 256 之间的整数值。如果需要,您可以尝试使用随机数,或者您可以下载我链接的文件并关注基于此的代码。
  • 代码没有运行:它们缺少多个括号
  • 谢谢@JérômeRichard - 已更新以修复括号,但由于移除了外循环,它仍然会引发广播错误。猜猜这是一项正在进行的工作,正在解决问题。
  • @JérômeRichard - 我已经完全修改了这个答案 - 这会减轻你的反对票吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2012-01-27
  • 2020-05-18
  • 1970-01-01
  • 1970-01-01
  • 2021-06-05
  • 1970-01-01
相关资源
最近更新 更多