【问题标题】:Improving runtime of python numpy code改进 python numpy 代码的运行时
【发布时间】:2025-11-30 10:40:01
【问题描述】:

我有一个代码可以将 bin 重新分配给一个大的 numpy 数组。基本上,大数组的元素已经以不同的频率进行采样,最终目标是在固定的 bin freq_bins 处重新组合整个数组。对于我拥有的数组,代码有点慢。有什么好的方法可以提高这段代码的运行时间吗?现在只有少数人会这样做。可能会有一些numba 魔法。

import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    sky_by_cap = np.einsum('ij, jk->ijk', boost_factor[i],es)
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)

【问题讨论】:

  • 也许解释一下这段代码在做什么会有所帮助......
  • @Julien 抱歉,我现在添加了更多解释。
  • 在第 21 行你有: to_bin_emit = to_bin_emit.reshape(frequency_boost1.shape) frequency_boost1.shape 定义在哪里?
  • @TimothyLombard 抱歉,这个问题现在已经修复了。
  • 关于第 22 行:NameError: name 'to_bin_emission' is not defined 您的示例似乎是独立的,也许您应该考虑在发布之前在笔记本中运行代码片段......我喜欢使用新的谷歌实用程序colab.research.google.com

标签: python arrays performance numpy numba


【解决方案1】:

保持代码简单而不是优化

如果您知道要编写什么算法,请编写一个简单的参考实现。从此,您可以使用 Python 采用两种方式。您可以尝试将代码矢量化您可以编译代码以获得良好的性能。

即使 np.einsumnp.add.at 在 Numba 中实现,任何编译器都很难从您的示例中生成高效的二进制代码。

我唯一重写的是一种更有效的数字化标量值的方法。

编辑

在 Numba 源代码中,对标量值进行了更有效的数字化。

代码

#From Numba source
#Copyright (c) 2012, Anaconda, Inc.
#All rights reserved.

@nb.njit(fastmath=True)
def digitize(x, bins, right=False):
    # bins are monotonically-increasing
    n = len(bins)
    lo = 0
    hi = n

    if right:
        if np.isnan(x):
            # Find the first nan (i.e. the last from the end of bins,
            # since there shouldn't be many of them in practice)
            for i in range(n, 0, -1):
                if not np.isnan(bins[i - 1]):
                    return i
            return 0
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] < x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid
    else:
        if np.isnan(x):
            # NaNs end up in the last bin
            return n
        while hi > lo:
            mid = (lo + hi) >> 1
            if bins[mid] <= x:
                # mid is too low => narrow to upper bins
                lo = mid + 1
            else:
                # mid is too high, or is a NaN => narrow to lower bins
                hi = mid

    return lo

@nb.njit(fastmath=True)
def digitize(value, bins):
  if value<bins[0]:
    return 0

  if value>=bins[bins.shape[0]-1]:
    return bins.shape[0]

  for l in range(1,bins.shape[0]):
    if value>=bins[l-1] and value<bins[l]:
      return l

@nb.njit(fastmath=True,parallel=True)
def inner_loop(boost_factor,freq_bins,es):
  res=np.zeros((boost_factor.shape[0],freq_bins.shape[0]),dtype=np.float64)
  for i in nb.prange(boost_factor.shape[0]):
    for j in range(boost_factor.shape[1]):
      for k in range(freq_bins.shape[0]):
        ind=nb.int64(digitize(boost_factor[i,j]*freq_bins[k],freq_bins))
        res[i,ind]+=boost_factor[i,j]*es[j,k]*freq_bins[ind]
  return res

@nb.njit(fastmath=True)
def calc_nb(division,freq_division,cd,boost_factor,freq_bins,es):
  final_emit = np.empty((division, division, freq_division),np.float64)
  for i in range(division):
    final_emit[i,:,:]=inner_loop(boost_factor[i],freq_bins,es)
  return final_emit

性能

(Quadcore i7)
original_code: 118.5s
calc_nb: 4.14s
#with digitize implementation from Numba source
calc_nb: 2.66s

【讨论】:

  • @matttree 我已经添加了数字化的 Numba 源实现。这提供了额外的加速...
【解决方案2】:

这似乎很容易并行化:

  • 您有一个运行 90 次的外循环。
  • 每次,除了final_emit 之外,您不会改变任何共享数组
    • ... 仅用于存储到唯一的行中。
  • 看起来循环内的大部分工作都是 numpy 数组范围的操作,这将释放 GIL。

所以(使用 concurrent.futuresfutures 反向端口,因为您似乎使用的是 2.7):

