【问题标题】:Rewriting a for loop in pure NumPy to decrease execution time在纯 NumPy 中重写 for 循环以减少执行时间
【发布时间】:2011-02-05 06:25:57
【问题描述】:

recently asked about trying to optimise a Python loop for a scientific application,并为我收到了an excellent, smart way of recoding it within NumPy which reduced execution time by a factor of around 100

但是,B 值的计算实际上嵌套在其他几个循环中,因为它是在规则的位置网格中计算的。是否有类似的智能 NumPy 重写来缩短此过程的时间?

我怀疑这部分的性能提升会不那么明显,缺点大概是无法向用户报告计算进度,结果无法写入输出文件直到计算结束,并且可能在一个巨大的步骤中执行此操作会影响内存?有没有可能绕过这些?

import numpy as np
import time

def reshape_vector(v):
    b = np.empty((3,1))
    for i in range(3):
        b[i][0] = v[i]
    return b

def unit_vectors(r):
     return r / np.sqrt((r*r).sum(0))

def calculate_dipole(mu, r_i, mom_i):
    relative = mu - r_i
    r_unit = unit_vectors(relative)
    A = 1e-7

    num = A*(3*np.sum(mom_i*r_unit, 0)*r_unit - mom_i)
    den = np.sqrt(np.sum(relative*relative, 0))**3
    B = np.sum(num/den, 1)
    return B

N = 20000 # number of dipoles
r_i = np.random.random((3,N)) # positions of dipoles
mom_i = np.random.random((3,N)) # moments of dipoles
a = np.random.random((3,3)) # three basis vectors for this crystal
n = [10,10,10] # points at which to evaluate sum
gamma_mu = 135.5 # a constant

t_start = time.clock()
for i in range(n[0]):
    r_frac_x = np.float(i)/np.float(n[0])
    r_test_x = r_frac_x * a[0]
    for j in range(n[1]):
        r_frac_y = np.float(j)/np.float(n[1])
        r_test_y = r_frac_y * a[1]
        for k in range(n[2]):
            r_frac_z = np.float(k)/np.float(n[2])
            r_test = r_test_x +r_test_y + r_frac_z * a[2]
            r_test_fast = reshape_vector(r_test)
            B = calculate_dipole(r_test_fast, r_i, mom_i)
            omega = gamma_mu*np.sqrt(np.dot(B,B))
            # write r_test, B and omega to a file
    frac_done = np.float(i+1)/(n[0]+1)
    t_elapsed = (time.clock()-t_start)
    t_remain = (1-frac_done)*t_elapsed/frac_done
    print frac_done*100,'% done in',t_elapsed/60.,'minutes...approximately',t_remain/60.,'minutes remaining'

【问题讨论】:

    标签: python optimization numpy physics


    【解决方案1】:

    如果您 profile 您的代码,您会看到 99% 的运行时间在 calculate_dipole 中,因此减少此循环的时间实际上不会显着减少执行时间。如果你想让它更快,你仍然需要关注calculate_dipole。我为此尝试了calculate_dipole 的 Cython 代码,总时间减少了大约 2 倍。可能还有其他方法可以改进 Cython 代码。

    【讨论】:

      【解决方案2】:

      您可以做的一件显而易见的事情是替换该行

      r_test_fast = reshape_vector(r_test)
      

      r_test_fast = r_test.reshape((3,1))
      

      可能不会对性能产生任何重大影响,但无论如何使用 numpy 内置函数而不是重新发明轮子是有意义的。

      一般来说,您现在可能已经注意到,优化 numpy 的技巧是借助 numpy 整个数组操作或至少使用切片来表达算法,而不是在 python 代码中迭代每个元素。倾向于防止这种“矢量化”的是所谓的循环携带依赖,即每次迭代都依赖于前一次迭代的结果的循环。简单看一下你的代码,你没有这样的东西,应该可以很好地矢量化你的代码。

      编辑:一种解决方案

      我尚未验证这是否正确,但应该让您了解如何处理它。

      首先,拨打cartesian() function, which we'll use。那么

      def calculate_dipole_vect(mus, r_i, mom_i): # Treat each mu sequentially Bs = [] omega = [] for mu in mus: rel = mu - r_i r_norm = np.sqrt((rel * rel).sum(1)) r_unit = rel / r_norm[:, np.newaxis] A = 1e-7 num = A*(3*np.sum(mom_i * r_unit, 0)*r_unit - mom_i) den = r_norm ** 3 B = np.sum(num / den[:, np.newaxis], 0) Bs.append(B) omega.append(gamma_mu * np.sqrt(np.dot(B, B))) return Bs, omega # Transpose to get more "natural" ordering with row-major numpy r_i = r_i.T mom_i = mom_i.T t_start = time.clock() r_frac = cartesian((np.arange(n[0]) / float(n[0]), np.arange(n[1]) / float(n[1]), np.arange(n[2]) / float(n[2]))) r_test = np.dot(r_frac, a) B, omega = calculate_dipole_vect(r_test, r_i, mom_i) print 'Total time for vectorized: %f s' % (time.clock() - t_start)

      嗯,在我的测试中,这实际上比我开始使用的基于循环的方法要慢一些。问题是,在问题的原始版本中,它已经通过对形状数组 (20000, 3) 的全数组操作进行了矢量化,因此任何进一步的矢量化并没有真正带来更多的好处。事实上,它可能会降低性能,如上所述,可能是由于临时数组很大。

      【讨论】:

      • 我认为 Justin 对个人资料的建议可能是明智的,但非常感谢……虽然我不确定我是否会使用它,但我认为尝试理解该示例可能是一个非常好的方法的学习。 :)
      猜你喜欢
      • 2018-02-04
      • 1970-01-01
      • 2019-10-07
      • 2018-03-27
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-10-21
      • 1970-01-01
      相关资源
      最近更新 更多