【问题标题】:Fast algorithm to find indices where multiple arrays have the same value查找多个数组具有相同值的索引的快速算法
【发布时间】:2016-06-23 09:28:37
【问题描述】:

我正在寻找加快(或替换)我的数据分组算法的方法。

我有一个 numpy 数组列表。我想生成一个新的 numpy 数组,这样该数组的每个元素对于原始数组也相同的每个索引都是相同的。 (在不是这种情况的情况下有所不同。)

这听起来有点尴尬,所以举个例子:

# Test values:
values = [
    np.array([10, 11, 10, 11, 10, 11, 10]),
    np.array([21, 21, 22, 22, 21, 22, 23]),
    ]

# Expected outcome: np.array([0, 1, 2, 3, 0, 3, 4])
#                             *           *

请注意,我标记的预期结果的元素(索引 0 和 4)具有相同的值(0),因为原始的两个数组也相同(即1021)。索引为 3 和 5 的元素类似 (3)。

该算法必须处理任意数量(大小相等)的输入数组,并且还为每个结果数返回它们对应的原始数组的值。 (所以对于这个例子,“3”指的是(11, 22)。)

这是我目前的算法:

import numpy as np

def groupify(values):
    group = np.zeros((len(values[0]),), dtype=np.int64) - 1 # Magic number: -1 means ungrouped.
    group_meanings = {}
    next_hash = 0
    matching = np.ones((len(values[0]),), dtype=bool)
    while any(group == -1):
        this_combo = {}

        matching[:] = (group == -1)
        first_ungrouped_idx = np.where(matching)[0][0]

        for curr_id, value_array in enumerate(values):
            needed_value = value_array[first_ungrouped_idx]
            matching[matching] = value_array[matching] == needed_value
            this_combo[curr_id] = needed_value
        # Assign all of the found elements to a new group
        group[matching] = next_hash
        group_meanings[next_hash] = this_combo
        next_hash += 1
    return group, group_meanings

请注意,表达式 value_array[matching] == needed_value 会针对每个单独的索引进行多次评估,这就是缓慢的原因。

我不确定我的算法是否可以进一步加快速度,但我也不确定它是否是开始时的最佳算法。有更好的方法吗?