import numpy as np
import time
import futures

division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    for i, row in enumerate(x.map(dostuff, xrange(division))):
        final_emit[i] = row

如果可行,可以尝试两种调整,其中任何一种都可能更有效。我们并不关心结果返回的顺序,但map 按顺序排列它们。这会浪费一点空间和时间。我认为这不会有太大的不同(大概,您的大部分时间大概都花在了计算上,而不是写出结果上),但是如果不分析您的代码,就很难确定。所以,有两种简单的方法可以解决这个问题。


使用as_completed 可以让我们按照结果完成的顺序使用结果,而不是按照我们排队的顺序。像这样的:

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    return i, np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    fs = [x.submit(dostuff, i) for i in xrange(division))
    for i, row in futures.as_completed(fs): 
        final_emit[i] = row

或者,我们可以让函数直接插入行,而不是返回它们。这意味着我们现在正在从多个线程中改变一个共享对象。所以我认为我们在这里需要一个锁,虽然我不是很肯定(numpy 的规则有点复杂,而且我没有仔细阅读你的代码......)。但这可能不会显着损害性能,而且很容易。所以:

import numpy as np
import threading
# etc.
final_emit = np.zeros((division, division, freq_division))
final_emit_lock = threading.Lock()

def dostuff(i):
    fre_boost = np.einsum('ij, k->ijk', boost_factor[i], freq_bins)
    # ...
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    with final_emit_lock:
        final_emit[i] = np.sum(to_bin_emit, axis=1)

with futures.ThreadPoolExecutor(max_workers=8) as x:
    x.map(dostuff, xrange(division))

我所有示例中的max_workers=8 都应该针对您的机器进行调整。线程太多是不好的,因为它们开始互相争斗而不是并行化;线程太少更糟糕,因为你的一些核心只是闲置在那里。

如果您希望它在各种机器上运行,而不是针对每台机器进行调整,最好的猜测(对于 2.7)通常是:

import multiprocessing
# ...
with futures.ThreadPoolExecutor(max_workers=multiprocessing.cpu_count()) as x:

但是,如果您想从特定机器中获得最大性能,您应该测试不同的值。特别是,对于具有超线程的典型四核笔记本电脑,理想值可以是 4 到 8 之间的任何值,具体取决于您正在做的确切工作,并且尝试所有值比尝试预测更容易。

【讨论】:

  • 哇,整个代码的速度提高了大约 3 倍。您能否详细说明您提到的最后两个调整。我不太清楚你将如何在代码中实现这些。
  • @matttree 好的,我为两者添加了(未经测试的)示例。链接文档中也有一些很好的示例(这些示例显然不适合您的代码,但它们已经过测试,并且解释得很好)。另外,最后还有一个注释。
【解决方案3】:

我认为通过将einsum 替换为实际乘法,您的性能会略有提升。

import numpy as np
import time
division = 90
freq_division = 50
cd = 3000
boost_factor = np.random.rand(division, division, cd)
freq_bins = np.linspace(1, 60, freq_division)
es = np.random.randint(1,10, size = (cd, freq_division))
final_emit = np.zeros((division, division, freq_division))
time1 = time.time()
for i in xrange(division):
    fre_boost = boost_factor[i][:, :, None]*freq_bins[None, None, :]
    sky_by_cap = boost_factor[i][:, :, None]*es[None, :, :]
    freq_index = np.digitize(fre_boost, freq_bins)
    freq_index_reshaped = freq_index.reshape(division*cd, -1)
    freq_index = None
    sky_by_cap_reshaped = sky_by_cap.reshape(freq_index_reshaped.shape)
    to_bin_emit = np.zeros(freq_index_reshaped.shape)
    row_index = np.arange(freq_index_reshaped.shape[0]).reshape(-1, 1)
    np.add.at(to_bin_emit, (row_index, freq_index_reshaped), sky_by_cap_reshaped)
    to_bin_emit = to_bin_emit.reshape(fre_boost.shape)
    to_bin_emit = np.multiply(to_bin_emit, freq_bins, out=to_bin_emit)
    final_emit[i] = np.sum(to_bin_emit, axis=1)
print(time.time()-time1)

您的代码在np.add.at 处相当慢,我相信使用np.bincount 可以更快,尽管我无法让它完全适用于您拥有的多维数组。可能有人可以在这里添加。

【讨论】:

  • 谢谢。但是,我认为按照您的建议,速度的提高确实很小,因为einsum 并没有花太多时间。我正在查看np.bincount,但我认为将它正确用于多维数组真的很棘手,正如您也指出的那样。