【问题标题】:What's a good rate limiting algorithm?什么是好的速率限制算法?
【发布时间】:2010-10-14 15:04:59
【问题描述】:

我可以使用一些伪代码,或者更好的 Python。我正在尝试为 Python IRC 机器人实现一个限速队列,它可以部分工作,但是如果有人触发的消息少于限制(例如,速率限制是每 8 秒 5 条消息,而这个人只触发 4 条),并且下一个触发时间超过 8 秒(例如,16 秒后),机器人发送消息,但队列已满,机器人等待 8 秒,尽管由于 8 秒的时间段已经过去,因此不需要它。

【问题讨论】:

    标签: python algorithm message-queue


    【解决方案1】:

    令牌桶的实现相当简单。

    从一个包含 5 个令牌的存储桶开始。

    每 5/8 秒:如果桶中的令牌少于 5 个,则添加一个。

    每次要发送消息时:如果bucket中的token≥1个,取出一个token发送消息。否则,等待/丢弃消息/无论如何。

    (显然,在实际代码中,您将使用整数计数器而不是真实令牌,并且您可以通过存储时间戳来优化每 5/8 秒的步骤)


    再次阅读问题,如果速率限制每8秒完全重置一次,那么这里是一个修改:

    从很久以前的时间戳last_send 开始(例如,在纪元)。另外,从相同的 5 令牌桶开始。

    执行每 5/8 秒规则。

    每次发送消息时:首先检查last_send是否≥8秒前。如果是这样,请填充存储桶(将其设置为 5 个令牌)。其次,如果桶中有令牌,则发送消息(否则,丢弃/等待/等)。第三,将last_send设置为现在。

    这应该适用于那种情况。


    我实际上已经使用这样的策略(第一种方法)编写了一个 IRC 机器人。它在 Perl 中,而不是 Python,但这里有一些代码来说明:

    这里的第一部分处理向存储桶添加令牌。可以看到基于时间(倒数第二行)添加token的优化,然后最后一行将bucket内容钳制到最大值(MESSAGE_BURST)

        my $start_time = time;
        ...
        # Bucket handling
        my $bucket = $conn->{fujiko_limit_bucket};
        my $lasttx = $conn->{fujiko_limit_lasttx};
        $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
        ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;
    

    $conn 是一个被传递的数据结构。这是在一个例行运行的方法中(它计算下一次它什么时候有事情要做,并且休眠那么长时间或者直到它获得网络流量)。该方法的下一部分处理发送。这相当复杂,因为消息具有与之相关的优先级。

        # Queue handling. Start with the ultimate queue.
        my $queues = $conn->{fujiko_queues};
        foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
                # Ultimate is special. We run ultimate no matter what. Even if
                # it sends the bucket negative.
                --$bucket;
                $entry->{code}(@{$entry->{args}});
        }
        $queues->[PRIORITY_ULTIMATE] = [];
    

    这是第一个队列,无论如何都会运行。即使它使我们的连接因洪水而被杀死。用于非常重要的事情,比如响应服务器的 PING。接下来,剩下的队列:

        # Continue to the other queues, in order of priority.
        QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
                my $queue = $queues->[$pri];
                while (scalar(@$queue)) {
                        if ($bucket < 1) {
                                # continue later.
                                $need_more_time = 1;
                                last QRUN;
                        } else {
                                --$bucket;
                                my $entry = shift @$queue;
                                $entry->{code}(@{$entry->{args}});
                        }
                }
        }
    

    最后,bucket 状态被保存回 $conn 数据结构(实际上在该方法中稍晚一点;它首先计算它多久会有更多工作)

        # Save status.
        $conn->{fujiko_limit_bucket} = $bucket;
        $conn->{fujiko_limit_lasttx} = $start_time;
    

    如您所见,实际的桶处理代码非常小——大约四行。其余代码是优先队列处理。该机器人具有优先级队列,因此与它聊天的人无法阻止它执行重要的踢/禁止职责。

    【讨论】:

    • 我错过了什么...看起来这会在您完成前 5 秒后将您限制为每 8 秒发送 1 条消息
    • @chills42:是的,我把问题读错了……看答案的后半部分。
    • @chills:如果 last_send 小于 8 秒,则不会向存储桶添加任何令牌。如果您的存储桶包含令牌,您可以发送消息;否则你不能(你已经在过去 8 秒内发送了 5 条消息)
    • 如果有人对此投反对票,我将不胜感激,请解释原因...我想解决您看到的任何问题,但如果没有反馈就很难做到!
    【解决方案2】:

    只是来自已接受答案的代码的 python 实现。

    import time
    
    class Object(object):
        pass
    
    def get_throttler(rate, per):
        scope = Object()
        scope.allowance = rate
        scope.last_check = time.time()
        def throttler(fn):
            current = time.time()
            time_passed = current - scope.last_check;
            scope.last_check = current;
            scope.allowance = scope.allowance + time_passed * (rate / per)
            if (scope.allowance > rate):
              scope.allowance = rate
            if (scope.allowance < 1):
              pass
            else:
              fn()
              scope.allowance = scope.allowance - 1
        return throttler
    

    【讨论】:

    【解决方案3】:

    我需要 Scala 的变体。这里是:

    case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {
    
      import Thread.sleep
      private def now = System.currentTimeMillis / 1000.0
      private val (calls, sec) = callsPerSecond
      private var allowance  = 1.0
      private var last = now
    
      def apply(a: A): B = {
        synchronized {
          val t = now
          val delta_t = t - last
          last = t
          allowance += delta_t * (calls / sec)
          if (allowance > calls)
            allowance = calls
          if (allowance < 1d) {
            sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
          }
          allowance -= 1
        }
        f(a)
      }
    
    }
    

    它的使用方法如下:

    val f = Limiter((5d, 8d), { 
      _: Unit ⇒ 
        println(System.currentTimeMillis) 
    })
    while(true){f(())}
    

    【讨论】:

      【解决方案4】:

      这里是simplest algorithm,如果您只想在消息到达太快时丢弃它们(而不是排队,这是有道理的,因为队列可能会变得任意大):

      rate = 5.0; // unit: messages
      per  = 8.0; // unit: seconds
      allowance = rate; // unit: messages
      last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
      
      when (message_received):
        current = now();
        time_passed = current - last_check;
        last_check = current;
        allowance += time_passed * (rate / per);
        if (allowance > rate):
          allowance = rate; // throttle
        if (allowance < 1.0):
          discard_message();
        else:
          forward_message();
          allowance -= 1.0;
      

      在这个解决方案中没有数据结构、计时器等,它工作得很干净 :) 要看到这一点,'allowance' 最多以每秒 5/8 个单位的速度增长,即每八秒最多五个单位。转发的每条消息都会扣除一个单位,因此每 8 秒发送的消息不能超过 5 条。

      注意rate 应该是一个整数,即没有非零小数部分,否则算法将无法正常工作(实际速率不会是rate/per)。例如。 rate=0.5; per=1.0; 不起作用,因为 allowance 永远不会增长到 1.0。但是rate=1.0; per=2.0; 工作正常。

      【讨论】:

      • 另外值得指出的是,'time_passed'的维度和比例必须与'per'相同,例如秒。
      • 嗨 skaffman,感谢您的赞美 --- 我把它从袖子里扔了,但有 99.9% 的可能性有人早些时候想出了类似的解决方案 :)
      • 这是一个标准算法——它是一个令牌桶,没有队列。桶是allowance。存储桶大小为rateallowance += … 行是每 rate ÷ per 秒添加一个令牌的优化。
      • @zwirbeltier 你上面写的不是真的。 'Allowance' 总是以'rate' 为上限(查看“//throttle”行),因此它只允许在任何特定时间(即 5 条)准确地发出“rate”消息。
      • 这很好,但可能会超出速率。假设在时间 0 转发 5 条消息,然后在时间 N * (8/5) 为 N = 1, 2, ...您可以发送另一条消息,从而在 8 秒内发送超过 5 条消息
      【解决方案5】:

      如果有人仍然感兴趣,我将这个简单的可调用类与定时 LRU 键值存储结合使用来限制每个 IP 的请求率。使用双端队列,但可以重写为与列表一起使用。

      from collections import deque
      import time
      
      
      class RateLimiter:
          def __init__(self, maxRate=5, timeUnit=1):
              self.timeUnit = timeUnit
              self.deque = deque(maxlen=maxRate)
      
          def __call__(self):
              if self.deque.maxlen == len(self.deque):
                  cTime = time.time()
                  if cTime - self.deque[0] > self.timeUnit:
                      self.deque.append(cTime)
                      return False
                  else:
                      return True
              self.deque.append(time.time())
              return False
      
      r = RateLimiter()
      for i in range(0,100):
          time.sleep(0.1)
          print(i, "block" if r() else "pass")
      

      【讨论】:

        【解决方案6】:

        阻塞处理直到可以发送消息,从而使更多消息排队,antti的漂亮解决方案也可以这样修改:

        rate = 5.0; // unit: messages
        per  = 8.0; // unit: seconds
        allowance = rate; // unit: messages
        last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds
        
        when (message_received):
          current = now();
          time_passed = current - last_check;
          last_check = current;
          allowance += time_passed * (rate / per);
          if (allowance > rate):
            allowance = rate; // throttle
          if (allowance < 1.0):
            time.sleep( (1-allowance) * (per/rate))
            forward_message();
            allowance = 0.0;
          else:
            forward_message();
            allowance -= 1.0;
        

        它只是等到有足够的余量来发送消息。为了不以两倍的费率开始,津贴也可以用0初始化。

        【讨论】:

        • 当你睡觉时(1-allowance) * (per/rate),你需要将同样的数量添加到last_check
        【解决方案7】:

        这个怎么样:

        long check_time = System.currentTimeMillis();
        int msgs_sent_count = 0;
        
        private boolean isRateLimited(int msgs_per_sec) {
            if (System.currentTimeMillis() - check_time > 1000) {
                check_time = System.currentTimeMillis();
                msgs_sent_count = 0;
            }
        
            if (msgs_sent_count > (msgs_per_sec - 1)) {
                return true;
            } else {
                msgs_sent_count++;
            }
        
            return false;
        }
        

        【讨论】:

          【解决方案8】:

          在你的入队函数之前使用这个装饰器@RateLimited(ratepersec)。

          基本上,这会检查自上次以来是否经过 1/rate secs,如果没有,则等待剩余时间,否则不等待。这有效地限制了您的速率/秒。装饰器可以应用于任何你想要限制速率的函数。

          在您的情况下,如果您希望每 8 秒最多发送 5 条消息,请在您的 sendToQueue 函数之前使用 @RateLimited(0.625)。

          import time
          
          def RateLimited(maxPerSecond):
              minInterval = 1.0 / float(maxPerSecond)
              def decorate(func):
                  lastTimeCalled = [0.0]
                  def rateLimitedFunction(*args,**kargs):
                      elapsed = time.clock() - lastTimeCalled[0]
                      leftToWait = minInterval - elapsed
                      if leftToWait>0:
                          time.sleep(leftToWait)
                      ret = func(*args,**kargs)
                      lastTimeCalled[0] = time.clock()
                      return ret
                  return rateLimitedFunction
              return decorate
          
          @RateLimited(2)  # 2 per second at most
          def PrintNumber(num):
              print num
          
          if __name__ == "__main__":
              print "This should print 1,2,3... at about 2 per second."
              for i in range(1,100):
                  PrintNumber(i)
          

          【讨论】:

          • 我喜欢为此目的使用装饰器的想法。为什么lastTimeCalled 是一个列表?另外,我怀疑当多个线程调用同一个 RateLimited 函数时这会起作用......
          • 这是一个列表,因为像 float 这样的简单类型在被闭包捕获时是常量。通过使它成为一个列表,列表是恒定的,但它的内容不是。是的,它不是线程安全的,但可以用锁轻松修复。
          • time.clock() 在我的系统上没有足够的分辨率,所以我修改了代码并改为使用time.time()
          • 对于速率限制,您绝对不想使用time.clock(),它测量经过的 CPU 时间。 CPU 时间可以比“实际”时间运行得快得多或慢得多。您想改用time.time(),它测量挂墙时间(“实际”时间)。
          • 顺便说一句,用于实际生产系统:使用 sleep() 调用实现速率限制可能不是一个好主意,因为它会阻塞线程并因此阻止其他客户端使用它。
          【解决方案9】:

          一种解决方案是为每个队列项目附加一个时间戳,并在 8 秒后丢弃该项目。您可以在每次添加队列时执行此检查。

          这仅在您将队列大小限制为 5 并在队列已满时丢弃任何添加时才有效。

          【讨论】:

            【解决方案10】:

            保留最后五行发送的时间。保留排队的消息,直到第五个最近的消息(如果存在)过去至少 8 秒(last_five 作为时间数组):

            now = time.time()
            if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
                last_five.insert(0, now)
                send_message(msg)
            if len(last_five) > 5:
                last_five.pop()
            

            【讨论】:

            • 不是因为你修改过我不是。
            • 您正在存储五个时间戳并在内存中反复移动它们(或执行链表操作)。我正在存储一个整数计数器和一个时间戳。而且只做算术和赋值。
            • 除了如果我尝试发送 5 行会更好,但在此时间段内只允许再发送 3 行。您的将允许发送前三个,并在发送 4 和 5 之前强制等待 8 秒。我的将允许在最近的第四行和第五行之后 8 秒发送 4 和 5。
            • 但是在这个问题上,性能可以通过使用长度为 5 的循环链表来提高,指向第五个最近的发送,在新发送时覆盖它,并将指针向前移动一个。
            • 对于具有速率限制器速度的 irc 机器人来说不是问题。我更喜欢列表解决方案,因为它更具可读性。由于修订,给出的桶答案令人困惑,但它也没有错。
            猜你喜欢
            • 1970-01-01
            • 2010-11-29
            • 1970-01-01
            • 2022-01-11
            • 1970-01-01
            • 2014-05-07
            • 1970-01-01
            • 2011-01-20
            相关资源
            最近更新 更多