【问题标题】:find length of sequences of identical values in a numpy array (run length encoding)在 numpy 数组中查找相同值序列的长度(运行长度编码)
【发布时间】:2010-11-07 04:41:14
【问题描述】:

在一个 pylab 程序(也可能是一个 matlab 程序)中,我有一个代表距离的 numpy 数组:d[t] 是时间 t距离(以及时间跨度)我的数据是len(d) 时间单位)。

我感兴趣的事件是距离低于某个阈值时,我想计算这些事件的持续时间。使用b = d<threshold 很容易得到一个布尔数组,而问题归结为计算b 中的True-only 单词的长度序列。但我不知道如何有效地做到这一点(即使用 numpy 原语),我求助于遍历数组并进行手动更改检测(即当值从 False 变为 True 时初始化计数器,只要值为 True 就增加计数器,并在值返回 False 时将计数器输出到序列)。但这非常慢。

如何有效地检测 numpy 数组中的那种序列?

下面是一些说明我的问题的python代码:第四个点需要很长时间才能出现(如果没有,请增加数组的大小)

from pylab import *

threshold = 7

print '.'
d = 10*rand(10000000)

print '.'

b = d<threshold

print '.'

durations=[]
for i in xrange(len(b)):
    if b[i] and (i==0 or not b[i-1]):
        counter=1
    if  i>0 and b[i-1] and b[i]:
        counter+=1
    if (b[i-1] and not b[i]) or i==len(b)-1:
        durations.append(counter)

print '.'

