You can update the rate limit at runtime by calling celery_app.control.rate_limit() on your Celery application instance.
./task.py
from celery import Celery

app = Celery("sample")
app.conf.update(
    broker_url='amqp://guest:guest@localhost:5672',
    task_annotations={
        'task.func1': {
            'rate_limit': '10/s'  # Default is 10 per second
        }
    },
)


@app.task
def func1(ctr):
    print(f"I have now processed task {ctr}")
./runner.py
import task

print("Current rate_limit is 10/s")

for ctr in range(7):
    print(f"Enqueue task {ctr}")
    task.func1.delay(ctr)

    # After the fourth task, optionally change the rate limit on the running workers.
    if ctr == 3:
        choice = input("Let's update the rate limit setting [1/2]: ")
        if choice == "1":
            new_rate_limit = '1/m'
            print(f"Changing rate_limit to {new_rate_limit}")
            task.app.control.rate_limit('task.func1', new_rate_limit)
        elif choice == "2":
            new_rate_limit = '1/h'
            print(f"Changing rate_limit to {new_rate_limit}")
            task.app.control.rate_limit('task.func1', new_rate_limit)
        else:
            print("Retaining default rate_limit")
- For simplicity, the caller of our Celery task here is a plain runnable Python script. In a real application, it could be a Django view or anything else integrated with Celery.
Run the task listener (consumer):
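The two branches in runner.py differ only in the rate string, so the choice could be factored into a small lookup. The following is just a sketch: RATE_LIMIT_CHOICES and apply_choice are hypothetical names, not part of Celery, and the function takes the "set the rate limit" action as a callable so it can be tried without a broker.

```python
from typing import Callable, Optional

# Hypothetical mapping from menu choice to rate-limit string
# (same values as the two branches in runner.py).
RATE_LIMIT_CHOICES = {"1": "1/m", "2": "1/h"}


def apply_choice(choice: str, set_rate_limit: Callable[[str], object]) -> Optional[str]:
    """Apply the selected rate limit; return it, or None if the choice is unknown."""
    new_rate_limit = RATE_LIMIT_CHOICES.get(choice.strip())
    if new_rate_limit is None:
        print("Retaining default rate_limit")
        return None
    print(f"Changing rate_limit to {new_rate_limit}")
    set_rate_limit(new_rate_limit)
    return new_rate_limit
```

In runner.py this would presumably be called as apply_choice(choice, lambda rl: task.app.control.rate_limit('task.func1', rl)).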
$ celery --app=task worker --loglevel=INFO
Run the task caller (producer):
$ python3 runner.py
Current rate_limit is 10/s
Enqueue task 0
Enqueue task 1
Enqueue task 2
Enqueue task 3
Let's update the rate limit setting [1/2]: 1
Changing rate_limit to 1/m
Enqueue task 4
Enqueue task 5
Enqueue task 6
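- Here, the first 4 tasks are enqueued while the rate limit is 10 per second. Then, based on the runtime input, the rate limit is changed to 1 per minute for the remaining 3 tasks.
Logs of the task listener (consumer):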
[2021-04-30 10:35:44,006: INFO/MainProcess] Received task: task.func1[60600074-16ad-41b1-afbf-7a89da5af2f0]
[2021-04-30 10:35:44,007: INFO/MainProcess] Received task: task.func1[e93f9936-4d56-49a7-bb8b-757817235aa2]
[2021-04-30 10:35:44,007: WARNING/ForkPoolWorker-2] I have now processed task 0
[2021-04-30 10:35:44,008: INFO/ForkPoolWorker-2] Task task.func1[60600074-16ad-41b1-afbf-7a89da5af2f0] succeeded in 0.000337354000293999s: None
[2021-04-30 10:35:44,010: INFO/MainProcess] Received task: task.func1[c0c369c4-dbcf-43db-b79c-49d5866b136f]
[2021-04-30 10:35:44,010: INFO/MainProcess] Received task: task.func1[38b32102-7313-4e64-be77-f9565ce04683]
[2021-04-30 10:35:44,217: WARNING/ForkPoolWorker-3] I have now processed task 2
[2021-04-30 10:35:44,218: INFO/ForkPoolWorker-3] Task task.func1[c0c369c4-dbcf-43db-b79c-49d5866b136f] succeeded in 0.0006413599985535257s: None
[2021-04-30 10:35:44,217: WARNING/ForkPoolWorker-2] I have now processed task 1
[2021-04-30 10:35:44,219: INFO/ForkPoolWorker-2] Task task.func1[e93f9936-4d56-49a7-bb8b-757817235aa2] succeeded in 0.0021943179999652784s: None
[2021-04-30 10:35:44,726: WARNING/ForkPoolWorker-2] I have now processed task 3
[2021-04-30 10:35:44,727: INFO/ForkPoolWorker-2] Task task.func1[38b32102-7313-4e64-be77-f9565ce04683] succeeded in 0.00125738899987482s: None
[2021-04-30 10:35:44,809: INFO/MainProcess] New rate limit for tasks of type task.func1: 1/m.
[2021-04-30 10:35:44,810: INFO/MainProcess] Received task: task.func1[1acb9b7e-755e-4773-a3db-0a284c7024bb]
[2021-04-30 10:35:44,811: INFO/MainProcess] Received task: task.func1[b861a33a-0856-4044-a498-250c0da48d53]
[2021-04-30 10:35:44,811: WARNING/ForkPoolWorker-2] I have now processed task 4
[2021-04-30 10:35:44,812: INFO/ForkPoolWorker-2] Task task.func1[1acb9b7e-755e-4773-a3db-0a284c7024bb] succeeded in 0.0006612189990846673s: None
[2021-04-30 10:35:44,812: INFO/MainProcess] Received task: task.func1[e2e79f75-7628-4449-b880-e3a03020da7e]
[2021-04-30 10:36:44,892: WARNING/ForkPoolWorker-2] I have now processed task 5
[2021-04-30 10:36:44,892: INFO/ForkPoolWorker-2] Task task.func1[b861a33a-0856-4044-a498-250c0da48d53] succeeded in 0.00017851099983090535s: None
[2021-04-30 10:37:44,830: WARNING/ForkPoolWorker-2] I have now processed task 6
[2021-04-30 10:37:44,831: INFO/ForkPoolWorker-2] Task task.func1[e2e79f75-7628-4449-b880-e3a03020da7e] succeeded in 0.0007846450007491512s: None
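- Here, you can see that the first 4 tasks (at a rate of 10 per second) were all processed at 10:35:44, while the other 3 tasks (at the updated rate of 1 per minute) were processed at 10:35:44, 10:36:44, and 10:37:44 respectively.
Reference: https://docs.celeryproject.org/en/latest/userguide/workers.html#changing-rate-limits-at-run-time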
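To make the timestamps easier to read, here is a rough sketch that converts a rate-limit string of the form used above ("10/s", "1/m", "1/h") into the approximate steady-state gap between task starts. seconds_between_tasks is a hypothetical helper written for illustration, not a Celery function, and it ignores details such as the first task after a change running immediately (as task 4 does in the log).

```python
# Seconds per unit for the suffixes used in this answer.
_UNIT_SECONDS = {"s": 1, "m": 60, "h": 3600}


def seconds_between_tasks(rate_limit: str) -> float:
    """Return the approximate gap in seconds implied by a 'count/unit' rate limit."""
    count, _, unit = rate_limit.partition("/")
    unit = unit or "s"  # assumption: a bare number means "per second"
    return _UNIT_SECONDS[unit] / float(count)


for limit in ("10/s", "1/m", "1/h"):
    print(limit, "->", seconds_between_tasks(limit), "seconds between tasks")
```

With '1/m' the gap is 60 seconds, which matches tasks 5 and 6 being processed one minute apart in the log.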