【发布时间】:2016-01-12 06:54:52
【问题描述】:
我有 celery 任务,队列中有 100 个输入数据,需要使用 5 个工作人员执行。
- 如何获取哪个工作人员正在执行哪个输入?
- 每个工作人员执行了多少输入及其状态?
- 如果任何任务失败,如何单独获取失败的输入数据并使用可用的工作人员重新执行?
是否有任何可能的方法来根据工人的具体情况定制芹菜。
我们可以把芹菜工人限制和花结合起来
我没有使用任何框架。
【问题讨论】:
标签: python celery-task
我有 celery 任务,队列中有 100 个输入数据,需要使用 5 个工作人员执行。
是否有任何可能的方法来根据工人的具体情况定制芹菜。
我们可以把芹菜工人限制和花结合起来
我没有使用任何框架。
【问题讨论】:
标签: python celery-task
我怎样才能知道哪个工作人员正在执行哪个输入?
使用多个工人有两种选择:
-c 即并发运行在一个命令中第一种方法,flower 将支持它,并将向您显示所有工人、所有任务(您称为输入)、哪个工人处理了哪个任务以及其他信息。
使用第二种方法,flower 将向您显示单个工人正在处理的所有任务。在这种情况下,您只能通过查看 celery worker 生成的日志来区分,因为它确实存储了哪个 worker 线程执行了哪个任务。所以,我认为根据您的要求,使用第一个选项会更好。
每个工作人员执行了多少输入及其状态?
正如我提到的,使用第一种方法,flower 会给你这个信息。
如果任何任务失败,如何分别获取失败的输入数据和 使用可用的工作人员重新执行?
Flower 确实提供了过滤器来过滤失败的任务,并且确实提供了退出时返回的状态任务。您还可以设置 celery 重试失败任务的次数。但即使重试任务失败,您也必须自己重新启动任务。
【讨论】:
For the first and second question:
1) Using Flower API:
You can use celery flower to keep track of it. Flower api can provide you the information like which task is being executed by which worker through simple api calls (/api/task/info/<task_id>)
2) Querying celery directly:
from celery import Celery
celery = Celery('vwadaptor', broker='redis://workerdb:6379/0',backend='redis://workerdb:6379/0')
celery.control.inspect().active()
3) Using celery events:
Link : http://docs.celeryproject.org/en/latest/userguide/monitoring.html
(look Real-time Processing)
You can create an event ( task created, task received, etc) and the response will have the worker name(hostname , see the link)
For the third question:
Use the config entry 'CELERY_ACKS_LATE=True' to retry failed tasks.
celery.conf.update(
CELERY_ACKS_LATE=True,
)
You can also track failed tasks using celery events mentioned above and retry failed tasks manually.
【讨论】: