【问题标题】:Pass parameter to Celery task's on_failure method将参数传递给 Celery 任务的 on_failure 方法
【发布时间】:2018-01-04 22:12:28
【问题描述】:

我正在创建一个自定义 Celery 任务类,以覆盖当任务达到最大重试次数 (on_failure) 时发生的情况。如果任务失败,我需要更新用户模型的状态。

下面是我的自定义任务类:

class ReadyTask(Task):

     def run(self, user):
         try:
             user.get_results()
         except Exception as exc:
             raise self.retry(exc=exc, max_retries=3)

     def on_failure(self, exc, task_id, *args, **kwargs):
         user.status = Status.READY
         user.save()

如何将 User 对象传递给 on_failure() 方法以更新其状态?

【问题讨论】:

    标签: python task celery scheduled-tasks django-celery


    【解决方案1】:

    如果您将其作为参数发送到您调用的任务中,我相信您可以检查您的argskwargs 以获取用户的ID。如果你在 kwargs 中做到这一点会更容易,这样你就不必进行 arg 位置检查。然后从 id 中获取您的用户并进行更改?

    因此,不要将其发送到run 函数,而是作为参数/关键字参数发送给您正在调用的任务的函数,通过function.apply(kwargs)function.apply_async(kwargs=kwargs)function.delay(kwargs)

    所以:

    user_id = kwargs.get('user_id')

    # then resolve to user object, then update object

    【讨论】:

      【解决方案2】:

      您还可以将对象绑定到您的自定义任务类。通过使用bind=True

      class ReadyTask(Task):
      
           def run(self, user):
      
               self.user_object = user
      
               try:
                   self.user_object.get_results()
               except Exception as exc:
                   raise self.retry(exc=exc, max_retries=3)
      
           def on_failure(self, exc, task_id, *args, **kwargs):
               self.user_object.status = Status.READY 
               self.user_object.save()
      
      @app.task(base=ReadyTask, bind=True)
      def do_stuff(self, *args, **kwargs):
          self.user_object.do_stuff()
      

      【讨论】:

        猜你喜欢
        • 2018-10-19
        • 2019-11-13
        • 2012-03-22
        • 1970-01-01
        • 1970-01-01
        • 2016-02-28
        • 2015-12-05
        • 2015-08-17
        • 2021-11-22
        相关资源
        最近更新 更多