【问题标题】:Python: Fast mapping and lookup between two listsPython:两个列表之间的快速映射和查找
【发布时间】:2025-12-16 21:40:01
【问题描述】:

我目前正在开发一个高性能 python 2.7 项目,该项目使用大小为一万个元素的列表。显然,每个操作都必须尽可能快地执行。

所以,我有两个列表:一个是 unique 任意数字的列表,我们称之为 A,另一个是线性列表,从 1 开始,长度相同第一个列表,命名为 B,表示 A 中的索引(从 1 开始)

类似枚举,从 1 开始。

例如:

A = [500, 300, 400, 200, 100] # The order here is arbitrary, they can be any integers, but every integer can only exist once
B = [  1,   2,   3,   4,   5] # This is fixed, starting from 1, with exactly as many elements as A

如果我有一个 B 的元素(称为 e_B)并且想要 A 中的相应元素,我可以简单地做correspond_e_A = A[e_B - 1]。没问题。

但是现在我有一个巨大的随机、非唯一整数列表,我想知道 A 中的整数的索引,以及 B 中的相应元素是什么。

我认为我对第一个问题有一个合理的解决方案:

indices_of_existing = numpy.nonzero(numpy.in1d(random_list, A))[0]

这种方法的好处是不需要 map() 单个操作,numpy 的 in1d 只返回一个类似 [True, True, False, True, ...] 的列表。使用 nonzero() 我可以获取 A 中存在的 random_list 中元素的索引。我认为完美。

但是对于第二个问题,我很难过。 我试过类似的东西:

corresponding_e_B = map(lambda x: numpy.where(A==x)[0][0] + 1, random_list))

这正确地给了我索引,但它不是最优的,因为首先我需要一个 map(),其次我需要一个 lambda,最后 numpy.where() 在找到项目后不会停止(记住,A只有独特的元素),这意味着它可以与像我这样的庞大数据集进行可怕的扩展。

我查看了 bisect,但似乎 bisect 仅适用于单个请求,而不适用于列表,这意味着我仍然必须使用 map() 并按元素构建我的列表(这很慢,不是吗? )

由于我对 Python 还很陌生,我希望这里的任何人都可以有一个想法?也许是我还不知道的图书馆?

