【问题标题】:How to stop celery task while it is being executed and continue the execution after some time?如何在执行 celery 任务时停止它并在一段时间后继续执行?
【发布时间】:2017-04-14 17:55:19
【问题描述】:

我有以下 celery 任务(简化),它与 Twitter API 交互。

@app.task
def get_followers(screen_name, **kwargs):
    cursor = kwargs.get('cursor', -1)
    followers = kwargs.get('followers', [])
    while True:
        response = twitter_api.call('followers', 'ids', screen_name=screen_name, cursor=cursor)
        if response.status_code == '429': # RATE LIMIT EXCEEDED
            # do something here

        cursor = response.json()['next_cursor']
        if cursor == 0: # we're done
            break
    return followers

我希望能够在达到速率限制时暂停任务一段时间,并从停止点恢复执行。 (或者抛出错误并重试任务,传入额外的 kwargs)。这如何实现?

【问题讨论】:

  • @mohammad-yusuf-ghazi 我知道这些页面,我对这里的芹菜部分特别感兴趣。据我了解,如果我有sleep_on_rate_limit=True 并且超过了速率限制,那么任务只会“挂在那里”,在接下来的 15 分钟内消耗工作池中的空间,不是吗?
  • 你将不得不对 api 进行异步调用。检查这个问题的链接。 *.com/questions/17668895/…
  • @mohammad-yusuf-ghazi 抱歉,但我不明白这与我的问题有什么关系。我对 celery 任务有问题,而不是 API 调用。此外,我使用 django 而不是 tornado(仅在问题的标签部分提到,但它应该与 hypothetical 解决方案无关)

标签: python django python-3.x celery


【解决方案1】:

当您发现 429 错误代码时,您可以重试您的任务:

@app.task(bind=True)
def get_followers(self, screen_name, **kwargs):
    cursor = kwargs.get('cursor', -1)
    followers = kwargs.get('followers', [])
    while True:
        response = twitter_api.call('followers', 'ids', screen_name=screen_name, cursor=cursor)
        if response.status_code == '429': 
            # RATE LIMIT EXCEEDED
            self.retry(countdown=15*60)

        cursor = response.json()['next_cursor']
        if cursor == 0: # we're done
            break
    return followers

请注意,我在您的任务装饰器中添加了bind=True,并在您的任务定义中添加了self 作为参数,以便在您获得429 时能够执行self.retry

retry 中使用参数countdown 表示您希望何时重试任务(以秒为单位)。这里我选择了 15min(twitter API 速率限制)

您可以在 celery 文档中找到有关重试的更多信息:

http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying

【讨论】: