【问题标题】:Index a NumPy array row-wise [duplicate]逐行索引NumPy数组[重复]
【发布时间】:2018-08-06 10:51:08
【问题描述】:

假设我有一个 NumPy 数组:

>>> X = np.array([[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]])
>>> X
array([[ 1,  2,  3,  4],
       [ 5,  6,  7,  8],
       [ 9, 10, 11, 12]])

以及我要为每一行选择的索引数组:

>>> ixs = np.array([[1, 3], [0, 1], [1, 2]])
>>> ixs
array([[1, 3],
       [0, 1],
       [1, 2]])

如何索引数组 X 以便为 X 中的每一行选择 ixs 中指定的两个索引?

所以对于这种情况,我想为第一行选择元素 1 和 3,为第二行选择元素 0 和 1,依此类推。输出应该是:

array([[2, 4],
       [5, 6],
       [10, 11]])

一个缓慢的解决方案是这样的:

output = np.array([row[ix] for row, ix in zip(X, ixs)])

但是,对于极长的数组,这可能会有点慢。有没有使用 NumPy 没有循环的更快方法?

编辑:在 2.5K * 1M 阵列和 2K 宽 ixs (10GB) 上进行一些非常近似的速度测试:

np.array([row[ix] for row, ix in zip(X, ixs)])0.16s

X[np.arange(len(ixs)), ixs.T].T0.175s

X.take(idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])33s

np.fromiter((X[i, j] for i, row in enumerate(ixs) for j in row), dtype=X.dtype).reshape(ixs.shape)2.4s

【问题讨论】:

  • 这会产生一个接近你想要的数组:X[:,ixs]。任何人都可以在此基础上进行构建吗?
  • 您认为并行化是一种可接受的解决方案吗?
  • 你能像这样重建 ixs:ixs2 = np.array(((0, 1), (0, 3), (1, 0), ...))?如果是这样,那么X[ixs2[:,0], ixs2[:,1]] 会得到我认为你需要的东西。
  • 否 @MohsinBukhari 我已经在更高的循环上并行化。我也不认为并行化在这里会有所帮助,因为在进程之间传递信息很慢。
  • 嗯,您是否尝试过使用 numba JIT 的直接 for 循环实现?

标签: python arrays numpy optimization indexing


【解决方案1】:

你可以用这个:

X[np.arange(len(ixs)), ixs.T].T

Here 是复杂索引的参考。

【讨论】:

  • 这是一个更简洁的解决方案,但在我的(近似)测试中实际上有点慢。我测量了原始版本的 1.75s 和 1.15s
  • @mxbi 您要在多大的阵列上进行测试?除非您使用大型数组,否则我希望这些小尺寸的列表理解更快...
  • 我的阵列是 10K * 1M @juanpa.arrivillaga - 尽管我最初的测试是在一个小得多的阵列上。在大数组上,这个版本只比列表理解慢一点。
  • @mxbi mmm 如果我不得不猜测双转置只是花费太长时间来弥补有效的索引......你可以尝试计时我的.take 方法,也许将结果添加为修改问题?
  • @mxbi 10K * 1M 内存太大(10G long ???),也许你需要尝试10K * 10K级别。
【解决方案2】:

我相信你可以这样使用.take

In [185]: X
Out[185]:
array([[ 1,  2,  3,  4],
       [ 5,  6,  7,  8],
       [ 9, 10, 11, 12]])

In [186]: idx
Out[186]:
array([[1, 3],
       [0, 1],
       [1, 2]])

In [187]: X.take(idx + (np.arange(X.shape[0]) * X.shape[1]).reshape(-1, 1))
Out[187]:
array([[ 2,  4],
       [ 5,  6],
       [10, 11]])

如果您的数组尺寸很大,这样做可能会更快,尽管更丑:

idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None]

只是为了好玩,看看下面的表现如何:

np.fromiter((X[i, j] for i, row in enumerate(ixs) for j in row), dtype=X.dtype, count=ixs.size).reshape(ixs.shape)

编辑以添加计时

In [15]: X = np.arange(1000*10000, dtype=np.int32).reshape(1000,-1)

In [16]: ixs = np.random.randint(0, 10000, (1000, 2))

In [17]: ixs.sort(axis=1)