【问题讨论】:

    标签: python performance numpy


    【解决方案1】:

    终于破解了矢量化解决方案!这是一个有趣的问题。问题是我们必须标记从列表的相应数组元素中获取的每对值。然后,我们应该根据它们在其他对中的唯一性来标记每个这样的对。因此,我们可以使用np.unique 滥用其所有可选参数,最后做一些额外的工作来保持最终输出的顺序。这里的实现基本上分三个阶段完成 -

    # Stack as a 2D array with each pair from values as a column each.
    # Convert to linear index equivalent considering each column as indexing tuple
    arr = np.vstack(values)
    idx = np.ravel_multi_index(arr,arr.max(1)+1)
    
    # Do the heavy work with np.unique to give us :
    #   1. Starting indices of unique elems, 
    #   2. Srray that has unique IDs for each element in idx, and 
    #   3. Group ID counts
    _,unq_start_idx,unqID,count = np.unique(idx,return_index=True, \
                                            return_inverse=True,return_counts=True)
    
    # Best part happens here : Use mask to ignore the repeated elems and re-tag 
    # each unqID using argsort() of masked elements from idx
    mask = ~np.in1d(unqID,np.where(count>1)[0])
    mask[unq_start_idx] = 1
    out = idx[mask].argsort()[unqID]
    

    运行时测试

    让我们将提议的矢量化方法与原始代码进行比较。由于建议的代码只为我们提供组 ID,因此为了公平的基准测试,让我们从原始代码中删除不用于给我们的部分。所以,这里是函数定义 -

    def groupify(values):  # Original code
        group = np.zeros((len(values[0]),), dtype=np.int64) - 1
        next_hash = 0
        matching = np.ones((len(values[0]),), dtype=bool)
        while any(group == -1):
    
            matching[:] = (group == -1)
            first_ungrouped_idx = np.where(matching)[0][0]
    
            for curr_id, value_array in enumerate(values):
                needed_value = value_array[first_ungrouped_idx]
                matching[matching] = value_array[matching] == needed_value
            # Assign all of the found elements to a new group
            group[matching] = next_hash
            next_hash += 1
        return group
    
    def groupify_vectorized(values):  # Proposed code
        arr = np.vstack(values)
        idx = np.ravel_multi_index(arr,arr.max(1)+1)
        _,unq_start_idx,unqID,count = np.unique(idx,return_index=True, \
                                            return_inverse=True,return_counts=True)    
        mask = ~np.in1d(unqID,np.where(count>1)[0])
        mask[unq_start_idx] = 1
        return idx[mask].argsort()[unqID]
    

    具有大型数组的列表上的运行时结果 -

    In [345]: # Input list with random elements
         ...: values = [item for item in np.random.randint(10,40,(10,10000))]
    
    In [346]: np.allclose(groupify(values),groupify_vectorized(values))
    Out[346]: True
    
    In [347]: %timeit groupify(values)
    1 loops, best of 3: 4.02 s per loop
    
    In [348]: %timeit groupify_vectorized(values)
    100 loops, best of 3: 3.74 ms per loop
    

    【讨论】:

    • 你能解释一下这里实际发生了什么吗?特别是:np.ravel_multi_index 做了什么? (文档对我来说不是很清楚。)为什么要在 arr.max(1)+1 上调用它?
    • @acdr 基本上将每一对视为一个索引元组。因此,对于来自样本(10,21) 的第一对在具有适当形状的二维网格上将对应于一个线性索引数字。假设我们采用形状为(100,100) 的网格。因此,线性索引将是10*100+21 = 1021。我们使用ravel_multi_index 一次性对所有配对进行此操作。此外,二维网格的形状可以被认为是(first and second pair elements) + 1 的最大值。希望这是有道理的。我可以建议的最好的方法是研究 MATLAB 中也使用的线性索引。
    • 这是有道理的。在这种情况下,该功能可以满足我的所有需求。 (我不一定需要 ID 从 0 开始并以 1 递增,只要它们是唯一的。)不过,我猜想这个解决方案不适用于非整数数组? (例如,假设我有一个字符串数组,这些值根本不会很好地映射到网格。)
    • @acdr 您可以使用_,values_elems = np.unique(input_string,return_inverse=True) 给我们自己提供数字标签,这将对应于values 中的元素。您可能必须使用 .ravel() 并在两者之间进行整形。
    • 谢谢! vstacking [np.unique(v,return_inverse=True)[1] for v in values] 而不是 values 确实解决了“其他数据类型”问题。我接受了这个答案(尽管也喜欢 Eelco 的),因为它不是一个单独的包,而且 timeit 结果值得赞赏。
    【解决方案2】:

    这应该可以工作,并且应该更快,因为我们正在使用broadcasting 和 numpy 固有的快速布尔比较:

    import numpy as np
    
    # Test values:
    values = [
        np.array([10, 11, 10, 11, 10, 11, 10]),
        np.array([21, 21, 22, 22, 21, 22, 23]),
        ]
    # Expected outcome: np.array([0, 1, 2, 3, 0, 3, 4])
    
    # for every value in values, check where duplicate values occur
    same_mask = [val[:,np.newaxis] == val[np.newaxis,:] for val in values]
    
    # get the conjunction of all those tests
    conjunction = np.logical_and.reduce(same_mask)
    
    # ignore the diagonal
    conjunction[np.diag_indices_from(conjunction)] = False
    
    # initialize the labelled array with nans (used as flag)
    labelled = np.empty(values[0].shape)
    labelled.fill(np.nan)
    
    # keep track of labelled value
    val = 0
    for k, row in enumerate(conjunction):
        if np.isnan(labelled[k]):  # this element has not been labelled yet
            labelled[k] = val      # so label it
            labelled[row] = val    # and label every element satisfying the test
            val += 1
    
    print(labelled)
    # outputs [ 0.  1.  2.  3.  0.  3.  4.]
    

    在处理两个数组时,它比您的版本快大约 1.5 倍,但我怀疑对于更多数组,加速应该会更好。

    【讨论】:

    • 这是一个聪明的解决方案,我喜欢它,但对我来说,这对于大型阵列来说是爆炸性的。 (如果我的每个输入数组的长度为 N,那么 val[:, np.newaxis] 部分会创建一个 N × N 数组。如果我有一百万个元素,并且每个布尔值用完一个字节(我相信它在 numpy 中是这样),那么我最终需要一个 TB 的内存来存储这个数组。:(
    • 啊,是的,你没有提到任何关于内存问题的事情:) 速度和内存效率之间总是存在权衡的。你的版本更保守,我的应该更快(尤其是大数据)。 (注意顺便说一句,导致内存分配的不是val[:,np.newaxis](只是广播操作),而是随后的==比较迫使广播数组实际扩展。不是迂腐,但也许这指向进一步可能的优化。)
    • 实际上,如果内存是一个问题,我怀疑您当前的算法接近最优。也许你可以用np.unique 做点什么?它支持一个选项return_inverse,可能有用。
    【解决方案3】:

    numpy_indexed 包(免责声明:我是它的作者)包含 numpy 数组集操作的通用变体,可用于以优雅高效(矢量化)的方式解决您的问题:

    import numpy_indexed as npi
    unique_values, labels = npi.unique(tuple(values), return_inverse=True)
    

    上述方法适用于任意类型组合,但如果 values 是许多具有相同 dtype 的数组的列表,则以下方法会更有效:

    unique_values, labels = npi.unique(np.asarray(values), axis=1, return_inverse=True)
    

    【讨论】:

      【解决方案4】:

      如果我理解正确,您正在尝试根据列对值进行哈希处理。最好将列自己转换为任意值,然后从中找到哈希值。

      所以你实际上想要散列list(np.array(values).T)

      此功能已内置在 Pandas 中。你不需要写它。唯一的问题是它需要一个值列表,其中没有进一步的列表。在这种情况下,您只需将内部列表转换为 string map(str, list(np.array(values).T)) 并分解它!

      >>> import pandas as pd
      >>> pd.factorize(map(str, list(np.array(values).T)))
      (array([0, 1, 2, 3, 0, 3, 4]),
       array(['[10 21]', '[11 21]', '[10 22]', '[11 22]', '[10 23]'], dtype=object))
      

      我已将您的数组列表转换为数组,然后转换为字符串...

      【讨论】:

      • 据我所知,pandas.factorize 仅适用于一维数组。您将每个“行”转换为字符串,但对于大型数组,这将非常缓慢。
      • 没错。但是,我不确定原始 Python 中是否有任何可以击败 Pandas 的实现。也许我们可以使用 cython 来加速原始 Python 代码。这一切都归结为数组的实际列表将有多大......
      • 取决于与每个数组的大小相比,有多少独特的组合。对于组合很少的非常大的阵列,我的解决方案将击败您的解决方案。 (只需将我的输入数组复制一百万次。您将有一百万次调用 str。)
      猜你喜欢
      • 1970-01-01
      • 2018-10-13
      • 2012-02-29
      • 1970-01-01
      • 1970-01-01
      • 2013-07-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多