【问题标题】:Celery rate-limiting: Is it possible to rate-limit a celery task differently based on a run-time parameter?Celery 速率限制:是否可以根据运行时参数对 celery 任务进行不同的速率限制?
【发布时间】:2019-09-25 09:43:52
【问题描述】:

我想根据运行时确定的某些参数对 Celery 任务进行速率限制。例如:如果参数为 1,则速率限制可能为 100。如果参数为 2,则速率限制可能为 25。此外,我希望能够在运行时修改这些速率限制。

celery 提供了一种方法吗?我可以使用 routing_key 根据参数将任务发送到不同的队列,但 celery 似乎不支持队列级速率限制。

一种可能的解决方案是在排队任务时使用eta,但我想知道是否有更好的方法来实现这一点。

【问题讨论】:

    标签: django redis queue celery message-queue


    【解决方案1】:

    您可以在运行时通过celery_app.control.rate_limit() 访问 Celery 应用实例的应用程序部分更新 rate_limit。

    ./task.py

    from celery import Celery
    
    app = Celery("sample")
    
    app.conf.update(
        broker_url='amqp://guest:guest@localhost:5672',
        task_annotations={
            'task.func1': {
                'rate_limit': '10/s'  # Default is 10 per second
            }
        },
    )
    
    
    @app.task
    def func1(ctr):
        print(f"I have now processed task {ctr}")
    

    ./runner.py

    import task
    
    print(f"Current rate_limit is 10/s")
    
    for ctr in range(7):
        print(f"Enqueue task {ctr}")
        task.func1.delay(ctr)
    
        if ctr == 3:
            choice = input("Let's update the rate limit setting [1/2]: ")
            if choice == "1":
                new_rate_limit = '1/m'
                print(f"Changing rate_limit to {new_rate_limit}")
                task.app.control.rate_limit('task.func1', new_rate_limit)
            elif choice == "2":
                new_rate_limit = '1/h'
                print(f"Changing rate_limit to {new_rate_limit}")
                task.app.control.rate_limit('task.func1', new_rate_limit)
            else:
                print("Retaining default rate_limit")
    
    • 为了简单起见,这里我们有一个原始的 python 可运行脚本,它充当我们 celery 任务的调用者。在现实生活中的应用程序中,这可能是与 celery 或其他任何东西集成的 Django 视图。

    执行任务监听器(消费者):

    $ celery --app=task worker --loglevel=INFO
    

    执行任务调用者(生产者):

    $ python3 runner.py 
    Current rate_limit is 10/s
    Enqueue task 0
    Enqueue task 1
    Enqueue task 2
    Enqueue task 3
    Let's update the rate limit setting [1/2]: 1
    Changing rate_limit to 1/m
    Enqueue task 4
    Enqueue task 5
    Enqueue task 6
    
    • 在这里,我们可以看到前 4 次运行的速率为每秒 10 次。然后使用运行时输入,我们将剩余的 3 次运行更新为每分钟 1 次。

    任务监听器(消费者)的日志:

    [2021-04-30 10:35:44,006: INFO/MainProcess] Received task: task.func1[60600074-16ad-41b1-afbf-7a89da5af2f0]  
    [2021-04-30 10:35:44,007: INFO/MainProcess] Received task: task.func1[e93f9936-4d56-49a7-bb8b-757817235aa2]  
    [2021-04-30 10:35:44,007: WARNING/ForkPoolWorker-2] I have now processed task 0
    [2021-04-30 10:35:44,008: INFO/ForkPoolWorker-2] Task task.func1[60600074-16ad-41b1-afbf-7a89da5af2f0] succeeded in 0.000337354000293999s: None
    [2021-04-30 10:35:44,010: INFO/MainProcess] Received task: task.func1[c0c369c4-dbcf-43db-b79c-49d5866b136f]  
    [2021-04-30 10:35:44,010: INFO/MainProcess] Received task: task.func1[38b32102-7313-4e64-be77-f9565ce04683]  
    [2021-04-30 10:35:44,217: WARNING/ForkPoolWorker-3] I have now processed task 2
    [2021-04-30 10:35:44,218: INFO/ForkPoolWorker-3] Task task.func1[c0c369c4-dbcf-43db-b79c-49d5866b136f] succeeded in 0.0006413599985535257s: None
    [2021-04-30 10:35:44,217: WARNING/ForkPoolWorker-2] I have now processed task 1
    [2021-04-30 10:35:44,219: INFO/ForkPoolWorker-2] Task task.func1[e93f9936-4d56-49a7-bb8b-757817235aa2] succeeded in 0.0021943179999652784s: None
    [2021-04-30 10:35:44,726: WARNING/ForkPoolWorker-2] I have now processed task 3
    [2021-04-30 10:35:44,727: INFO/ForkPoolWorker-2] Task task.func1[38b32102-7313-4e64-be77-f9565ce04683] succeeded in 0.00125738899987482s: None
    [2021-04-30 10:35:44,809: INFO/MainProcess] New rate limit for tasks of type task.func1: 1/m.
    [2021-04-30 10:35:44,810: INFO/MainProcess] Received task: task.func1[1acb9b7e-755e-4773-a3db-0a284c7024bb]  
    [2021-04-30 10:35:44,811: INFO/MainProcess] Received task: task.func1[b861a33a-0856-4044-a498-250c0da48d53]  
    [2021-04-30 10:35:44,811: WARNING/ForkPoolWorker-2] I have now processed task 4
    [2021-04-30 10:35:44,812: INFO/ForkPoolWorker-2] Task task.func1[1acb9b7e-755e-4773-a3db-0a284c7024bb] succeeded in 0.0006612189990846673s: None
    [2021-04-30 10:35:44,812: INFO/MainProcess] Received task: task.func1[e2e79f75-7628-4449-b880-e3a03020da7e]  
    [2021-04-30 10:36:44,892: WARNING/ForkPoolWorker-2] I have now processed task 5
    [2021-04-30 10:36:44,892: INFO/ForkPoolWorker-2] Task task.func1[b861a33a-0856-4044-a498-250c0da48d53] succeeded in 0.00017851099983090535s: None
    [2021-04-30 10:37:44,830: WARNING/ForkPoolWorker-2] I have now processed task 6
    [2021-04-30 10:37:44,831: INFO/ForkPoolWorker-2] Task task.func1[e2e79f75-7628-4449-b880-e3a03020da7e] succeeded in 0.0007846450007491512s: None
    
    • 在这里,您可以看到前 4 个任务(速率为每秒 10 个)都在 10:35:44 处理,而其他 3 个任务(更新速率为每分钟 1 个)在 10分别为:35:44、10:36:44 和 10:37:44。

    参考:https://docs.celeryproject.org/en/latest/userguide/workers.html#changing-rate-limits-at-run-time

    【讨论】:

      【解决方案2】:

      Celery 提供了一个内置的速率限制系统,但它并没有像大多数人期望的速率限制系统那样工作,并且它有几个限制。我根据你提到的 ETA 和 Redis 上的一些 Lua 脚本实现了一个分布式速率限制系统,它运行良好,所以我会推荐这种方法。

      这篇文章详细介绍了一种类似的方法:

      https://callhub.io/distributed-rate-limiting-with-redis-and-celery/

      我使用了一个更简单的版本,我的 lua 脚本是这样的:

      local current_time = tonumber(ARGV[1])
      local eta = tonumber(redis.call('get', KEYS[1]))
      local interval = tonumber(ARGV[2])
      
      if not eta or eta < current_time then
          redis.call('set', KEYS[1], current_time + interval, 'EX', 10800)
          return nil
      else
          redis.call('set', KEYS[1], eta + interval, 'EX', 10800)
          return tostring(eta)
      end
      

      我不得不简单地重写任务apply_async 方法并以我想要的延迟调用该lua 脚本:

      def apply_async(self, *args, **kwargs):
          now = int(time.time())
      
          # From django-redis
          conn = get_redis_connection('default')
      
          cache_key = 'something'
      
          eta = conn.eval(self.rate_limit_script, 1, cache_key, now, rate_limiter.get_delay())
      
          if eta:
              eta = datetime.fromtimestamp(float(eta), tz=timezone.get_current_timezone())
              kwargs['eta'] = eta
          return super().apply_async(*args, **kwargs)
      

      【讨论】:

        猜你喜欢
        • 2023-03-05
        • 2013-10-14
        • 2017-01-11
        • 2020-10-20
        • 1970-01-01
        • 2015-02-20
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多