【问题标题】:Terminate previous Celery task with same task id and run again if created终止具有相同任务 ID 的先前 Celery 任务并在创建后再次运行
【发布时间】:2022-10-25 15:19:05
【问题描述】:

在我的 django 项目中,我使用 TemplateView 类创建了一个视图类。同样,我正在使用 django 频道并且也创建了一个消费者类。现在,每当用户刷新页面时,我都在尝试使用 celery worker 来提取查询集数据。但问题是,如果用户在任务完成之前再次刷新页面,它会创建另一个导致过载的任务。

因此,我使用 revoke 来终止之前正在运行的任务。但我明白了,撤销永久撤销了任务 ID。我不知道如何清除这个。因为,每当用户调用它时,我想再次运行该任务。

视图.py

class Analytics(LoginRequiredMixin,TemplateView):
    template_name = 'app/analytics.html'
    login_url = '/user/login/'

    def get_context_data(self, **kwargs):
        app.control.terminate(task_id=self.request.user.username+'_analytics')
        print(app.control.inspect().revoked())
        context = super().get_context_data(**kwargs)
        context['sub_title'] = 'Analytics'
        return context

消费者.py

class AppConsumer(AsyncJsonWebsocketConsumer):
    
    async def connect(self):
        await self.accept()
        analytics_queryset_for_selected_devices.apply_async(
            args=[self.scope['user'].username],
            task_id=self.scope['user'].username+'_analytics'
            )

【问题讨论】:

    标签: django django-views celery django-celery django-channels


    【解决方案1】:

    现在我正在通过以下方式解决问题。在 consumer.py 中,我创建了一个断开功能,当 Web 套接字关闭时,该功能会撤销该任务。

    counter = 0 
           
    class AppConsumer(AsyncJsonWebsocketConsumer):
        
        async def connect(self):
            await self.accept()
            analytics_queryset_for_selected_devices.apply_async(args=[self.scope['user'].username],
                                                    task_id=self.scope['user'].username+str(counter))    
    
        async def disconnect(self, close_code):
            global counter
            app.control.terminate(task_id=self.scope['user'].username+str(counter), signal='SIGKILL')
            counter += 1
            await self.close()
    

    计数器用于制作新的唯一任务 ID。但是在这种方法中,对于每个请求,都会在撤销列表中添加一个新的任务 ID,这会导致内存加载。为了尽量减少问题,我将撤销列表大小限制为 20。

    from celery.utils.collections import LimitedSet
    from celery.worker import state
    
    state.revoked = LimitedSet(maxlen=20, expires=3600)
    

    【讨论】:

      【解决方案2】:

      你如何获得应用程序的参考?您是否使用项目名称重新初始化应用程序对象?

      【讨论】:

        猜你喜欢
        • 2012-05-26
        • 2014-12-02
        • 1970-01-01
        • 2015-10-22
        • 2014-09-14
        • 1970-01-01
        • 2015-10-04
        • 2011-03-19
        • 1970-01-01
        相关资源
        最近更新 更多