In [18]: ixs
Out[18]:
array([[2738, 3511],
       [3600, 7414],
       [7426, 9851],
       ...,
       [1654, 8252],
       [2194, 8200],
       [5497, 8900]])

In [19]: %timeit  np.array([row[ix] for row, ix in zip(X, ixs)])
928 µs ± 23.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [20]: %timeit X[np.arange(len(ixs)), ixs.T].T
23.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [21]: %timeit X.take(idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])
20.6 µs ± 530 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

In [22]: %timeit np.fromiter((X[i, j] for i, row in enumerate(ixs) for j in row), dtype=X.dtype, count=ixs.size).reshape(ixs.shape)
1.42 ms ± 9.94 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

@mxbi 我添加了一些时间,我的结果和你的不太一致,你应该检查一下

这是一个更大的数组:

In [33]: X = np.arange(10000*100000, dtype=np.int32).reshape(10000,-1)

In [34]: ixs = np.random.randint(0, 100000, (10000, 2))

In [35]: ixs.sort(axis=1)

In [36]: X.shape
Out[36]: (10000, 100000)

In [37]: ixs.shape
Out[37]: (10000, 2)

有一些结果:

In [42]: %timeit  np.array([row[ix] for row, ix in zip(X, ixs)])
11.4 ms ± 177 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

In [43]: %timeit X[np.arange(len(ixs)), ixs.T].T
596 µs ± 17.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

In [44]: %timeit X.take(ixs+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])
540 µs ± 16.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

现在,我们使用第 500 列索引而不是两个索引,我们看到列表理解开始胜出:

In [45]: ixs = np.random.randint(0, 100000, (10000, 500))

In [46]: ixs.sort(axis=1)

In [47]: %timeit  np.array([row[ix] for row, ix in zip(X, ixs)])
93 ms ± 1.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [48]: %timeit X[np.arange(len(ixs)), ixs.T].T
133 ms ± 638 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)

In [49]: %timeit X.take(ixs+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None])
87.5 ms ± 1.13 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

【讨论】:

  • 不幸的是,这个似乎有点慢。在 10K x 1M 矩阵上,较快的版本需要 33 秒才能完成,而列表理解需要 170 毫秒。
  • @mxbi 是的,如果你的尺寸很大,idx + ... 的东西会有很多开销,不过,试试idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None] 我想知道它是否更快......即使不是很快够了。
  • @mxbi 10G long确实太多了,瓶颈可能是操作系统分页。
  • @liliscent 这台机器有 128GB 内存并且没有交换所以我没有分页。是的@ juanpa 我想这在较小的阵列上效果更好。
  • @mxbi 你试过这个版本吗:idx+np.arange(0, X.shape[0]*X.shape[1], X.shape[1])[:,None],我认为它不够快,但我很好奇它是否比另一个更快......
【解决方案3】:

从行索引项目的通常建议是:

X[np.arange(X.shape[0])[:,None], ixs]

即做一个形状为(n,1)的行索引(列向量),会以ixs的(n,m)形状广播,给出一个(n,m)的解。

这个基本一样:

X[np.arange(len(ixs)), ixs.T].T

根据 (m,n) 广播 (n,) 索引并转置。

时间基本相同:

In [299]: X = np.ones((1000,2000))
In [300]: ixs = np.random.randint(0,2000,(1000,200))
In [301]: timeit X[np.arange(len(ixs)), ixs.T].T
6.58 ms ± 71.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
In [302]: timeit X[np.arange(X.shape[0])[:,None], ixs]
6.57 ms ± 129 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

为了比较:

In [307]: timeit np.array([row[ix] for row, ix in zip(X, ixs)])
6.63 ms ± 229 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

我有点惊讶这个列表理解做得这么好。我想知道当尺寸变化时相对优势如何比较,特别是在Xixs 的相对形状(长、宽等)方面。


第一种解决方案是ix_产生的索引样式:

In [303]: np.ix_(np.arange(3), np.arange(2))
Out[303]: 
(array([[0],
        [1],
        [2]]), array([[0, 1]]))

【讨论】:

    【解决方案4】:

    这应该可以工作

    [X[i][[y]] for i, y in enumerate(ixs)] 
    

    编辑:我刚刚注意到你不想要循环解决方案。

    【讨论】:

    • 这与我原来的解决方案也非常接近(如果我将它包装在对 np.array 的调用中)