【问题标题】:How to implement rate limiting using Redis如何使用 Redis 实现速率限制
【发布时间】:2012-11-01 10:22:49
【问题描述】:

我使用INCREXPIRE来实现速率限制,例如每分钟5个请求:

if EXISTS counter
    count = INCR counter
else
    EXPIRE counter 60
    count = INCR counter

if count > 5
    print "Exceeded the limit"    

但是,在第一分钟的最后一秒可以发送 5 个请求,在第二分钟的第一秒可以再发送 5 个请求,即两秒内可以发送 10 个请求。

如何避免这个问题?


更新:我想出了这个列表实现。这是一个好方法吗?

times = LLEN counter
if times < 5
    LPUSH counter now()
else
    time = LINDEX counter -1
    if now() - time < 60
        print "Exceeded the limit"
    else
        LPUSH counter now()
LTRIM counter 5

【问题讨论】:

  • 是的,这是一个有效且良好的解决方案。甚至比使用集合更好;)
  • Redis LUA 脚本不支持您的解决方案 now() 对吗?所以你想通过 now() 作为参数,那么不同的机器会有不同的毫秒粒度 rit ..?所以 now() - 时间不准确?
  • 第二个例子,我想在大约 120 秒后过期 counter 是有意义的,特别是如果你有很多 counter 键。
  • 前五个请求是突发的,它们之间没有 (now() - time &lt; 60) 分钟间隔...

标签: redis rate-limiting


【解决方案1】:

您可以从“最后一分钟的 5 个请求”切换到“第 x 分钟的 5 个请求”。这样就可以做到:

counter = current_time # for example 15:03
count = INCR counter
EXPIRE counter 60 # just to make sure redis doesn't store it forever

if count > 5
  print "Exceeded the limit"

如果你想继续使用“最后一分钟的 5 个请求”,那么你可以这样做

counter = Time.now.to_i # this is Ruby and it returns the number of milliseconds since 1/1/1970
key = "counter:" + counter
INCR key
EXPIRE key 60

number_of_requests = KEYS "counter"*"
if number_of_requests > 5
  print "Exceeded the limit"

如果您有生产限制(尤其是性能),则使用not advised 关键字是KEYS。我们可以使用 sets 代替:

counter = Time.now.to_i # this is Ruby and it returns the number of milliseconds since 1/1/1970
set = "my_set"
SADD set counter 1

members = SMEMBERS set

# remove all set members which are older than 1 minute
members {|member| SREM member if member[key] < (Time.now.to_i - 60000) }

if (SMEMBERS set).size > 5
  print "Exceeded the limit"

这都是伪 Ruby 代码,但应该会给你一些想法。

【讨论】:

  • 你是对的。但是我们对任何与生产相关的限制一无所知。不过,我编辑了我的答案,改为使用 Redis 集
  • 第三个想法太棒了!它对我有很大帮助。非常感谢您的绝妙主意。还有一条评论:如果没有添加新元素,该集合应该在 1 分钟后过期。这将有助于节省存储空间。类似于:EXPIRE set 60 紧跟在SADD set counter 1 之后
  • 那就更好了?
【解决方案2】:

进行速率限制的规范方法是通过Leaky bucket algorithm。使用计数器的缺点是,用户可以在计数器重置后立即执行一堆请求,即在下一分钟的第一秒内执行 5 次操作。漏桶算法解决了这个问题。简而言之,您可以使用有序集合来存储您的“漏桶”,使用操作时间戳作为填充它的键。

查看这篇文章了解具体实现: Better Rate Limiting With Redis Sorted Sets

更新:

还有另外一种算法,相比漏桶有一些优势。它被称为 Generic Cell Rate Algorithm 。这是它在更高级别的工作方式,如Rate Limiting, Cells, and GCRA 中所述:

GCRA 的工作原理是通过称为“理论到达时间”(TAT) 的时间跟踪剩余限制,该时间通过将表示其成本的持续时间添加到当前时间来在第一个请求上播种。成本计算为我们的“排放间隔”(T)的乘数,它是从我们希望桶重新装满的速率得出的。当任何后续请求进来时,我们取现有的 TAT,从中减去一个表示限制的总突发容量的固定缓冲区 (τ + T),并将结果与​​当前时间进行比较。此结果表示下一次允许请求。如果是过去,我们允许传入的请求,如果是在未来,我们不允许。请求成功后,通过添加 T 来计算新的 TAT。

在 GitHub 上有一个实现该算法的 redis 模块:https://github.com/brandur/redis-cell

