加快 NumPy 向量化实现的一种途径是:避免为临时数据进行开销很大的内存分配,更有效地利用缓存,并利用并行化。这可以用 Numba、Cython 或 C 轻松实现。请注意,并行化并不总是有益的:如果要转换的数组太小,请使用单线程版本(parallel=False)。
Cyril Gaudefroy 答案的 Numba 版本(带临时内存分配)
import numba as nb
import numpy as np
@nb.njit(nb.uint16[::1](nb.uint8[::1]), fastmath=True, parallel=True)
def nb_read_uint12(data_chunk):
    """Unpack packed 12-bit samples into a newly allocated uint16 array.

    Every group of three input bytes ``(b0, b1, b2)`` yields two samples:
    ``(b0 << 4) | (b1 >> 4)`` and ``((b1 & 0x0F) << 8) | b2``.

    Args:
        data_chunk: contiguous 1D uint8 array whose length is a multiple
            of 3, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.

    Returns:
        A new contiguous uint16 array with ``data_chunk.shape[0] // 3 * 2``
        elements.

    Raises:
        AssertionError: if the input length is not a multiple of 3.
    """
    n_bytes = data_chunk.shape[0]
    # Explicit raise instead of `assert`: assert statements are removed from
    # the bytecode under `python -O`, which would let trailing bytes be
    # dropped silently.
    if n_bytes % 3 != 0:
        raise AssertionError("data_chunk length must be a multiple of 3")

    n_triplets = n_bytes // 3
    out = np.empty(n_triplets * 2, dtype=np.uint16)
    for i in nb.prange(n_triplets):
        # Widen to uint16 before shifting so the 12-bit results are not
        # truncated to 8 bits.
        fst_uint8 = np.uint16(data_chunk[3 * i])
        mid_uint8 = np.uint16(data_chunk[3 * i + 1])
        lst_uint8 = np.uint16(data_chunk[3 * i + 2])
        # The combined bit fields never overlap, so OR equals the sum.
        out[2 * i] = (fst_uint8 << 4) | (mid_uint8 >> 4)
        out[2 * i + 1] = ((mid_uint8 & 15) << 8) | lst_uint8
    return out
Cyril Gaudefroy 答案的 Numba 版本(带内存预分配)
如果你要在大小相近的数据块上多次调用这个函数,那么输出数组只需预分配一次,之后重复使用即可。
@nb.njit(nb.uint16[::1](nb.uint8[::1], nb.uint16[::1]), fastmath=True, parallel=True, cache=True)
def nb_read_uint12_prealloc(data_chunk, out):
    """Unpack packed 12-bit samples into a caller-supplied uint16 array.

    Every group of three input bytes ``(b0, b1, b2)`` yields two samples:
    ``(b0 << 4) | (b1 >> 4)`` and ``((b1 & 0x0F) << 8) | b2``.

    Args:
        data_chunk: contiguous 1D uint8 array whose length is a multiple
            of 3, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.
        out: contiguous 1D uint16 array with exactly
            ``data_chunk.shape[0] // 3 * 2`` elements; overwritten in place.

    Returns:
        The same ``out`` array, filled with the unpacked samples.

    Raises:
        AssertionError: if the input length is not a multiple of 3 or
            ``out`` does not have the required length.
    """
    n_bytes = data_chunk.shape[0]
    # Explicit raises instead of `assert`: assert statements are removed from
    # the bytecode under `python -O`, and without the size check a too-short
    # `out` would be written past its end.
    if n_bytes % 3 != 0:
        raise AssertionError("data_chunk length must be a multiple of 3")
    n_triplets = n_bytes // 3
    if out.shape[0] != n_triplets * 2:
        raise AssertionError("out must hold data_chunk.shape[0] // 3 * 2 elements")

    for i in nb.prange(n_triplets):
        # Widen to uint16 before shifting so the 12-bit results are not
        # truncated to 8 bits.
        fst_uint8 = np.uint16(data_chunk[3 * i])
        mid_uint8 = np.uint16(data_chunk[3 * i + 1])
        lst_uint8 = np.uint16(data_chunk[3 * i + 2])
        # The combined bit fields never overlap, so OR equals the sum.
        out[2 * i] = (fst_uint8 << 4) | (mid_uint8 >> 4)
        out[2 * i + 1] = ((mid_uint8 & 15) << 8) | lst_uint8
    return out
