【问题标题】:Receiving events from celery task从 celery 任务接收事件
【发布时间】:2015-12-11 13:58:03
【问题描述】:

我有一个长时间运行的 celery 任务,它遍历一组项目并执行一些操作。

任务应该以某种方式报告它当前正在处理的项目,以便最终用户知道任务的进度。

目前我的 django 应用程序和 celery 一起在一台服务器上,所以我可以使用 Django 的模型来报告状态,但我计划添加更多远离 Django 的工作人员,所以他们无法访问数据库。

现在我看到几个解决方案:

  • 使用一些存储手动存储中间结果,例如 redis 或 mongodb,然后在网络上可用。这让我有点担心,因为例如,如果我将使用 redis,那么我应该在 Django 端保持同步读取状态的代码和写入状态的 Celery 任务,因此它们使用相同的键。
  • 使用 REST 调用从 celery 向 Django 报告状态。喜欢PUT http://django.com/api/task/123/items_processed
  • 也许使用 Celery 事件系统并创建像 Item processed 这样 django 更新计数器的事件
  • 创建一个单独的worker,它在一个带有django的服务器上运行,它持有一个只会增加items proceeded计数的任务,所以当任务完成一个项目时它会发出increase_messages_proceeded_count.delay(task_id)

我提到的有什么解决办法或隐藏的问题吗?

【问题讨论】:

  • 为什么他们不能到达数据库?
  • 因为我想减少耦合,不想暴露DB。
  • 我可以理解这一点,并且我将工作人员设计为尽可能解耦,但对于我为工作人员设计的工作负载,如果它知道一些事情(例如,可以访问数据库),那么它的能力会更强。我可以看到不需要 Django DB 的任务,例如“生成 PDF”或“渲染此图像”。我只是想知道你是否需要它或者你想要它。我将在下面的答案中详细说明。

标签: django celery


【解决方案1】:

可能有很多方法可以实现您的目标,但我会这样做。

在您长期运行的 celery 任务中,使用 django's caching framework 设置进度:

from django.core.cache import cache

@app.task()
def long_running_task(self, *args, **kwargs):
    key = "my_task: %s" % self.result.id
    ...
    # do whatever you need to do and set the progress
    # using cache:
    cache.set(key, progress, timeout="whatever works for you")
    ...

然后您所要做的就是使用该键发出一个重复的 AJAX GET 请求并从缓存中检索进度。类似的东西:

 def task_progress_view(request, *args, **kwargs):
     key = request.GET.get('task_key')
     progress = cache.get(key)
     return HttpResponse(content=json.dumps({'progress': progress}),
                         content_type="application/json; charset=utf-8")

这里有一个警告,如果您将服务器作为多个进程运行,请确保您使用的是 memcached 之类的东西,因为 django 的本机缓存将在进程之间不一致。另外,我可能不会使用 celery 的 task_id 作为键,但它足以用于演示目的。