【讨论】:

    【解决方案3】:

    这是一个已经回答的老问题,但这是我从这里获得一些灵感的实现。我在 Node.js 中使用 ioredis

    这是滚动窗口时间限制器的所有异步但无竞争条件(我希望)的荣耀:

    var Ioredis = require('ioredis');
    var redis = new Ioredis();
    
    // Rolling window rate limiter
    //
    // key is a unique identifier for the process or function call being limited
    // exp is the expiry in milliseconds
    // maxnum is the number of function calls allowed before expiry
    var redis_limiter_rolling = function(key, maxnum, exp, next) {
      redis.multi([
        ['incr', 'limiter:num:' + key],
        ['time']
      ]).exec(function(err, results) {
        if (err) {
          next(err);
        } else {
          // unique incremented list number for this key
          var listnum = results[0][1];
          // current time
          var tcur = (parseInt(results[1][1][0], 10) * 1000) + Math.floor(parseInt(results[1][1][1], 10) / 1000);
          // absolute time of expiry
          var texpiry = tcur - exp;
          // get number of transacation in the last expiry time
          var listkey = 'limiter:list:' + key;
          redis.multi([
            ['zadd', listkey, tcur.toString(), listnum],
            ['zremrangebyscore', listkey, '-inf', texpiry.toString()],
            ['zcard', listkey]
          ]).exec(function(err, results) {
            if (err) {
              next(err);
            } else {
              // num is the number of calls in the last expiry time window
              var num = parseInt(results[2][1], 10);
              if (num <= maxnum) {
                // does not reach limit
                next(null, false, num, exp);
              } else {
                // limit surpassed
                next(null, true, num, exp);
              }
            }
          });
        }
      });
    };
    

    这是一种锁定式限速器:

    // Lockout window rate limiter
    //
    // key is a unique identifier for the process or function call being limited
    // exp is the expiry in milliseconds
    // maxnum is the number of function calls allowed within expiry time
    var util_limiter_lockout = function(key, maxnum, exp, next) {
      // lockout rate limiter
      var idkey = 'limiter:lock:' + key;
      redis.incr(idkey, function(err, result) {
        if (err) {
          next(err);
        } else {
          if (result <= maxnum) {
            // still within number of allowable calls
            // - reset expiry and allow next function call
            redis.expire(idkey, exp, function(err) {
              if (err) {
                next(err);
              } else {
                next(null, false, result);
              }
            });
          } else {
            // too many calls, user must wait for expiry of idkey
            next(null, true, result);
          }
        }
      });
    };
    

    Here's a gist of the functions。如果您发现任何问题,请告诉我。

    【讨论】:

      【解决方案4】:

      注意:以下代码是 Java 中的示例实现。

      private final String COUNT = "count";
      
      @Autowired
      private StringRedisTemplate stringRedisTemplate;
      private HashOperations hashOperations;
      
      @PostConstruct
      private void init() {
          hashOperations = stringRedisTemplate.opsForHash();
      }
      
      @Override
      public boolean isRequestAllowed(String key, long limit, long timeout, TimeUnit timeUnit) {
          Boolean hasKey = stringRedisTemplate.hasKey(key);
          if (hasKey) {
              Long value = hashOperations.increment(key, COUNT, -1l);
              return value > 0;
          } else {
              hashOperations.put(key, COUNT, String.valueOf(limit));
              stringRedisTemplate.expire(key, timeout, timeUnit);
          }
          return true;
      }
      

      【讨论】:

        【解决方案5】:

        这是我的leaky bucket 速率限制实现,使用 Redis Lists

        注意:以下代码是php中的示例实现,您可以用自己的语言实现。

        $list = $redis->lRange($key, 0, -1); // get whole list
        $noOfRequests = count($list);
        if ($noOfRequests > 5) {
            $expired = 0;
            foreach ($list as $timestamp) {
                if ((time() - $timestamp) > 60) { // Time difference more than 1 min == expired
                    $expired++;
                }
            }
            if ($expired > 0) {
                $redis->lTrim($key, $expired, -1); // Remove expired requests
                if (($noOfRequests - $expired) > 5) { // If still no of requests greater than 5, means fresh limit exceeded.
                    die("Request limit exceeded");
                }
            } else { // No expired == all fresh.
                die("Request limit exceeded");
            }
        }
        $redis->rPush($key, time()); // Add this request as a genuine one to the list, and proceed.
        

        【讨论】:

          【解决方案6】:

          您的更新是一个非常好的算法,尽管我做了一些更改:

          times = LLEN counter
          if times < 5
              LPUSH counter now()
          else
              time = LINDEX counter -1
              if now() - time <= 60
                  print "Exceeded the limit"
              else
                  LPUSH counter now()
                  RPOP counter
          

          【讨论】:

          • 你为什么要做出这样的改变?你的理由是什么?
          【解决方案7】:

          与其他 Java 答案类似,但往返 Redis 的次数更少:

              @Autowired
              private StringRedisTemplate stringRedisTemplate;
              private HashOperations hashOperations;
          
              @PostConstruct
              private void init() {
                  hashOperations = stringRedisTemplate.opsForHash();
              }
          
              @Override
              public boolean isRequestAllowed(String key, long limit, long timeout, TimeUnit timeUnit) {
                  Long value = hashOperations.increment(key, COUNT, 1l);
                  if (value == 1) {
                      stringRedisTemplate.expire(key, timeout, timeUnit);
                  }
                  return value > limit;
              }
          

          【讨论】:

            【解决方案8】:

            这是另一种方法。如果目标是将请求数限制为每 Y 秒的 X 个请求,并且计时器在收到第一个请求时开始计时,那么您可以为要跟踪的每个用户创建 2 个键:一个用于第一个请求的时间已收到,另一个用于请求的数量。

            key = "123"
            key_count = "ct:#{key}"
            key_timestamp = "ts:#{key}"
            
            if (not redis[key_timestamp].nil?) && (not redis[key_count].nil?) && (redis[key_count].to_i > 3)
                puts "limit reached"
            else
                if redis[key_timestamp].nil?
                    redis.multi do
                        redis.set(key_count, 1)
                        redis.set(key_timestamp, 1)
                        redis.expire(key_timestamp,30)
                    end
                else
                    redis.incr(key_count)
                end
                puts redis[key_count].to_s + " : " + redis[key_timestamp].to_s + " : " + redis.ttl(key_timestamp).to_s
            end
            

            【讨论】:

            • 此算法中的密钥计数是否会减少?好像没有
            【解决方案9】:

            这足够小,您可以不对其进行散列处理。

            local f,k,a,b f=redis.call k=KEYS[1] a=f('incrby',k,ARGV[1]) b=f('pttl',k) if b<0 then f('pexpire',k,ARGV[2]) end return a
            

            参数为:

            KEYS[1] = 键名,例如可以是速率限制的操作
            ARGV[1] = 增量数量,通常为 1,但您可以在客户端上每 10 或 100 毫秒间隔进行批处理
            ARGV[2] = 窗口,以毫秒为单位,以毫秒为单位,在
            Returns:新的递增值,然后可以与代码中的值进行比较,看看它是否超过了速率限制。

            该方法不会将 ttl 设置回基值,它会继续向下滑动,直到密钥过期,此时它将在下次调用时以 ARGV[2] ttl 重新开始。

            【讨论】:

              【解决方案10】:

              上一个间隔/滑动窗口中的请求

              interval == 接受请求数(吞吐量)的时间量
              吞吐量 == 每个间隔的请求数
              RequestTimeList == 添加到此列表的每个请求时间

              // Remove older request entries
              while (!RequestTimeList.isEmpty() && (now() - RequestTimeList.get(0)) > interval) {
              
                  RequestTimeList.remove(0)
              }
              
              if (RequestTimeList.length < throughput) {
              
                  RequestTimeList.add(now())
              
              } else {
              
                  throw err;
              }
              

              间隔/固定窗口中的请求

              我尝试过使用 LIST、EXPIRE 和 PTTL

              如果 tps 是每秒 5 次,那么
              吞吐量 = 5
              斜坡上升 = 1000(1000 毫秒 = 1 秒)
              间隔 = 200 毫秒

              local tpsKey = KEYS[1]
              local throughput = tonumber(ARGV[1])
              local rampUp = tonumber(ARGV[2])
              -- Minimum interval to accept the next request.
              local interval = rampUp / throughput
              local currentTime = redis.call('PTTL', tpsKey)
              
              --  -2 if the key does not exist, so set an year expiry
              if currentTime == -2 then
                  currentTime = 31536000000 - interval
                  redis.call('SET', tpsKey, 31536000000, "PX", currentTime)
              end
              
              local previousTime = redis.call('GET', tpsKey)
              
              if (previousTime - currentTime) >=  interval then
                     redis.call('SET', tpsKey, currentTime, "PX", currentTime)
                     return true
              else
                     redis.call('ECHO',"0. ERR - MAX PERMIT REACHED IN THIS INTERVAL")
                     return false
              end
              

              List 的另一种方式

              local counter = KEYS[1]
              local throughput = tonumber(ARGV[1]) 
              local rampUp = tonumber(ARGV[2])
              local interval = rampUp / throughput
              local times = redis.call('LLEN', counter)
              
              if times == 0 then
                  redis.call('LPUSH', counter, rampUp)
                  redis.call('PEXPIRE', counter, rampUp)
                  return true
              elseif times < throughput then
                  local lastElemTTL = tonumber(redis.call('LINDEX', counter, 0))
                  local currentTTL = redis.call('PTTL', counter)
                  if  (lastElemTTL-currentTTL) < interval then
                      return false
                  else
                      redis.call('LPUSH', counter, currentTTL)
                      return true
                   end
              else
                  return false
              end
              

              【讨论】:

                猜你喜欢
                • 2018-05-04
                • 1970-01-01
                • 1970-01-01
                • 2021-05-16
                • 2020-10-04
                • 2010-09-12
                • 1970-01-01
                相关资源
                最近更新 更多