带有临时内存分配的 DGrifffith 答案的 Numba 版本
@nb.njit(nb.uint16[::1](nb.uint8[::1]), fastmath=True, parallel=True, cache=True)
def read_uint12_var_2(data_chunk):
    """Unpack packed 12-bit samples (second layout) into a new uint16 array.

    Every group of three input bytes ``(b0, b1, b2)`` yields two samples:
    ``(b0 << 4) | (b1 >> 4)`` and ``(b2 << 4) | (b1 & 0x0F)``.

    Args:
        data_chunk: contiguous 1D uint8 array whose length is a multiple
            of 3, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.

    Returns:
        A new contiguous uint16 array with ``data_chunk.shape[0] // 3 * 2``
        elements.

    Raises:
        AssertionError: if the input length is not a multiple of 3.
    """
    n_bytes = data_chunk.shape[0]
    # Explicit raise instead of `assert`: assert statements are removed from
    # the bytecode under `python -O`, which would let trailing bytes be
    # dropped silently.
    if n_bytes % 3 != 0:
        raise AssertionError("data_chunk length must be a multiple of 3")

    n_triplets = n_bytes // 3
    out = np.empty(n_triplets * 2, dtype=np.uint16)
    for i in nb.prange(n_triplets):
        # Widen to uint16 before shifting so the 12-bit results are not
        # truncated to 8 bits.
        fst_uint8 = np.uint16(data_chunk[3 * i])
        mid_uint8 = np.uint16(data_chunk[3 * i + 1])
        lst_uint8 = np.uint16(data_chunk[3 * i + 2])
        # The combined bit fields never overlap, so OR equals the sum.
        out[2 * i] = (fst_uint8 << 4) | (mid_uint8 >> 4)
        out[2 * i + 1] = (lst_uint8 << 4) | (mid_uint8 & 15)
    return out
带有内存预分配的 DGrifffith 答案的 Numba 版本
@nb.njit(nb.uint16[::1](nb.uint8[::1], nb.uint16[::1]), fastmath=True, parallel=True, cache=True)
def read_uint12_var_2_prealloc(data_chunk, out):
    """Unpack packed 12-bit samples (second layout) into a supplied array.

    Every group of three input bytes ``(b0, b1, b2)`` yields two samples:
    ``(b0 << 4) | (b1 >> 4)`` and ``(b2 << 4) | (b1 & 0x0F)``.

    Args:
        data_chunk: contiguous 1D uint8 array whose length is a multiple
            of 3, e.g. ``np.frombuffer(raw_bytes, dtype=np.uint8)``.
        out: contiguous 1D uint16 array with exactly
            ``data_chunk.shape[0] // 3 * 2`` elements; overwritten in place.

    Returns:
        The same ``out`` array, filled with the unpacked samples.

    Raises:
        AssertionError: if the input length is not a multiple of 3 or
            ``out`` does not have the required length.
    """
    n_bytes = data_chunk.shape[0]
    # Explicit raises instead of `assert`: assert statements are removed from
    # the bytecode under `python -O`, and without the size check a too-short
    # `out` would be written past its end.
    if n_bytes % 3 != 0:
        raise AssertionError("data_chunk length must be a multiple of 3")
    n_triplets = n_bytes // 3
    if out.shape[0] != n_triplets * 2:
        raise AssertionError("out must hold data_chunk.shape[0] // 3 * 2 elements")

    for i in nb.prange(n_triplets):
        # Widen to uint16 before shifting so the 12-bit results are not
        # truncated to 8 bits.
        fst_uint8 = np.uint16(data_chunk[3 * i])
        mid_uint8 = np.uint16(data_chunk[3 * i + 1])
        lst_uint8 = np.uint16(data_chunk[3 * i + 2])
        # The combined bit fields never overlap, so OR equals the sum.
        out[2 * i] = (fst_uint8 << 4) | (mid_uint8 >> 4)
        out[2 * i + 1] = (lst_uint8 << 4) | (mid_uint8 & 15)
    return out
计时结果
# Benchmark input: random bytes standing in for packed 12-bit data
# (1.5 bytes per sample; presumably num_Frames frames of 640x256 samples).
num_Frames = 10
# Built-in int() replaces the `np.int` alias, which was removed in NumPy 1.24.
# randint's `high` bound is exclusive, so 256 is needed to cover every uint8
# value including 255.
data_chunk = np.random.randint(
    low=0,
    high=256,
    size=int(640 * 256 * 1.5 * num_Frames),
    dtype=np.uint8,
)
# Timing session. `%timeit` is an IPython magic, so these lines only run
# inside IPython/Jupyter, not as a plain Python script.
# NOTE(review): read_uint12_gaud and read_uint12_griff are not defined in this
# snippet - presumably the pure-NumPy versions from the referenced answers;
# confirm before running.
# NOTE(review): the MB/s figures correspond to roughly 4.9 MB per call
# (e.g. 5235 MB/s * 939 us), about twice the 2,457,600-byte input built with
# num_Frames=10 - confirm how the throughput was derived.
%timeit read_uint12_gaud(data_chunk)
#11.3 ms ± 53.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#435 MB/s
%timeit nb_read_uint12(data_chunk)
#939 µs ± 24.3 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5235 MB/s
# Allocate the output buffer once, outside the timed statement, so the
# *_prealloc timings below exclude the allocation.
out=np.empty(data_chunk.shape[0]//3*2,dtype=np.uint16)
%timeit nb_read_uint12_prealloc(data_chunk,out)
#407 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#11759 MB/s
%timeit read_uint12_griff(data_chunk)
#10.2 ms ± 55.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
#491 MB/s
%timeit read_uint12_var_2(data_chunk)
#928 µs ± 16.5 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#5297 MB/s
# Reuses the `out` buffer allocated above.
%timeit read_uint12_var_2_prealloc(data_chunk,out)
#403 µs ± 13.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
#12227 MB/s