【问题标题】:Update tasks in Celery with RabbitMQ使用 RabbitMQ 更新 Celery 中的任务
【发布时间】:2019-08-15 15:49:18
【问题描述】:

我在我的 django 项目中使用 Celery 来创建在未来特定时间发送电子邮件的任务。用户可以使用notify_on 日期时间字段创建通知实例。然后我将notify_on 的值作为eta 传递。

class Notification(models.Model):
    ...
    notify_on = models.DateTimeField()


def notification_post_save(instance, *args, **kwargs):
    send_notification.apply_async((instance,), eta=instance.notify_on)

signals.post_save.connect(notification_post_save, sender=Notification)

这种方法的问题在于,如果notify_on 将被用户更改,他将收到两个(或更多)通知而不是一个。

问题是我如何更新与特定通知关联的任务,或者以某种方式删除旧的并创建新的。

【问题讨论】:

    标签: django rabbitmq celery


    【解决方案1】:

    首先,通过使用post_save,我们无法获取旧数据。所以,我在这里重写了 Notification 模型的 save() 方法。除此之外,创建一个字段来存储 celery task_id。

    from celery.task.control import revoke
    
    
    class Notification(models.Model):
        ...
        notify_on = models.DateTimeField()
        celery_task_id = models.CharField(max_length=100)
    
        def save(self, *args, **kwargs):
            pre_notify_on = Notification.objects.get(pk=self.pk).notify_on
            super().save(*args, **kwargs)
            post_notify_on = self.notify_on
            if not self.celery_task_id:  # initial task creation
                task_object = send_notification.apply_async((self,), eta=self.notify_on)
                Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)
            elif pre_notify_on != post_notify_on:
                # revoke the old task
                revoke(self.celery_task_id, terminate=True)
                task_object = send_notification.apply_async((self,), eta=self.notify_on)
                Notification.objects.filter(pk=self.pk).update(celery_task_id=task_object.id)

    参考

    1. Cancel an already executing task with Celery?
    2. Django: How to access original (unmodified) instance in post_save signal

    【讨论】:

    • 好的,但是如果模型实例已更新,它将如何帮助我更新任务上的eta?并且通知不会在任务创建后立即发送,它将在将来的某个时间发送。
    • 成功了,谢谢!我只需要尝试 - 除了save 方法的第一行,它在初始创建时给了我DoesNotExist Notification matching query does not exist.。非常感谢您的帮助!
    【解决方案2】:

    我认为没有必要删除以前的任务。您只需要验证正在执行的任务是最后一个。为此,创建一个名为 checksum 的新字段,它是一个 UUID 字段,每次更改 notify_on 时都会更新该字段。在您发送电子邮件的任务中检查此校验和。

    class Notification(models.Model):
        checksum = models.UUIDField(default=uuid.uuid4)
        notify_on = models.DateTimeField()
    
    def notification_post_save(instance, *args, **kwargs):
        send_notification.apply_async((instance.id, str(instance.checksum)),eta=instance.notify_on)
    
    signals.post_save.connect(notification_post_save, sender=Notification)
    
    
    @shared_task 
    def send_notification(notification_id, checksum):
        notification = Notification.objects.get(id=notification_id)
        if str(notification.checksum) != checksum:
            return False
        #send email
    

    另外请不要每次都在通知对象上发送信号,只需在 notify_on 更改时发送。你也可以检查这个 Identify the changed fields in django post_save signal

    【讨论】:

    • 那行不通。假设用户使用notify_on=14:40 创建了通知,任务会立即创建并在那个时候执行。如果稍后,在任务执行之前,用户将更新通知并将notify_on 设置为 14:50,该任务也将并且应该注册。用户将收到两个通知,而不是一个。
    • 不,这不会发生,因为在任务执行期间我正在调用数据库并获取最新的校验和。假设用户使用 notify_on=14:40 任务创建的通知注册了校验和“123”(假设)。我现在一个用户将 notify_on 更改为 14:50 使用不同的校验和“456”创建新任务。现在,当第一个任务在 14:40 执行时,它将检查来自 db 的最新校验和,即“456”和随任务 args 发送的校验和,即“123”,它将返回 False。
    • 但我认为 JPG 解决方案最好只是撤销任务。
    • 哦,现在我明白了。谢谢,以后一定会用到这个技巧
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-07-18
    • 2011-07-17
    • 1970-01-01
    • 1970-01-01
    • 2014-06-13
    • 2015-08-08
    • 1970-01-01
    相关资源
    最近更新 更多