【问题标题】:Deleting all pending tasks in celery / rabbitmq删除 celery / rabbitmq 中的所有待处理任务
【发布时间】:2011-11-01 05:18:05
【问题描述】:

如何在不知道每个任务的task_id 的情况下删除所有待处理任务?

【问题讨论】:

    标签: task rabbitmq celery celery-task


    【解决方案1】:

    来自docs

    $ celery -A proj purge
    

    from proj.celery import app
    app.control.purge()
    

    (编辑:用当前方法更新。)

    【讨论】:

    • 或者,来自 Django,对于 celery 3.0+:manage.py celery purgeceleryctl 现在已弃用,将在 3.1 中消失)。
    • 我找到了这个答案来寻找如何使用 redis 后端来做到这一点。我发现的最佳方法是redis-cli KEYS "celery*" | xargs redis-cli DEL,它对我有用。这将清除存储在您正在使用的 redis 后端的所有任务。
    • 如何在 celery 3.0 中做到这一点?
    • 对我来说,它只是celery purge(在相关的虚拟环境中)。哎呀-下面有一个相同的答案.....stackoverflow.com/a/20404976/1213425
    • 对于 Celery 4.0+ 与 Django 的组合,它又是这个命令,-A 的参数是 celery.py 所在的 Django 应用程序。
    【解决方案2】:

    对于 celery 3.0+:

    $ celery purge
    

    清除特定队列:

    $ celery -Q queue_name purge
    

    【讨论】:

    • 如果出现连接错误,请确保指定应用,例如celery -A proj purge.
    • 我相信 -Q 标志已被弃用(对我不起作用,“没有这样的选项”),删除 Celery 5.0.5 上的特定队列,你会运行 celery -A appname清除 --queues 队列名称
    【解决方案3】:

    对于 Celery 2.x 和 3.x:

    当使用带有-Q参数的worker来定义队列时,例如

    celery worker -Q queue1,queue2,queue3
    

    那么celery purge 将不起作用,因为您无法将队列参数传递给它。它只会删除默认队列。 解决方案是使用 --purge 参数启动您的工作人员,如下所示:

    celery worker -Q queue1,queue2,queue3 --purge
    

    然而,这将运行工人。

    其他选项是使用 celery 的 amqp 子命令

    celery amqp queue.delete queue1
    celery amqp queue.delete queue2
    celery amqp queue.delete queue3
    

    【讨论】:

    • 是的,这是针对旧版本(2.x 和 3.x)的 celery。我无法编辑答案
    【解决方案4】:

    在 Celery 3+ 中:

    命令行界面:

    $ celery -A proj purge
    

    以编程方式:

    >>> from proj.celery import app
    >>> app.control.purge()
    

    http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

    【讨论】:

      【解决方案5】:

      我发现celery purge 不适用于我更复杂的 celery 配置。我将多个命名队列用于不同的目的:

      $ sudo rabbitmqctl list_queues -p celery name messages consumers
      Listing queues ...  # Output sorted, whitespaced for readability
      celery                                          0   2
      celery@web01.celery.pidbox                      0   1
      celery@web02.celery.pidbox                      0   1
      apns                                            0   1
      apns@web01.celery.pidbox                        0   1
      analytics                                       1   1
      analytics@web01.celery.pidbox                   0   1
      bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
      bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
      celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
      celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
      celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
      celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1
      

      第一列是队列名称,第二列是队列中等待的消息数,第三列是该队列的侦听器数。队列是:

      • celery - 标准幂等 celery 任务的队列
      • apns - Apple 推送通知服务任务的队列,不完全是幂等的
      • 分析 - 为长时间运行的夜间分析排队
      • *.pidbox - 工作人员命令队列,例如关机和重置,每个工作人员一个(2 个 celery 工作人员,一个 apns 工作人员,一个分析工作人员)
      • bcast.* - 广播队列,用于向所有监听队列的工作人员发送消息(而不仅仅是第一个获取队列的工作人员)
      • celeryev.* - Celery 事件队列,用于报告任务分析

      分析任务是一项蛮力任务,在处理小型数据集时效果很好,但现在需要超过 24 小时才能处理。有时,会出现问题,它会卡在等待数据库上。它需要重新编写,但在那之前,当它卡住时,我会终止任务,清空队列,然后重试。我通过查看分析队列的消息计数来检测“卡顿”,它应该是 0(完成分析)或 1(等待昨晚的分析完成)。 2 或更高是不好的,我会收到一封电子邮件。

      celery purge 提议从其中一个广播队列中删除任务,但我没有看到选择不同命名队列的选项。

      这是我的过程:

      $ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
      $ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
      $ sudo kill <PID>
      $ sudo /etc/init.d/celeryd stop  # Confim dead
      $ python manage.py celery amqp queue.purge analytics
      $ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
      $ sudo /etc/init.d/celeryd start
      

      【讨论】:

      • 虽然不是答案,是吗?但是信息量很大!
      • celeryctl purge 不适用于命名队列。 python manage.py celery amqp queue.purge &lt;queue_name&gt; 做到了。我认为上下文对那些设置复杂的人很有用,所以他们可以弄清楚如果celeryctl purge 对他们来说失败了,他们需要做什么。
      • 我在我的 Celery 3.1.17 中找不到 manage.py,该文件是被删除还是刚刚更新?但是,我在*/bin/amqp.py 中找到了看起来像相应接口(@98​​7654330@)的东西。但是在尝试将文件的内容与文档相关联之后,我必须遗憾地承认,Celery 没有文档可悲,而且也是一个非常令人费解的工作,至少从它的源代码来看是这样。跨度>
      • manage.py 是 Django 管理脚本,manage.py celery 在从 Django 设置加载配置后运行 celery。我没有在 Django 之外使用过 celery,但包含的 celery 命令可能就是您要找的:celery.readthedocs.org/en/latest/userguide/monitoring.html
      【解决方案6】:

      在 Celery 3+ 中

      http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

      CLI

      清除命名队列:

       celery -A proj amqp queue.purge <queue name>
      

      清除配置的队列

      celery -A proj purge
      

      我已清除消息,但队列中仍有消息? 答:任务在实际执行后立即得到确认(从队列中删除)。 Worker 收到任务后,需要一段时间才能真正执行,特别是如果有很多任务已经在等待执行。未确认的消息由工作程序保留,直到它关闭与代理(AMQP 服务器)的连接。当该连接关闭时(例如,因为工作人员已停止),代理将重新发送任务到下一个可用工作人员(或重新启动时的同一个工作人员),因此要正确清除等待任务的队列,您必须停止所有工作人员,然后使用 celery.control.purge() 清除任务。

      因此,要清除整个队列,必须停止工作人员。

      【讨论】:

        【解决方案7】:

        如果您想删除所有未决任务以及活动和保留任务以完全停止 Celery,这对我有用:

        from proj.celery import app
        from celery.task.control import inspect, revoke
        
        # remove pending tasks
        app.control.purge()
        
        # remove active tasks
        i = inspect()
        jobs = i.active()
        for hostname in jobs:
            tasks = jobs[hostname]
            for task in tasks:
                revoke(task['id'], terminate=True)
        
        # remove reserved tasks
        jobs = i.reserved()
        for hostname in jobs:
            tasks = jobs[hostname]
            for task in tasks:
                revoke(task['id'], terminate=True)
        

        【讨论】:

          【解决方案8】:

          1。 要正确清除等待任务的队列,您必须停止所有工作人员 (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

          $ sudo rabbitmqctl stop
          

          或者(如果 RabbitMQ/消息代理由 Supervisor 管理):

          $ sudo supervisorctl stop all
          

          2。 ...然后从特定队列中清除任务:

          $ cd <source_dir>
          $ celery amqp queue.purge <queue name>
          

          3。 启动 RabbitMQ:

          $ sudo rabbitmqctl start
          

          或(如果 RabbitMQ 由 Supervisor 管理):

          $ sudo supervisorctl start all
          

          【讨论】:

            【解决方案9】:

            芹菜 4+ celery purge 命令清除所有配置的任务队列

            celery -A *APPNAME* purge
            

            以编程方式:

            from proj.celery import app
            app.control.purge()
            

            所有待处理的任务都将被清除。 参考:celerydoc

            【讨论】:

              【解决方案10】:

              对于以 RabbitMQ 作为代理的 Celery 版本 5.0+

              我们需要先建立一个从程序到代理的新连接, 并将连接与要清除的队列绑定。

              # proj/celery.py
              from celery import Celery
              app = Celery('proj')
              
              from proj.celery import app
              queues = ['queue_A', 'queue_B', 'queue_C']
              with app.connection_for_write() as conn:
                  conn.connect()
                  for queue in queues:
                      count = app.amqp.queues[queue].bind(conn).purge()
                      self.stdout.write(self.style.SUCCESS(f'Purge {queue} with {count} message(s)'))
              

              【讨论】:

                猜你喜欢
                • 2017-12-25
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 1970-01-01
                • 2019-05-27
                • 1970-01-01
                • 2018-06-03
                相关资源
                最近更新 更多