【问题讨论】:

    标签: python list numpy bisect


    【解决方案1】:

    我认为您最好使用哈希表进行查找,而不是 numpy.in1d,后者使用 O(n log n) 合并排序作为预处理步骤。

    >>> A = [500, 300, 400, 200, 100]
    >>> index = { k:i for i,k in enumerate(A, 1) }
    >>> random_list = [200, 100, 50]
    >>> [i for i,x in enumerate(random_list) if x in index]
    [0, 1]
    >>> map(index.get, random_list)
    [4, 5, None]
    >>> filter(None, map(index.get, random_list))
    [4, 5]
    

    这是 Python 2,在 Python 3 中 mapfilter 返回类似生成器的对象,因此如果您想将结果作为列表获取,则需要在过滤器周围使用 list

    我尝试尽可能多地使用内置函数将计算负担推到 C 端(假设您使用 CPython)。所有名称都是预先解析的,所以应该很快。

    一般来说,为了获得最佳性能,您可能需要考虑使用 PyPy,这是一个带有 JIT 编译的绝佳替代 Python 实现。

    比较多种方法的基准绝不是一个坏主意:

    import sys
    is_pypy = '__pypy__' in sys.builtin_module_names
    
    import timeit
    import random
    if not is_pypy:
      import numpy
    import bisect
    
    n = 10000
    m = 10000
    q = 100
    
    A = set()
    while len(A) < n: A.add(random.randint(0,2*n))
    A = list(A)
    
    queries = set()
    while len(queries) < m: queries.add(random.randint(0,2*n))
    queries = list(queries)
    
    # these two solve question one (find indices of queries that exist in A)
    if not is_pypy:
      def fun11():
        for _ in range(q):
          numpy.nonzero(numpy.in1d(queries, A))[0]
    
    def fun12():
      index = set(A)
      for _ in range(q):
        [i for i,x in enumerate(queries) if x in index]
    
    # these three solve question two (find according entries of B)
    def fun21():
      index = { k:i for i,k in enumerate(A, 1) }
      for _ in range(q):
        [index[i] for i in queries if i in index]
    
    def fun22():
      index = { k:i for i,k in enumerate(A, 1) }
      for _ in range(q):
        list(filter(None, map(index.get, queries)))
    
    def findit(keys, values, key):
      i = bisect.bisect(keys, key)
      if i == len(keys) or keys[i] != key:
        return None
      return values[i]
    
    def fun23():
      keys, values = zip(*sorted((k,i) for i,k in enumerate(A,1)))
      for _ in range(q):
        list(filter(None, [findit(keys, values, x) for x in queries]))
    
    if not is_pypy:
      # note this does not filter out nonexisting elements
      def fun24():
        I = numpy.argsort(A)
        ss = numpy.searchsorted
        maxi = len(I)
        for _ in range(q):   
          a = ss(A, queries, sorter=I)
          I[a[a<maxi]]
    
    tests = ("fun12", "fun21", "fun22", "fun23")
    if not is_pypy: tests = ("fun11",) + tests + ("fun24",)
    
    if is_pypy:
      # warmup
      for f in tests:
        timeit.timeit("%s()" % f, setup = "from __main__ import %s" % f, number=20)
    
    # actual timing
    for f in tests:
      print("%s: %.3f" % (f, timeit.timeit("%s()" % f, setup = "from __main__ import %s" % f, number=3)))
    

    结果:

    $ python2 -V
    Python 2.7.6
    $ python3 -V
    Python 3.3.3
    $ pypy -V
    Python 2.7.3 (87aa9de10f9ca71da9ab4a3d53e0ba176b67d086, Dec 04 2013, 12:50:47)
    [PyPy 2.2.1 with GCC 4.8.2]
    $ python2 test.py
    fun11: 1.016
    fun12: 0.349
    fun21: 0.302
    fun22: 0.276
    fun23: 2.432
    fun24: 0.897
    $ python3 test.py
    fun11: 0.973
    fun12: 0.382
    fun21: 0.423
    fun22: 0.341
    fun23: 3.650
    fun24: 0.894
    $ pypy ~/tmp/test.py
    fun12: 0.087
    fun21: 0.073
    fun22: 0.128
    fun23: 1.131
    

    您可以根据您的场景调整nA 的大小)、mrandom_list 的大小)和q(查询数量)。令我惊讶的是,我尝试变得聪明并使用内置函数而不是列表组合并没有得到回报,因为fun22 并不比fun21 快很多(在 Python 2 中只有 ~10%,在 Python 3 中只有 ~25% ,但在 PyPy 中慢了将近 75%)。这里是一个过早优化的案例。我猜这个差异是因为fun22 在 Python 2 中为每个查询建立了一个不必要的临时列表。我们还看到二进制搜索非常糟糕(请看fun23)。

    【讨论】:

    • 啊,太棒了!像魅力一样工作。而且 10% 的改进要快得多,至少对于我的用例来说是这样。因此,我只想评论您的解决方案不能解决问题 1,但显然您在 :-D 中编辑了解决方案非常感谢!顺便说一句,这是一个很好的答案,包括基准等:-)
    • @JiaYow:你自己解决了问题 1。我最初认为in1d 是用 C 实现的,但它不是(而且它还使用了一种不太理想的算法)所以即使使用普通 Python 也可以做得更好
    • 是的,但我在想/希望这两个问题可以同时得到回答。由于我们已经循环遍历列表以转换条目,因此应该可以在同一个过程中获得 Not-None 索引。 filter() 只获取 Not-None values,而 np.nonzero() 将获取 Not-None indices。我希望这些可以结合起来(也许没有 np。根据我的计时器,np.array() 现在开始成为瓶颈)
    • @JiaYow:当然,您可以一次性完成这两项操作:[(i,index[x]) for i,x in enumerate(random_list) if x in index] 为您提供了 (index in random_list, index in A) 对的列表,这些 random_list 元素也存在于 A 中.
    • @JiaYow:顺便说一下,我将 PyPy 添加到了基准测试中。在所有基准测试中,它比 CPython 2.7 快 2-3 倍,算法解决问题二的速度快 4 倍。显然,它根本不会从使用内置函数中受益,这是意料之中的,因为它 JIT 编译了列表推导。您绝对应该考虑使用它,但不幸的是,到目前为止它仅部分支持 numpy :(
    【解决方案2】:
    def numpy_optimized(index, values):
        I = np.argsort(values)
        Q = np.searchsorted(values, index, sorter=I)
        return I[Q]
    

    这计算与 OP 相同的东西,但索引的顺序与查询的值匹配,我认为这是功能上的改进。它比我机器上的 OP 解决方案快两倍;如果我正确解释您的基准,它会稍微领先于非 pypy 解决方案。

    或者,如果我们不能假设所有索引都存在于值中,并且希望以静默方式删除丢失的查询:

    def numpy_optimized_filtered(index, values):
        I = np.argsort(values)
        Q = np.searchsorted(values, index, sorter=I)
        Z = I[Q]
        return Z[values[Z]==index]
    

    【讨论】:

    • 感谢您的建议。我会将它添加到基准测试中
    • 那么还有没有办法过滤掉不存在的条目?
    • 你是对的;在这方面,这些解决方案并不相同。它在我的代码中工作,其中查询是从 A 中采样的,但如果不是这种情况,则此代码会产生不同的结果。进行了编辑
    • 现在如果index 是一个数组,它就不再起作用了(很高兴第一个解决方案做到了)。
    • 顺便说一下,我在第一个版本中将它添加到了基准测试中。请看一下,这样我在改编它时没有犯错。在整个输入大小范围内,它几乎比 ops 尝试解决问题一的速度快,但它几乎用纯 numpy 解决了问题二,这很酷:)