【问题标题】:Pause celery task暂停芹菜任务
【发布时间】:2018-07-04 00:52:43
【问题描述】:

我正在尝试根据用户按钮单击临时暂停 celery 任务。

我所做的是:

当用户点击按钮时;我发布了一个 AJAX 请求,将我的 celery 任务状态更新为“PAUSE”

然后;我的策略是;当我对芹菜发起一项任务时;它运行一个 for 循环。 每个 for 循环;我读了我的数据库“状态”,看看它是否设置为暂停:如果它设置为暂停;我想让它休眠 60 秒或让它休眠直到用户点击恢复按钮;同样的想法。

这是我的代码:

r  = redis.StrictRedis(host='localhost', port=6379, db=0)

@celery.task(bind=True)
def runTask(self, arr)

   for items in arr:
      current_task_id = self.request.id
      item = r.get('celery-task-meta-'+current_task_id)
      load_as_json = json.loads(item)
      if "PAUSE" in load_as_json['status']:
        sleep(50) 




@app.route('/start')
def start_task()
   runTask.apply_async(args=[arr])
   return 'task started running

这是我的暂停 API 端点的样子:

@app.route('/stop/<task_id>')
def updateTaskState():
  task_id = request.cookie.get('task_id')
  loadAsJson = json.loads(r.get('celery-task-meta-'+str(task_id)))
  loadAsJson['status'] = 'PAUSE'
  loadAsJson.update(loadAsJson)
  dump_as_json = json.dumps(loadAsJson)
  updated_state = r.set('celery-task-meta-'+last_key, dump_as_json)
  return 'updated state';

根据我的概念理解;是我没有看到更新状态的原因是因为;该任务已经执行并且无法从数据库中检索更新的值。 仅供参考:任务更新状态立即设置为暂停;我通过创建一个单独的脚本来检查这一点,该脚本检查while循环中的状态;每次我单击释放 AJAX 请求以更新状态的按钮时;我的数据库得到更新,它在单独的脚本上显示“PAUSE”;但是在 @celery.task 装饰器中,我似乎无法获得更新的状态。

以下是我用来测试的单独脚本;它似乎是预期的更新状态;我只是无法在任务装饰器中获得更新的状态......奇怪。

r  = redis.StrictRedis(host='localhost', port=6379, db=0)
last_key = r.keys()
while True:
    response = r.get('celery-task-meta-b1534a87-e18b-4f0a-89e2-08348d833056')
    loadAsJson = json.loads(response)
    print loadAsJson['status']

【问题讨论】:

    标签: python flask celery


    【解决方案1】:

    面对同样的问题并且没有好的答案,我想出了您可能喜欢的解决方案,它不依赖于您使用的消息队列(又名 Redis 或 RabbitMQ)。对我来说,关键是 celery.app.task.Task 类中的 update_state 方法将 task_id 作为可选参数。就我而言,我正在通过多个工作节点运行长时间运行的文件复制和校验和任务,有时用户希望暂停一个正在运行的任务以降低对存储的性能要求,以允许其他任务首先完成。我还在运行一个无状态的 Flask REST API 来启动后端任务并检索正在运行的任务的状态,因此我需要一种方法来让 API 调用进入以暂停和恢复任务。

    这是我的测试函数,它可以通过监控自己的状态接收“消息”来暂停自己:

    celery.task(bind=True)
    def long_test(self, i):
        print('long test starting with delay of ' + str(i) + 'seconds on each loop')
        print('task_id =' + str(self.request.id))
        self.update_state(state='PROCESSING')
        count = 0
        while True:
            task = celery.AsyncResult(self.request.id)
            while task.state == 'PAUSING' or task.state == 'PAUSED':
                if task.state == 'PAUSING':
                    self.update_state(state='PAUSED')
                time.sleep(i)
            if task.state == 'RESUME':
                self.update_state(state='PROCESSING')
            print('long test loop ' + str(count) + ' ' + str(task.state))
            count += 1
            time.sleep(i)
    

    然后,为了暂停或恢复,我可以执行以下操作:

    >>> from project.celeryworker.tasks import long_test
    >>> from project import create_app, make_celery
    >>> flaskapp = create_app()
    >>> celery = make_celery(flaskapp)
    >>> from celery.app.task import Task
    >>> long_test.apply_async(kwargs={'i': 5})
    <AsyncResult: bf19d50f-cf04-47f0-a069-6545fb253887>
    >>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
    >>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
    'PAUSED'
    >>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='RESUME')
    >>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
    'PROCESSING'
    >>> Task.update_state(self=celery, task_id='bf19d50f-cf04-47f0-a069-6545fb253887', state='PAUSING')
    >>> celery.AsyncResult('bf19d50f-cf04-47f0-a069-6545fb253887').state
    'PAUSED'
    

    【讨论】:

    • 你能帮我写代码吗?我试图实现你的方法,但我得到了很多错误。需要帮助
    猜你喜欢
    • 2017-04-23
    • 2013-05-05
    • 2014-12-04
    • 1970-01-01
    • 2012-12-01
    • 2014-04-16
    • 2012-08-30
    • 2011-11-20
    • 2017-10-14
    相关资源
    最近更新 更多