【问题标题】:How to inspect and cancel Celery tasks by task name如何按任务名称检查和取消 Celery 任务
【发布时间】:2013-03-12 15:12:00
【问题描述】:

我使用 Celery (3.0.15) 和 Redis 作为代理。

是否有一种直接的方法可以查询 Celery 队列中存在的具有给定名称的任务的数量?

作为后续,有没有办法取消 Celery 队列中存在的所有具有给定名称的任务?

我已经通过Monitoring and Management Guide 并没有看到那里的解决方案。

【问题讨论】:

    标签: python redis celery


    【解决方案1】:
    # Retrieve tasks
    # Reference: http://docs.celeryproject.org/en/latest/reference/celery.events.state.html
    query = celery.events.state.tasks_by_type(your_task_name)
    
    # Kill tasks
    # Reference: http://docs.celeryproject.org/en/latest/userguide/workers.html#revoking-tasks
    for uuid, task in query:
        celery.control.revoke(uuid, terminate=True)
    

    【讨论】:

    • 这听起来很有希望,但我没有得到任何结果。我为我的员工将CELERY_SEND_TASK_SENT_EVENT 设置为True,但调用celery.events.State().tasks_by_type(...) 返回一个空列表。
    • 嗯,尝试直接 celery.events.state.state,就像celerymon一样。 See the source.
    • 来自 celery shell,celery.events.state.state 仍然给了我一个空列表。是否缺少从事件队列初始化消费的步骤?
    • 好吧,你是对的:你需要提供 State 对象。复制粘贴github.com/celery/celerymon/blob/master/celerymon/…,然后拨打EventConsumer.start()
    • 我真的不明白这与答案有何关系。什么时候开始停止?什么时候内存满了?
    【解决方案2】:

    早期的答案没有解决一个问题,如果人们没有意识到这一点,可能会让他们大吃一惊。

    在已经发布的这些解决方案中,我将使用 Danielle's 并稍作修改:我会将任务导入我的文件并使用其 .name 属性将任务名称传递给 .tasks_by_type()

    app.control.revoke(
        [uuid for uuid, _ in
         celery.events.state.State().tasks_by_type(task.name)])
    

    但是,此解决方案将忽略那些已安排在未来执行的任务。 就像一些对其他答案发表评论的人一样,当我检查 .tasks_by_type() 返回的内容时,我有一个空列表。事实上,我的队列是空的。但我知道有计划在未来执行的任务,这些是我的主要目标。我可以通过执行celery -A [app] inspect scheduled 看到它们,但它们不受上面代码的影响。

    我设法通过这样做撤销了计划任务:

    app.control.revoke(
        [scheduled["request"]["id"] for scheduled in
         chain.from_iterable(app.control.inspect().scheduled()
                             .itervalues())])
    

    app.control.inspect().scheduled() 返回一个字典,其键是工作人员名称,值是调度信息的列表(因此,需要从itertools 导入的chain.from_iterable)。任务信息在调度信息的"request"字段中,"id"包含任务id。请注意,即使撤销后,计划任务仍会显示在计划任务中。被撤销的计划任务不会从计划任务列表中删除,直到它们的计时器到期或直到 Celery 执行一些清理操作。 (重启工人会触发这种清理。)

    【讨论】:

    • 有什么方法可以使用命令知道 cron 下安排了哪些任务?像节拍调度器?
    【解决方案3】:

    您可以在一个请求中执行此操作:

    app.control.revoke([
        uuid
        for uuid, _ in
        celery.events.state.State().tasks_by_type(task_name)
    ])
    

    【讨论】:

    • 很棒的单线解决方案
    • n.b.,对于 celery 4+,您必须使用 State().tasks_by_type
    【解决方案4】:

    像往常一样使用 Celery,这里没有一个答案对我有用根本,所以我做了我平常的事情,并拼凑了一个直接检查 redis 的解决方案。来了……

    # First, get a list of tasks from redis:
    import redis, json
    
    r = redis.Redis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    l = r.lrange('celery', 0, -1)
    
    # Now import the task you want so you can get its name
    from my_django.tasks import my_task
    
    # Now, import your celery app and iterate over all tasks 
    # from redis and nuke the ones that have a matching name.
    from my_django.celery_init import app
    for task in l:
         task_headers = json.loads(task)['headers']
         task_name = task_headers["task"]
         if task_name == my_task.name:
             task_id = task_headers['id']
             print("Terminating: %s" % task_id)
             app.control.revoke(task_id, terminate=True)
    

    请注意,以这种方式撤销可能不会撤销预取的任务,因此您可能不会立即看到结果。

    此外,此答案不支持优先任务。如果你想修改它来做到这一点,你需要my other answer that hacks redis 中的一些提示。

    【讨论】:

    • 很好的答案。在我的情况下,尝试从 celery cli 中获取任何东西通常都不成功,但这个答案立即让我开始了!
    【解决方案5】:

    看起来flower提供了监控:

    https://github.com/mher/flower

    使用 Celery Events 进行实时监控

    任务进度和历史 能够显示任务详细信息(参数、 开始时间、运行时间等)图形和统计远程控制

    查看工作人员状态和统计信息关闭和重新启动工作人员 实例 控制工作池大小和自动缩放设置 查看和 修改工作人员实例当前从 View 消耗的队列 正在运行的任务 查看计划任务(ETA/倒计时) 查看保留和 撤销的任务 应用时间和速率限制 配置查看器 撤销 或终止任务 HTTP API

    OpenID 身份验证

    【讨论】:

      猜你喜欢
      • 2013-10-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-12-16
      • 1970-01-01
      • 1970-01-01
      • 2012-02-20
      相关资源
      最近更新 更多