【问题标题】:Replace elements in numpy array avoiding loops替换numpy数组中的元素避免循环
【发布时间】:2019-04-08 21:25:28
【问题描述】:

我有一个相当大的 1d numpy 数组 Xold,它具有给定的值。这些值应为 根据二维 numpy 数组 Y 指定的规则替换: 一个例子是

Xold=np.array([0,1,2,3,4])
Y=np.array([[0,0],[1,100],[3,300],[4,400],[2,200]])

只要 Xold 中的值与 Y[:,0] 中的值相同,Xnew 中的新值应该是 Y[:,1] 中的对应值。这是通过两个嵌套的 for 循环来完成的:

Xnew=np.zeros(len(Xold))
for i in range(len(Xold)):
for j in range(len(Y)):
    if Xold[i]==Y[j,0]:
        Xnew[i]=Y[j,1]

对于给定的示例,这会产生Xnew=[0,100,200,300,400]。 但是,对于大型数据集,此过程非常缓慢。有什么更快、更优雅的方式来完成这项任务?

【问题讨论】:

    标签: python numpy for-loop numpy-slicing


    【解决方案1】:

    您可以做的第一个改进是使用 numpy 索引,但您仍然会有 1 个循环:

    for old, new in Y: 
        Xold[Xold == old] = new
    

    【讨论】:

    • 这是不正确的。对于我的情况,它返回 [200 200 200 300 400 400] X = np.array([0,1,2,3,4,4]); Y = np.array([[0,1],[1,2],[3,300],[4,400],[2,200]])
    【解决方案2】:

    这是一种可能性:

    import numpy as np
    
    Xold = np.array([0, 1, 2, 3, 4])
    Y = np.array([[0, 0], [1, 100], [3, 300], [4, 400], [2, 200]])
    # Check every X value against every Y first value
    m = Xold == Y[:, 0, np.newaxis]
    # Check which elements in X are among Y first values
    # (so values that are not in Y are not replaced)
    m_X = np.any(m, axis=0)
    # Compute replacement
    # Xold * (1 - m_X) are the non-replaced values
    # np.sum(Y[:, 1, np.newaxis] * m, axis=0) * m_X are the replaced values
    Xnew = Xold * (1 - m_X) + np.sum(Y[:, 1, np.newaxis] * m, axis=0) * m_X
    print(Xnew)
    

    输出:

    [  0 100 200 300 400]
    

    此方法或多或少适用于每种情况(未排序的数组、X 中值的多次重复、X 中的值未替换、Y 中的值未替换 X 中的任何内容),除非您在Y,无论如何这都是错误的。但是,它的时间和空间复杂度是 X 和 Y 大小的乘积。如果您的问题有其他限制(数据已排序、没有重复等),则可能会做得更好。例如,如果 X 没有重复元素进行排序,并且 Y 中的每个值都替换了 X 中的一个值(就像在您的示例中一样),那么这可能会更快:

    import numpy as np
    
    Xold = np.array([0, 1, 2, 3, 4])
    Y = np.array([[0, 0], [1, 100], [3, 300], [4, 400], [2, 200]])
    idx = np.searchsorted(Xold, Y[:, 0])
    Xnew = Xold.copy()
    Xnew[idx] = Y[:, 1]
    print(Xnew)
    # [  0 100 200 300 400]
    

    【讨论】:

      【解决方案3】:

      Y 的第一列中的数据不一定排序时,我们可以将np.searchsorted 用于一般情况-

      sidx = Y[:,0].argsort()
      out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]
      

      示例运行 -

      In [53]: Xold
      Out[53]: array([14, 10, 12, 13, 11])
      
      In [54]: Y
      Out[54]: 
      array([[ 10,   0],
             [ 11, 100],
             [ 13, 300],
             [ 14, 400],
             [ 12, 200]])
      
      In [55]: sidx = Y[:,0].argsort()
          ...: out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]
      
      In [56]: out
      Out[56]: array([400,   0, 200, 300, 100])
      

      如果不是所有元素都有对应的映射可用,那么我们需要做更多的工作,像这样 -

      sidx = Y[:,0].argsort()
      sorted_indx = np.searchsorted(Y[:,0], Xold, sorter=sidx)
      sorted_indx[sorted_indx==len(sidx)] = len(sidx)-1
      idx_out = sidx[sorted_indx]
      out = Y[idx_out,1]
      out[Y[idx_out,0]!=Xold] = 0 # NA values as 0s
      

      【讨论】:

      • 这很好,但是,假设存在并非所有值都有映射的情况,我不确定是否应该将它们设置为 0/NA/... 还是保持原样在Xold。但我认为这只是意味着用Xold[Y[idx_out,0]!=Xold] 替换最后一个0 在任何情况下都是如此好的解决方案。
      • @jdehesa OP 的输出为 Xnew=np.zeros(len(Xold))。所以,这对我来说是有意义的。
      • 这段代码对我不起作用:In [16]: sidx = Y[:,0].argsort()In [17]: out = Y[sidx[np.searchsorted(Y[:,0], Xold, sorter=sidx)],1]IndexError: index 5 is out of bounds for axis 1 with size 5
      • 正如@MihaiAlexandruIonut 所指出的那样,这是因为我的示例中的 Xold 包含 Y 中缺少的元素。但是,没有初始限制不能出现这种情况。
      • @DanielKislyuk 因此,我的帖子末尾的通用解决方案。
      【解决方案4】:

      您可以将slicing 功能与argsort 方法结合使用。

      Xnew = Y[Y[:,1].argsort()][:, 1][Xold] 
      

      输出

      array([  0, 100, 200, 300, 400])
      

      【讨论】:

      • 这段代码对我不起作用。 Xnew = Y[Y[:,1].argsort()][:, 1][Xold] IndexError: index 100 is out of bounds for axis 1 with size 5
      • @DanielKislyuk,这是因为您的 Xold 数组包含 Y 数组中不存在的索引。
      • 是的。您能否指出指定Y[Y[:,1].argsort()][:, 1][Xold] 替换如何工作的文档?无法掌握。
      【解决方案5】:

      解决方案pd.Series.map()

      如果您愿意使用 Pandas 库,也可以使用 .map() 以矢量化方式执行此操作:

      >>> import pandas as pd
      >>> pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0]))                                                                                                                                                                    
      0      0
      1    100
      2    200
      3    300
      4    400
      dtype: int64
      
      >>> pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0])).values                                                                                                                                                            
      array([  0, 100, 200, 300, 400])
      

      对于签名a.map(b)ab 的索引中查找其对应条目,并映射到b 中的相应值。

      b 这里是pd.Series(Y[:, 1], index=Y[:, 0]),它使用第 0 列作为索引,第 1 列作为映射到的值。


      直接使用pandas.core.algorithms

      Under the hood,这将使用 .get_indexer() 和 Cython 实现的 take_1d()

      indexer = mapper.index.get_indexer(values)
      new_values = algorithms.take_1d(mapper._values, indexer)
      

      知道,如果数组真的很大,您可以像这样减少一些开销:

      from pandas.core import algorithms
      
      indexer = pd.Index(Y[:, 0]).get_indexer(Xold)  
      mapped = algorithms.take_1d(Y[:, 1], indexer)
      

      【讨论】:

        【解决方案6】:

        numpy_indexed 包(免责声明;我是它的作者)包含一个有效的矢量化函数,可以解决一般问题:

        import numpy_indexed as npi
        Xnew = npi.remap(Xold, keys=Y[:, 0], values=Y[:, 1])
        

        也就是说,这适用于任何 dtype,或者当要替换的键和值本身是 ndarray 时,你会得到一个 kwarg 来指定如何对缺失的元素做出反应。

        不确定它与 pandas 的性能相比如何;但是这个库中的一个设计选择是执行这样的基本操作(或进行分组等)不应该涉及创建一个全新的数据类型,如 Series 或 Table,这总是让我对使用 pandas 这种类型感到困扰的东西。

        【讨论】:

          【解决方案7】:

          您可以使用 y = dict(Y) 将 Y 转换为字典,然后运行以下列表理解

          [y[i] if i in y.keys() else i for i in Xold]
          

          【讨论】:

            【解决方案8】:

            选择最快的方法

            这个问题的答案提供了各种各样的方法来替换 numpy 数组中的元素。让我们检查一下,哪个最快。

            TL;DR: Numpy 索引是赢家

             def meth1(): # suggested by @Slam
                for old, new in Y:  
                    Xold[Xold == old] = new
            
             def meth2(): # suggested by myself, convert y_dict = dict(Y) first
                 [y_dict[i] if i in y_dict.keys() else i for i in Xold]
            
             def meth3(): # suggested by @Eelco Hoogendoom, import numpy_index as npi first
                 npi.remap(Xold, keys=Y[:, 0], values=Y[:, 1])
            
             def meth4(): # suggested by @Brad Solomon, import pandas as pd first 
                 pd.Series(Xold).map(pd.Series(Y[:, 1], index=Y[:, 0])).values
            
              # suggested by @jdehesa. create Xnew = Xold.copy() and index
              # idx = np.searchsorted(Xold, Y[:, 0]) first
              def meth5():             
                 Xnew[idx] = Y[:, 1]
            

            结果并不那么令人惊讶

             In [39]: timeit.timeit(meth1, number=1000000)                                                                      
             Out[39]: 12.08
            
             In [40]: timeit.timeit(meth2, number=1000000)                                                                      
             Out[40]: 2.87
            
             In [38]: timeit.timeit(meth3, number=1000000)                                                                      
             Out[38]: 55.39
            
             In [12]: timeit.timeit(meth4, number=1000000)                                                                                      
             Out[12]: 256.84
            
             In [50]: timeit.timeit(meth5, number=1000000)                                                                                      
             Out[50]: 1.12
            

            所以,好的旧列表理解是第二快的,获胜的方法是 numpy 索引结合searchsorted()

            【讨论】:

            • 你在什么数据集上测试它?
            • Xold=np.array([0,1,2,3,4,4,4,0]) , Y=np.array([[0,0],[1,100], [3,300],[4,400],[2,200]])
            猜你喜欢
            • 2020-02-15
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2019-01-05
            • 2015-10-02
            相关资源
            最近更新 更多