【发布时间】:2021-09-23 14:04:41
【问题描述】:
我正在编写一些使用Goerztel method 构建频谱图的代码。计算主要使用 Numpy ndarrays 执行。最终的频谱图是一个 2D ndarray(例如 1536 x 828),它是从初始空/零 ndarray 构建的,然后使用 Goerztel 算法的结果(列向量)进行更新,该算法执行num_windows 次。
我有使用其他编程语言(C/Java)进行多线程/并行处理的经验,但对 Python 不太熟悉。我有一个多进程版本的代码工作,但我觉得有一种更优雅/更有效的方法。根据我对 Python 中的代码和多处理的理解,一些变量的副本用于每个进程(transformed_cols 和coefficients ndarrays),我认为这是可以避免的。
我认为这段代码适合并行性的原因是,当写入发生在同一个 ndarray 上时,没有重叠 ndarray 的哪一部分被写入。
通过阅读其他类似的帖子,我未能找到适合我的情况以解决我的问题的帖子,因此我们将不胜感激。我认为可以改进的部分是 apply_async 函数调用,我只是不确定如何:(
对于它的价值,与我的串行解决方案相比,我看到使用以下解决方案(在我的机器上)的速度提高了大约 3-3.5 倍
def build_specific_spectrogram(signal: np.ndarray,
sample_rate: int,
filterbank: Filterbank,
analysis_window: AnalysisWindow,
time_spaces: list,
num_windows: int) -> np.ndarray:
if :
## other spectrograms here
elif filterbank.name == 'goertzel':
spect = np.zeros((filterbank.num_bands, num_windows), dtype='complex_')
transformed_cols = build_window_transformed_cols(analysis_window.data, signal, num_windows, analysis_window.window_overlap)
coefficients = filterbank.band_frequencies / sample_rate * transformed_cols.shape[0]
num_processes = mp.cpu_count()
def update_spect(result, index):
spect[:,index] = result
pool = mp.Pool(processes=num_processes)
for win_index in range(num_windows-1):
func_callback = partial(update_spect, index=win_index)
pool.apply_async(build_goertzel_async, [win_index, transformed_cols, coefficients], callback=func_callback)
pool.close()
pool.join()
return spect
def build_goertzel_async(win_index, transformed_cols, coefficients):
signal_window = transformed_cols[:, win_index]
window_powers = generalized_goertzel(signal_window, coefficients)
return window_powers[:,]
def build_window_transformed_cols(analysis_window_data: np.ndarray, sample_window: np.ndarray, num_windows: int, window_overlap: float) -> np.ndarray:
transformed_cols = np.zeros((len(analysis_window_data), num_windows - 1))
s_index = 0
e_index = len(analysis_window_data)
for win_index in range(num_windows-1):
windowed_signal = sample_window[s_index:e_index]
transformed_cols[:, win_index] = np.asarray([windowed_signal[i] * analysis_window_data[i] for i in range(len(windowed_signal))])
s_index += window_overlap
e_index += window_overlap
return transformed_cols
def generalized_goertzel(signal_window: np.ndarray,
coefficients: np.ndarray) -> np.ndarray:
signal_length = len(signal_window)
signal_window = np.reshape(signal_window, (signal_length, 1), order='F')
num_freqs = len(coefficients)
powers = np.zeros((num_freqs), dtype = 'complex_')
for freq_index in range(num_freqs):
A = 2 * math.pi * (coefficients[freq_index] / signal_length)
B = math.cos(A) * 2
C = cmath.exp(A * -1j)
s_0 = 0
s_1 = 0
s_2 = 0
for i in range(0, signal_length-1):
s_0 = signal_window[i] + B * s_1 - s_2
s_2 = s_1
s_1 = s_0
s_0 = signal_window[signal_length - 1] + B * s_1 - s_2
powers[freq_index] = s_0 - s_1 * C
powers[freq_index] = powers[freq_index] * cmath.exp(A * (signal_length - 1) * -1j)
return powers
对于没有提供可以运行的代码提前道歉,但这需要完整的代码库,这对于 stackoverflow 帖子来说有点长。
【问题讨论】:
-
我会使用
multiprocessing.shared_memory,就像我写的答案here
标签: python numpy multiprocessing python-multiprocessing