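Although there is no direct way to do this in a fully vectorized manner, for larger inputs a single application of mask[full_row_indices, full_col_indices] with pre-computed full index lists is faster than multiple applications of mask[partial_row_indices, partial_col_indices].
Memory-wise, the multiple applications are also less demanding, because no intermediate full_row_indices/full_col_indices needs to be built.
Of course, this generally depends on the length of indices.
To get a sense of how fast the different possible solutions are, the following functions were tested: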
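To make the memory trade-off concrete, here is a minimal sketch that builds the full index arrays for a small input and reports how much extra memory they hold. The sample data and the variable `extra_bytes` are assumptions chosen only for illustration; the index names mirror the ones used in the text.

```python
import numpy as np

# Hypothetical ragged input: one tuple of column indices per row.
col_indices = ((0, 2, 3, 4), (2, 3, 4), (1, 4), (0, 1, 4))

# The single-application approach first materializes two flat index arrays,
# each with one entry per True cell of the final mask.
lengths = [len(c) for c in col_indices]
full_row_indices = np.repeat(np.arange(len(col_indices)), lengths)
full_col_indices = np.concatenate(col_indices)

mask = np.zeros((len(col_indices), 5), dtype=bool)
mask[full_row_indices, full_col_indices] = True  # one fancy-indexing call

# Extra memory held by the intermediates; the row-by-row approach avoids it.
extra_bytes = full_row_indices.nbytes + full_col_indices.nbytes
print(full_row_indices.size, extra_bytes)
```

The byte count depends on the platform's default integer size, so it is printed rather than fixed here.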
import numpy as np
import random


def gen_mask_direct(col_indices, cols=None):
    if cols is None:
        cols = np.max(np.concatenate(col_indices)) + 1
    rows = len(col_indices)
    mask = np.zeros((rows, cols), dtype=bool)
    for row_index, col_index in enumerate(col_indices):
        mask[row_index, col_index] = True
    return mask


def gen_mask_loops(col_indices, cols=None):
    rows = len(col_indices)
    row_indices = tuple(i for i, j in enumerate(col_indices) for _ in j)
    col_indices = tuple(sum(col_indices, ()))
    if cols is None:
        cols = np.max(col_indices) + 1
    mask = np.zeros((rows, cols), dtype=bool)
    mask[row_indices, col_indices] = True
    return mask


def gen_mask_np_repeat(col_indices, cols=None):
    rows = len(col_indices)
    lengths = list(map(len, col_indices))
    row_indices = np.repeat(np.arange(rows), lengths)
    col_indices = np.concatenate(col_indices)
    if cols is None:
        cols = np.max(col_indices) + 1
    mask = np.zeros((rows, cols), dtype=bool)
    mask[row_indices, col_indices] = True
    return mask


def gen_mask_np_concatenate(col_indices, cols=None):
    rows = len(col_indices)
    row_indices = tuple(np.full(len(col_index), i) for i, col_index in enumerate(col_indices))
    row_indices = np.concatenate(row_indices)
    col_indices = np.concatenate(col_indices)
    if cols is None:
        cols = np.max(col_indices) + 1
    mask = np.zeros((rows, cols), dtype=bool)
    mask[row_indices, col_indices] = True
    return mask
gen_mask_direct() is essentially @Derlin's answer and implements multiple applications of mask[partial_row_indices, partial_col_indices].
All the others implement a single application of mask[full_row_indices, full_col_indices], preparing full_row_indices and full_col_indices in different ways:
- gen_mask_loops() uses direct loops
- gen_mask_np_repeat() uses np.repeat() (it is essentially the same as @Divakar's answer)
- gen_mask_np_concatenate() uses a combination of np.full() and np.concatenate()
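As a small illustration of how the three variants differ, the sketch below builds only the row indices in each of the three ways for a made-up input. The sample `col_indices` and the names `rows_loops`, `rows_repeat` and `rows_concat` are assumptions introduced here for illustration.

```python
import numpy as np

col_indices = ((0, 2), (1,), (0, 1, 2))  # hypothetical sample input

# Direct loops: emit row number i once per column index in row i.
rows_loops = tuple(i for i, cols in enumerate(col_indices) for _ in cols)

# np.repeat(): repeat each row number by the length of its column tuple.
lengths = [len(cols) for cols in col_indices]
rows_repeat = np.repeat(np.arange(len(col_indices)), lengths)

# np.full() + np.concatenate(): one constant block per row, then joined.
rows_concat = np.concatenate(
    [np.full(len(cols), i) for i, cols in enumerate(col_indices)])

print(rows_loops)            # (0, 0, 1, 2, 2, 2)
print(rows_repeat.tolist())  # [0, 0, 1, 2, 2, 2]
print(rows_concat.tolist())  # [0, 0, 1, 2, 2, 2]
```

All three produce the same sequence; they differ only in how much of the work happens in Python versus inside NumPy.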
A quick sanity check shows that they are all equivalent:
funcs = gen_mask_direct, gen_mask_loops, gen_mask_np_repeat, gen_mask_np_concatenate

random.seed(0)
test_inputs = [
    (tuple(
        tuple(sorted(set([random.randint(0, n - 1) for _ in range(random.randint(1, n - 1))])))
        for _ in range(random.randint(1, n - 1))))
    for n in range(5, 6)
]
print(test_inputs)
# [((0, 2, 3, 4), (2, 3, 4), (1, 4), (0, 1, 4))]

for func in funcs:
    print('Func:', func.__name__)
    for test_input in test_inputs:
        print(func(test_input).astype(int))
Func: gen_mask_direct
[[1 0 1 1 1]
 [0 0 1 1 1]
 [0 1 0 0 1]
 [1 1 0 0 1]]
Func: gen_mask_loops
[[1 0 1 1 1]
 [0 0 1 1 1]
 [0 1 0 0 1]
 [1 1 0 0 1]]
Func: gen_mask_np_repeat
[[1 0 1 1 1]
 [0 0 1 1 1]
 [0 1 0 0 1]
 [1 1 0 0 1]]
Func: gen_mask_np_concatenate
[[1 0 1 1 1]
 [0 0 1 1 1]
 [0 1 0 0 1]
 [1 1 0 0 1]]
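Rather than comparing the printed matrices by eye, the result can also be checked against a brute-force reference. The sketch below is one way to do that; `reference_mask()` is a hypothetical helper introduced here, and gen_mask_np_repeat() is repeated (with a renamed local variable) only so that the snippet runs on its own.

```python
import numpy as np


def gen_mask_np_repeat(col_indices, cols=None):
    # Same logic as the function of that name above.
    rows = len(col_indices)
    lengths = list(map(len, col_indices))
    row_indices = np.repeat(np.arange(rows), lengths)
    flat_cols = np.concatenate(col_indices)
    if cols is None:
        cols = np.max(flat_cols) + 1
    mask = np.zeros((rows, cols), dtype=bool)
    mask[row_indices, flat_cols] = True
    return mask


def reference_mask(col_indices, cols):
    # Hypothetical brute-force reference: cell (i, j) is True exactly when
    # j appears in the i-th tuple of column indices.
    return np.array([[j in row for j in range(cols)] for row in col_indices])


test_input = ((0, 2, 3, 4), (2, 3, 4), (1, 4), (0, 1, 4))
result = gen_mask_np_repeat(test_input)
assert np.array_equal(result, reference_mask(test_input, 5))
print(result.shape)  # (4, 5)
```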
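Here are some benchmarks (using the code from here):
and zoomed in on the fastest:
These support the overall statement that, typically, a single application of mask[...] with full indices is faster than multiple applications of mask[...] with partial indices.
For completeness, the following code was used to generate the inputs, compare the outputs, run the benchmarks and prepare the plots: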
def gen_input(n):
    random.seed(0)
    return tuple(
        tuple(sorted(set([random.randint(0, n - 1) for _ in range(random.randint(n // 2, n - 1))])))
        for _ in range(random.randint(n // 2, n - 1)))


def equal_output(a, b):
    return np.all(a == b)


input_sizes = tuple(int(2 ** (2 + (3 * i) / 4)) for i in range(13))
print('Input Sizes:\n', input_sizes, '\n')

# benchmark() and plot_benchmarks() are helpers from the benchmark code linked above.
runtimes, input_sizes, labels, results = benchmark(
    funcs, gen_input=gen_input, equal_output=equal_output,
    input_sizes=input_sizes)

plot_benchmarks(runtimes, input_sizes, labels, units='ms')
plot_benchmarks(runtimes, input_sizes, labels, units='ms', zoom_fastest=2)
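Since benchmark() and plot_benchmarks() are not reproduced in this answer, a rough stand-in using only the standard-library timeit module might look like the sketch below. This is not the benchmark code used for the plots. The names `mask_direct`, `mask_single`, and `timings`, the `seed` parameter, and the chosen input sizes are assumptions made here; absolute numbers will vary by machine.

```python
import random
import timeit

import numpy as np


def gen_input(n, seed=0):
    # Same generator as above, but with a local Random instance (an
    # assumption made here to avoid touching the global random state).
    rng = random.Random(seed)
    return tuple(
        tuple(sorted({rng.randint(0, n - 1)
                      for _ in range(rng.randint(n // 2, n - 1))}))
        for _ in range(rng.randint(n // 2, n - 1)))


def mask_direct(col_indices, cols):
    # Multiple applications: one fancy-indexing call per row.
    mask = np.zeros((len(col_indices), cols), dtype=bool)
    for row_index, col_index in enumerate(col_indices):
        mask[row_index, col_index] = True
    return mask


def mask_single(col_indices, cols):
    # Single application: build the full index arrays first.
    rows = len(col_indices)
    row_indices = np.repeat(np.arange(rows), [len(c) for c in col_indices])
    mask = np.zeros((rows, cols), dtype=bool)
    mask[row_indices, np.concatenate(col_indices)] = True
    return mask


timings = {}
for n in (16, 128, 512):
    data = gen_input(n)
    assert np.array_equal(mask_direct(data, n), mask_single(data, n))
    for func in (mask_direct, mask_single):
        # Best of 3 repeats, 5 calls each, reported per call.
        best = min(timeit.repeat(lambda: func(data, n), number=5, repeat=3))
        timings[(func.__name__, n)] = best / 5
        print(f'{func.__name__:12s} n={n:4d} {best / 5 * 1e3:.3f} ms')
```

Taking the minimum over repeats is a common way to reduce noise from other processes; it does not replace the fuller benchmark behind the plots.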