【讨论】:

    【解决方案2】:

    看看flower - 一个用于 Celery 分布式任务队列的实时监控器和网络管理员:

    您需要它来进行演示,对吗? Flower 与 websocket 一起使用。

    例如 - 实时接收任务完成事件(取自官方文档):

    var ws = new WebSocket('ws://localhost:5555/api/task/events/task-succeeded/');
    ws.onmessage = function (event) {
        console.log(event.data);
    }
    

    您可能需要处理任务 ('ws://localhost:5555/api/tasks/')。

    我希望这会有所帮助。

    【讨论】:

    • 仅限于 celery 提供的事件。对于任务开始/完成的事件,我不需要 Flower。我需要自定义事件,例如完成任务的阶段 X。
    • Flower 甚至不记录自定义状态。例如,“进展”状态更新不会反映在 Flower 中。我想,提问者(和我)想要的是报告类似于记录器的任务执行的东西。
    【解决方案3】:

    最简单的:

    您的任务和 django 应用程序已经共享访问一两个数据存储 - 代理和结果后端(如果您使用的与代理不同)

    您可以简单地将一些数据放入这些数据存储中的一个或另一个中,以指示任务当前正在处理的项目。

    例如如果使用 redis 只需有一个键“当前正在处理的任务”并将与正在处理的项目相关的数据存储在其中。

    【讨论】:

    • 你说得对,我可以通过手动维护一些计数器来利用redis/mongo/mysql。这就是我现在所做的。我想到的是如何利用已经使用的 MQ Celery。所以每个任务都会产生一些像item proceeded这样的事件,这些事件将在django所在的服务器上得到处理。因为当物品处理结束时,柜台并不是我唯一需要做的事情。可能还有更多任务处理项不知道的事情需要处理。
    • 我以为你说数据库(mysql)对工人不可用
    • 现在是,但这是我想要避免的。
    • 目前我需要一个简单的计数器来向用户显示进行了多少项目。但在最近的功能中,我需要更多。 Celery 已经使用了 pub/sub redis,所以我认为有一种方法可以在 celery 中完成这项工作。它有一个事件系统,但事件类型的数量确实有限,而且它们都与 celery 监控有关。所以否则我需要手动使用redis,这不是很好。
    • 因为它只是一个计数器 - 是的。但随后我需要创建一个发布/订阅频道,以便任何人都可以订阅“项目进行”事件,而不仅仅是一个只会增加计数器的函数。因此我需要创建一个像观察者/观察者这样的抽象,这样人们就可以订阅item proceeded事件和其他来触发事件等等。所以我必须创建一个适当的基础设施。
    【解决方案4】:

    您可以使用Swampdragon 之类的东西从 Celery 实例访问用户(您必须能够从客户端访问它,注意不要与 CORS thou 发生冲突)。它可以锁在柜台上,而不是模型本身。

    【讨论】:

    • what sort of process worker 为什么不呢?工作人员以 json 对象的形式接收任务数据。他们不知道我的数据库架构。我想对安装在许多不同服务器上的 Django + DB 容器和工作人员进行适当的分离。解析页面的 Worker 不应该知道用户帐户模型是什么,但它应该具有报告任务进度的适当方式。
    • 你看过沼泽龙了吗?它会导致在您的工作人员上运行龙卷风节点的额外工作,但 websockets 只是保持客户端更新的那种东西。
    • 是的,我知道并计划深入研究它,因为它支持 python3,不像 django-redis-websockets。但它似乎还很年轻,不确定它是否可以投入生产。而且我不确定是不是Django方向的会导致代码解耦的一些问题
    【解决方案5】:

    lehins' solution 看起来不错,如果您不介意您的客户反复轮询您的后端。这可能很好,但随着客户数量的增长,它会变得昂贵。

    Artur Barseghyan's solution 适用于只需要 Celery 内部机制生成的任务生命周期事件的情况。

    或者,您可以使用Django Channels 和 WebSockets 将更新实时推送到客户端。设置非常简单。

    1. channels 添加到您的INSTALLED_APPS 并设置通道层。例如,使用 Redis 后端:
    CHANNEL_LAYERS = {
        "default": {
            "BACKEND": "channels_redis.core.RedisChannelLayer",
            "CONFIG": {
                "hosts": [("redis", 6379)]
            }
        }
    }
    
    1. 创建事件使用者。这将从 Channels 接收事件并通过 Websockets 将它们推送到客户端。例如:
    import json
    from asgiref.sync import async_to_sync
    from channels.generic.websocket import WebSocketConsumer
    
    
    class TaskConsumer(WebsocketConsumer):
        def connect(self):
            self.task_id = self.scope['url_route']['kwargs']['task_id'] # your task's identifier
            async_to_sync(self.channel_layer.group_add)(f"tasks-{self.task_id}", self.channel_name)
            self.accept()
    
        def disconnect(self, code):
            async_to_sync(self.channel_layer.group_discard)(f"tasks-{self.task_id}", self.channel_name)
    
        def item_processed(self, event):
            item = event['item']
            self.send(text_data=json.dumps(item))
    
    1. 从您的 Celery 任务中推送事件,如下所示:
    from asgiref.sync import async_to_sync
    from channels.layers import get_channel_layer
    
    ...
    async_to_sync(get_channel_layer.group_send)(f"tasks-{task.task_id}", {
        'type': 'item_processed',
        'item': item,
    })
    

    您还可以编写异步使用者和/或异步调用group_send。无论哪种情况,您都不再需要 async_to_sync 包装器。

    1. websocket_urlpatterns 添加到您的urls.py
    websocket_urlpatterns = [
        path(r'ws/tasks/<task_id>/', TaskConsumer.as_asgi()),
    ]
    
    1. 最后,要在客户端使用 JavaScript 中的事件,您可以执行以下操作:
    let task_id = 123;
    let protocol = location.protocol === 'https:' ? 'wss://' : 'ws://';
    let socket = new WebSocket(`${protocol}${window.location.host}/ws/tasks/${task_id}/`);
    
    socket.onmessage = function(event) {
        let data = JSON.parse(event.data);
        let item = data.item;
        // do something with the item (e.g., push it into your state container)
    }
    

    【讨论】:

      猜你喜欢
      • 2018-03-22
      • 1970-01-01
      • 1970-01-01
      • 2014-12-15
      • 2012-09-06
      • 2014-11-03
      • 2019-05-21
      • 2018-07-30
      • 1970-01-01
      相关资源
      最近更新 更多