【问题标题】:Detect whether Celery is Available/Running检测 Celery 是否可用/正在运行
【发布时间】:2012-01-20 08:54:29
【问题描述】:

我正在使用Celery 来管理异步任务。然而,偶尔,celery 进程会停止,导致没有任何任务被执行。我希望能够检查 celery 的状态并确保一切正常,如果我检测到任何问题,则会向用户显示错误消息。从 Celery Worker 文档看来,我可以使用 pinginspect 来解决这个问题,但是 ping 感觉很笨拙,并且不清楚检查的确切用途(如果 inspect().registered() 是空的?)。

对此的任何指导将不胜感激。基本上我正在寻找的是这样的方法:

def celery_is_alive():
    from celery.task.control import inspect
    return bool(inspect().registered()) # is this right??

编辑:它甚至看起来不像 celery 2.3.3 上提供了 registered() (即使 2.1 文档列出了它)。也许 ping 是正确的答案。

编辑:Ping 似乎也没有像我想象的那样做,所以仍然不确定这里的答案。

【问题讨论】:

  • 下面的答案不适合您吗?作为有类似问题要解决的人,我希望得到一些确认。
  • 我知道这是一个老问题,但您能否详细说明ping 不是答案?似乎ping 正是正确的答案,一个简单的“乒乓”响应表明工人还活着。

标签: python django celery django-celery


【解决方案1】:

这是我一直在使用的代码。 celery.task.control.Inspect.stats() 返回一个 dict,其中包含有关当前可用工作人员的大量详细信息,如果没有工作人员正在运行,则返回 None,如果无法连接到消息代理,则引发 IOError。我正在使用 RabbitMQ - 其他消息传递系统的行为可能略有不同。这在 Celery 2.3.x 和 2.4.x 中有效;我不确定它有多远。

def get_celery_worker_status():
    ERROR_KEY = "ERROR"
    try:
        from celery.task.control import inspect
        insp = inspect()
        d = insp.stats()
        if not d:
            d = { ERROR_KEY: 'No running Celery workers were found.' }
    except IOError as e:
        from errno import errorcode
        msg = "Error connecting to the backend: " + str(e)
        if len(e.args) > 0 and errorcode.get(e.args[0]) == 'ECONNREFUSED':
            msg += ' Check that the RabbitMQ server is running.'
        d = { ERROR_KEY: msg }
    except ImportError as e:
        d = { ERROR_KEY: str(e)}
    return d

【讨论】:

  • 我发现上面每次运行都会给rabbitmq添加两个reply.celery.pidbox队列。这导致 rabbitmq 的内存使用量逐渐增加。
  • 为了完整起见,可以使用:sudo service celerybeatd status,检查调度器是否启动。
  • 这对我不起作用。使用 Redis 作为代理,当 redis 不可用且没有 celery worker 正在运行时,insp.stats() 只是阻塞。
  • 这对我不起作用。我想 celery 的 API 发生了变化(现在是 4.2 版)。
【解决方案2】:

来自the documentation of celery 4.2

from your_celery_app import app


def get_celery_worker_status():
    i = app.control.inspect()
    availability = i.ping()
    stats = i.stats()
    registered_tasks = i.registered()
    active_tasks = i.active()
    scheduled_tasks = i.scheduled()
    result = {
        'availability': availability,
        'stats': stats,
        'registered_tasks': registered_tasks,
        'active_tasks': active_tasks,
        'scheduled_tasks': scheduled_tasks
    }
    return result

当然,您可以/应该通过错误处理来改进代码...

【讨论】:

  • 为了检查可用性,还有i.ping(),它在失败时返回None
  • 谢谢蒂姆。我在函数中添加了“可用性”
【解决方案3】:

在 celery 作为守护进程运行的情况下使用命令行进行检查,

  • 激活 virtualenv 并转到 'app' 所在的目录
  • 现在运行:celery -A [app_name] status
  • 它会显示 celery 是否已启动,加号。在线节点数

来源: http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/

【讨论】:

    【解决方案4】:

    以下内容对我有用:

    import socket
    from kombu import Connection
    
    celery_broker_url = "amqp://localhost"
    
    try:
        conn = Connection(celery_broker_url)
        conn.ensure_connection(max_retries=3)
    except socket.error:
        raise RuntimeError("Failed to connect to RabbitMQ instance at {}".format(celery_broker_url))
    

    【讨论】:

    • 我很确定如果 rabbitmq 正在运行,无论 celery 的状态如何,这都会成功。但是,如果 celery 无法知道失败是由于 rabbitmq 还是其他原因,这是一个很好的检查。
    【解决方案5】:

    测试任何工作人员是否响应的一种方法是发送“ping”广播并在第一次响应时返回成功结果。

    from .celery import app  # the celery 'app' created in your project
    
    def is_celery_working():
        result = app.control.broadcast('ping', reply=True, limit=1)
        return bool(result)  # True if at least one result
    

    这会广播“ping”并等待最多一秒钟的响应。一旦第一个响应进来,它就会返回一个结果。如果您想更快地获得False 结果,可以添加timeout 参数以减少放弃前等待的时间。

    【讨论】:

      【解决方案6】:

      我找到了一个优雅的解决方案:

      from .celery import app
      try:
          app.broker_connection().ensure_connection(max_retries=3)
      except Exception as ex:
          raise RuntimeError("Failed to connect to celery broker, {}".format(str(ex)))
      

      【讨论】:

        【解决方案7】:

        您可以使用ping 方法检查是否有任何工作人员(或特定工作人员)还活着
        https://docs.celeryproject.org/en/latest/_modules/celery/app/control.html#Control.ping

        celey_app.control.ping()

        【讨论】:

          【解决方案8】:

          下面的脚本适合我。

              #Import the celery app from project
              from application_package import app as celery_app
              def get_celery_worker_status():
                  insp = celery_app.control.inspect()
                  nodes = insp.stats()
                  if not nodes:
                      raise Exception("celery is not running.")
                  logger.error("celery workers are: {}".format(nodes))
                  return nodes
          

          【讨论】:

            猜你喜欢
            • 2014-05-17
            • 2017-01-08
            • 2016-12-24
            • 1970-01-01
            • 1970-01-01
            • 2010-09-06
            • 1970-01-01
            • 1970-01-01
            • 2012-12-25
            相关资源
            最近更新 更多