【问题讨论】:

    标签: python matlab numpy matplotlib


    【解决方案1】:

    也许迟到了,但基于 Numba 的方法将是迄今为止最快的。

    import numpy as np
    import numba as nb
    
    
    @nb.njit
    def count_steps_nb(arr):
        result = 1
        last_x = arr[0]
        for x in arr[1:]:
            if last_x != x:
                result += 1
                last_x = x
        return result
    
    
    @nb.njit
    def rle_nb(arr):
        n = count_steps_nb(arr)
        values = np.empty(n, dtype=arr.dtype)
        lengths = np.empty(n, dtype=np.int_)
        last_x = arr[0]
        length = 1
        i = 0
        for x in arr[1:]:
            if last_x != x:
                values[i] = last_x
                lengths[i] = length
                i += 1
                length = 1
                last_x = x
            else:
                length += 1
        values[i] = last_x
        lengths[i] = length
        return values, lengths
    

    基于 Numpy 的方法(受 @ThomasBrowne answer 启发,但速度更快,因为使用昂贵的 numpy.concatenate() 被减少到最低限度)是亚军(这里介绍了两种类似的方法,一种使用不等式来查找步骤的位置,另一个使用差异):

    import numpy as np
    
    
    def rle_np_neq(arr):
        n = len(arr)
        if n == 0:
            values = np.empty(0, dtype=arr.dtype)
            lengths = np.empty(0, dtype=np.int_)
        else:
            positions = np.concatenate(
                [[-1], np.nonzero(arr[1:] != arr[:-1])[0], [n - 1]])
            lengths = positions[1:] - positions[:-1]
            values = arr[positions[1:]]
        return values, lengths
    
    import numpy as np
    
    
    def rle_np_diff(arr):
        n = len(arr)
        if n == 0:
            values = np.empty(0, dtype=arr.dtype)
            lengths = np.empty(0, dtype=np.int_)
        else:
            positions = np.concatenate(
                [[-1], np.nonzero(arr[1:] - arr[:-1])[0], [n - 1]])
            lengths = positions[1:] - positions[:-1]
            values = arr[positions[1:]]
            return values, lengths
    

    这些都优于简单简单的单循环方法:

    import numpy as np
    
    
    def rle_loop(arr):
        values = []
        lengths = []
        last_x = arr[0]
        length = 1
        for x in arr[1:]:
            if last_x != x:
                values.append(last_x)
                lengths.append(length)
                length = 1
                last_x = x
            else:
                length += 1
        values.append(last_x)
        lengths.append(length)
        return np.array(values), np.array(lengths)
    

    相反,使用itertools.groupby() 不会比简单循环快(除非在非常特殊的情况下,例如@AlexMartelli answer 或者有人会在组对象上实现__len__),因为一般来说除了循环遍历组本身之外,没有简单的方法来提取组大小信息,这并不是很快:

    import itertools
    import numpy as np
    
    
    def rle_groupby(arr):
        values = []
        lengths = []
        for x, group in itertools.groupby(arr):
            values.append(x)
            lengths.append(sum(1 for _ in group))
        return np.array(values), np.array(lengths)
    

    报告了一些针对不同大小的随机整数数组的基准测试结果:

    (完整分析here)。

    【讨论】:

      【解决方案2】:

      适用于任何数组的完全 numpy 向量化和通用 RLE(也适用于字符串、布尔值等)。

      输出运行长度、起始位置和值的元组。

      import numpy as np
      
      def rle(inarray):
              """ run length encoding. Partial credit to R rle function. 
                  Multi datatype arrays catered for including non Numpy
                  returns: tuple (runlengths, startpositions, values) """
              ia = np.asarray(inarray)                # force numpy
              n = len(ia)
              if n == 0: 
                  return (None, None, None)
              else:
                  y = ia[1:] != ia[:-1]               # pairwise unequal (string safe)
                  i = np.append(np.where(y), n - 1)   # must include last element posi
                  z = np.diff(np.append(-1, i))       # run lengths
                  p = np.cumsum(np.append(0, z))[:-1] # positions
                  return(z, p, ia[i])
      

      相当快(i7):

      xx = np.random.randint(0, 5, 1000000)
      %timeit yy = rle(xx)
      100 loops, best of 3: 18.6 ms per loop
      

      多种数据类型:

      rle([True, True, True, False, True, False, False])
      Out[8]: 
      (array([3, 1, 1, 2]),
       array([0, 3, 4, 5]),
       array([ True, False,  True, False], dtype=bool))
      
      rle(np.array([5, 4, 4, 4, 4, 0, 0]))
      Out[9]: (array([1, 4, 2]), array([0, 1, 5]), array([5, 4, 0]))
      
      rle(["hello", "hello", "my", "friend", "okay", "okay", "bye"])
      Out[10]: 
      (array([2, 1, 1, 2, 1]),
       array([0, 2, 3, 4, 6]),
       array(['hello', 'my', 'friend', 'okay', 'bye'], 
             dtype='|S6'))
      

      与上述 Alex Martelli 的结果相同:

      xx = np.random.randint(0, 2, 20)
      
      xx
      Out[60]: array([1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1])
      
      am = runs_of_ones_array(xx)
      
      tb = rle(xx)
      
      am
      Out[63]: array([4, 5, 2, 5])
      
      tb[0][tb[2] == 1]
      Out[64]: array([4, 5, 2, 5])
      
      %timeit runs_of_ones_array(xx)
      10000 loops, best of 3: 28.5 µs per loop
      
      %timeit rle(xx)
      10000 loops, best of 3: 38.2 µs per loop
      

      比 Alex 稍慢(但仍然非常快),而且更灵活。

      【讨论】:

        【解决方案3】:

        这是一个仅使用数组的解决方案:它采用一个包含一系列布尔值的数组并计算转换的长度。

        >>> from numpy import array, arange
        >>> b = array([0,0,0,1,1,1,0,0,0,1,1,1,1,0,0], dtype=bool)
        >>> sw = (b[:-1] ^ b[1:]); print sw
        [False False  True False False  True False False  True False False False
          True False]
        >>> isw = arange(len(sw))[sw]; print isw
        [ 2  5  8 12]
        >>> lens = isw[1::2] - isw[::2]; print lens
        [3 4]
        

        sw 在有开关的地方包含一个 true,isw 将它们转换为索引。然后在lens 中成对减去isw 的项。

        请注意,如果序列以 1 开头,它将计算 0 序列的长度:这可以在索引中固定以计算镜头。另外,我还没有测试过长度为 1 的极端情况。


        返回所有True-子数组的起始位置和长度的完整函数。

        import numpy as np
        
        def count_adjacent_true(arr):
            assert len(arr.shape) == 1
            assert arr.dtype == np.bool
            if arr.size == 0:
                return np.empty(0, dtype=int), np.empty(0, dtype=int)
            sw = np.insert(arr[1:] ^ arr[:-1], [0, arr.shape[0]-1], values=True)
            swi = np.arange(sw.shape[0])[sw]
            offset = 0 if arr[0] else 1
            lengths = swi[offset+1::2] - swi[offset:-1:2]
            return swi[offset:-1:2], lengths
        

        针对不同的布尔一维数组进行了测试(空数组;单个/多个元素;偶数/奇数长度;以 True/False 开头;仅包含 True/False 元素)。

        【讨论】:

          【解决方案4】:

          虽然不是numpy 原语,但itertools 函数通常非常快,所以请尝试一下(当然,还要测量包括这个在内的各种解决方案的时间):

          def runs_of_ones(bits):
            for bit, group in itertools.groupby(bits):
              if bit: yield sum(group)
          

          如果你确实需要列表中的值,当然可以使用 list(runs_of_ones(bits));但也许列表理解可能会稍微快一些:

          def runs_of_ones_list(bits):
            return [sum(g) for b, g in itertools.groupby(bits) if b]
          

          转向“numpy-native”的可能性,怎么样:

          def runs_of_ones_array(bits):
            # make sure all runs of ones are well-bounded
            bounded = numpy.hstack(([0], bits, [0]))
            # get 1 at run starts and -1 at run ends
            difs = numpy.diff(bounded)
            run_starts, = numpy.where(difs > 0)
            run_ends, = numpy.where(difs < 0)
            return run_ends - run_starts
          

          再次重申:请务必在实际示例中对解决方案进行基准测试!

          【讨论】:

          • 非常感谢! diff/where 解决方案正是我想到的(更不用说它比其他解决方案快 10 倍)。如果您愿意,可以称其为“不太聪明”,但我希望我足够聪明地想出它:-)
          • @gnovice,我不做 matlab(很有趣,我的女儿,现在是高级无线电工程的博士生,做;-),但现在看着你的答案,我确实看到了类比 -得到运行结束减去运行开始,通过在差异中定位 0 点来获得那些,并用零填充位以确保所有运行都结束。估计没有那么多方法可以解决这个“运行长度”问题!-)
          • @Gyom,不客气——正如@gnovice 所暗示的那样,matlab 解决方案也很相似,或者我想如果有人知道 matlab 就会是这样——所以肯定两者都不是聪明;-)...这更像是一个以前必须进行运行长度编码的问题(我编辑的大部分时间都是关于从数字翻译,这是我仍然本能地倾向于转向的东西,很多-更好的 numpy —— 但我真正第一次学习这些东西的地方是 30 年前的 APL,那时我还是一名硬件设计师......!-)。
          【解决方案5】:

          以防万一有人好奇(因为您顺便提到了 MATLAB),这是在 MATLAB 中解决它的一种方法:

          threshold = 7;
          d = 10*rand(1,100000);  % Sample data
          b = diff([false (d < threshold) false]);
          durations = find(b == -1)-find(b == 1);
          

          我对 Python 不太熟悉,但也许这可以帮助给你一些想法。 =)

          【讨论】:

          • diff() 也存在于 numpy 中,所以这或多或少是您想要的,尽管将 find(foo) 替换为 where(foo)[0]。
          【解决方案6】:
          durations = []
          counter   = 0
          
          for bool in b:
              if bool:
                  counter += 1
              elif counter > 0:
                  durations.append(counter)
                  counter = 0
          
          if counter > 0:
              durations.append(counter)
          

          【讨论】:

          • 当然,这更简洁,但同样低效;我想要做的是将循环向下移动到 C 层,通过使用一些巧妙的 numpy 调用组合......
          • 检查我编辑的答案,我现在提供一种这样的“聪明的组合”(虽然总是努力不要太聪明;-) - 但是,请测量那个和 itertools 的速度。基于 groupby 的解决方案,并让我们知道哪个更快(以及多少)在适合您的示例中!
          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2015-07-28
          • 1970-01-01
          • 2013-02-01
          • 2012-07-11
          相关资源
          最近更新 更多