【问题标题】:Monitor a celery task state without polling?无需轮询即可监控 celery 任务状态?
【发布时间】:2015-09-12 14:57:17
【问题描述】:

是否可以在不轮询的情况下监控 celery 任务的状态?

例如,如果我有一个使用update_state 定期更新其状态的任务:

@task(bind=True)
def my_task(self):
    for x in range(100):
        time.sleep(1)
        self.update_state(state='PROGRESS', meta={'x': x})

是否可以在不进行轮询的情况下从另一个进程监视该状态?

【问题讨论】:

    标签: python celery


    【解决方案1】:

    我自己还没有这样做,所以这不是一个完整的答案,但我有兴趣解决同样的问题。我有三个值得考虑的想法:

    【讨论】:

      【解决方案2】:

      这取决于您的代理/后端。如果他们使用 pub/sub,那么您的进程将不需要轮询任务状态。如果他们不这样做,那么您将需要轮询(或更一般地说,是一个涉及发送消息询问状态的过程)。

      我认为 celery 和 redis 都有这种能力。

      【讨论】:

      • “进程不需要轮询任务状态”——但是它将如何访问任务状态?
      • 但是你具体是怎么做的呢? Celery 是否公开 API?
      【解决方案3】:

      假设您想跟踪生产者的进度,您需要为此使用事件监控 API。其他的api、handlers和signals,只在worker中运行;所以没有用跟踪生产者的进度。

      查看文档中名为“监控和管理指南”的部分,您应该能够几乎逐字运行示例代码(用于事件接收器)。但是,它没有提到,也需要为工作人员启用事件生成。你工作的 -E 参数将解决这个问题。

      【讨论】:

        【解决方案4】:

        on_message 回调允许您指定在任务更改状态时调用的函数。

        例如,以下任务在被调用时会发送一个自定义的PROGRESS 状态。

        from celery import Celery
        
        app = Celery("tasks", broker="pyamqp://guest@localhost//", backend="rpc://")
        
        @app.task(bind=True)
        def square(self, x):
            self.update_state(state="PROGRESS")
            return x**2
        

        你可以看到这是这样的调用者。

        >>> tasks.square.delay(5).get(on_message=print)
        {'task_id': 'f63eb965-3cdc-4bcc-a5e1-b0bb2149c376', 'status': 'PROGRESS', 'result': None, 'traceback': None, 'children': []}
        {'task_id': 'f63eb965-3cdc-4bcc-a5e1-b0bb2149c376', 'status': 'SUCCESS', 'result': 25, 'traceback': None, 'children': []}
        25
        

        如果您想要一个更复杂的示例,我有一个 gist,它使用 Celery 在更新进度条的同时并行复制文件目录。

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 2016-10-30
          • 1970-01-01
          • 1970-01-01
          • 2017-12-21
          • 1970-01-01
          • 1970-01-01
          • 2014-06-13
          • 2012-02-11
          相关资源
          最近更新 更多