【问题标题】:Django celery worker to send real-time status and result messages to front endDjango celery worker 向前端发送实时状态和结果消息
【发布时间】:2018-05-30 05:20:34
【问题描述】:

在 django 应用程序中,我正在运行异步任务,并希望向用户显示进度、错误等。如果有错误,用户应该被重定向到需要额外输入或一些操作来解决问题的页面。从 celery 工作回到前端的最佳沟通方式是什么?

这是伪代码的基本结构:

# views.py
from tasks import run_task

def view_task():
    run_task.delay()
    return render(request, 'template.html')

# tasks.py
from compute_module import compute_fct

@shared_task
def run_task():
    result = compute_fct()

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
        handle_error()
    else:
        handle_succes()     

# compute_module
import pandas as pd

def compute_fct():
    # send message: status = loading file
    df = pd.read_csv('test.csv')
    # send message: status = computing
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}

我最想要的:

  • compute_module.py 模块使用 python 原生记录器。通过职责分离,我希望尽可能保持日志的通用性并使用标准的 python/django 记录器。但它们似乎并非旨在将消息发送到前端。
  • celery 任务以某种方式处理日志,而不是在标准输出上显示它们,而是将它们重定向到推送器
  • 前端js显示并处理消息

芹菜工人和前端之间可能有标准的通信方式,我不知道。这种情况必须经常发生,我很惊讶它如此难以实施。在某种程度上,应该为此设计 rabbitmq 消息队列或 aws sns。以下是我查看过的资源,但感觉它们中的任何一个都不是很好,但也许我只是感到困惑。

日志:这似乎更多是关于在服务器端登录,而不是向用户发送消息

Celery cam 似乎是关于管理员监控任务,而不是向用户发送消息

我喜欢推送器,但我不想让compute_module.py 处理它。例如,我不希望在compute_module.py 中进行任何 pusher.com 集成。猜猜我可以传递一个已经实例化的推送器对象,这样模块就可以推送消息,但我还是希望它是通用的

【问题讨论】:

  • 在您的情况下,进度报告的位置是什么?您运行一项任务,它已完成或出错。如果您运行分解为子任务的任务,您可以使用网络工作者将每个子任务的最终输出推送回客户端吗?我也不是真的 感觉 python 日志记录作为用户反馈机制 - 我怀疑获得 nice 输出,尤其是 html 将比它的价值更麻烦。

标签: python django logging celery pusher


【解决方案1】:

我设法获得实时状态的唯一方法是简单地将一些 SQL 写入/api 调用放入任务本身。使用任务的返回值做事情要容易得多,因为您可以编写自定义任务类。

我不完全确定使用 Django 是如何工作的,但它应该看起来像这样。

class CustomTask(celery.Task):
    def __call__(self, *args, **kwargs):
        self.start_time = time.time()

    def on_success(self, retval, task_id, args, kwargs):
        do_success_stuff()

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        do_failure_stuff()

@shared_task(base=CustomTask)
def do_stuff():
    return create_widgets()

可在此处找到完整列表: http://docs.celeryproject.org/en/latest/userguide/tasks.html#handlers

【讨论】:

  • 好的,这些小部件如何在没有页面引用的情况下显示在 UI 上?
  • 我确信某处有一个优雅的解决方案,但我将任务写入表,然后更新状态列。由于您在开始工作后就拥有了 task_id,因此您可以做一些 jquery 魔术来获得新的状态。也许像giantflyingsaucer.com/blog/?p=4310
【解决方案2】:

编辑:现在移至 django-channels,效果很好,但比下面的解决方案更复杂。

上一个:

好的,下面是我现在如何解决它的伪代码。基本上我使用https://pusher.com/docs/javascript_quick_start 和服务器端将实例化的对象传递给compute_module。一个缺点是推送消息是短暂的,所以我将不得不在LogPusher 中做一些额外的工作来将它们存储在数据库中,改天再做一些事情......

此外,在我的实际实现中,我通过 $(document).ready() 中的 $.post() ajax 调用触发任务,因为小任务完成得如此之快,用户永远不会看到推送消息,因为未建立连接(回到那个历史消息问题)。

我上面没有提到的另一条替代路线是https://channels.readthedocs.io/en/latest/

[编辑]另一个解决方案是Server-sent events,它有django implementations,没有测试过。但它看起来很适合单向更新,例如从服务器到客户端(与 websockets 双向)。您需要像 redis pubsub 这样的消息传递系统来获取服务器 sse 路由的更新。

通过推送器从 django 服务器进行前端更新:

# views.py
from tasks import run_task

def view_task():
    run_task.delay('event')
    return render(request, 'template.html', 'pusher_event':'event')

    
# tasks.py
import pusher
from django.conf import settings
from compute_module import compute_fct

class LogPusher(object):
    def __init__(self, event):
        self.pusher_client = pusher.Pusher(app_id=settings.PUSHER_APP_ID,
                        key=settings.PUSHER_KEY,
                        secret=settings.PUSHER_SECRET,
                        cluster=settings.PUSHER_CLUSTER, ssl=True)
        self.event = event
        
    def send(self, data):
        self.pusher_client.trigger(settings.PUSHER_CHANNEL, self.event, json.dumps(data))

@shared_task
def run_task(pusher_event):
    
    log_pusher = LogPusher(pusher_event)
    result = compute_fct(log_pusher)

    # how to catch status update messages from compute_module while compute_fct is running??

    if result == 'error':
            log_pusher.send('status':'error')
    else:
            log_pusher.send('status':'success')

            
# compute_module.py
import pandas as pd

def compute_fct(log_pusher):
    # send message: status = loading file
    log_pusher.send('status':'loading file')
    df = pd.read_csv('test.csv')
    # send message: status = computing
    log_pusher.send('status':'computing')
    val = df['col'].mean()

    if val is None:
        return {'status':'error'}
    else:
        return {'status':'success','val':val}
        

# context_processors.py
# see https://stackoverflow.com/questions/433162/can-i-access-constants-in-settings-py-from-templates-in-django
from django.conf import settings 

def pusher(request):
    return {'PUSHER_KEY': settings.PUSHER_KEY, 'PUSHER_CLUSTER': settings.PUSHER_CLUSTER , 'PUSHER_CHANNEL': settings.PUSHER_CHANNEL }

        
# template.html
<script>
    
var pusher = new Pusher("{{PUSHER_KEY}}", {
  cluster: "{{PUSHER_CLUSTER}}",
  encrypted: true    
});

var channel = pusher.subscribe("{{PUSHER_CHANNEL}}");
channel.bind("{{pusher_event}}", function(data) {
    // process data
});

</script>

【讨论】:

    【解决方案3】:

    有一个名为 celery-progress 的库可能会有所帮助 celery-progress library

    他还发表了一篇关于手动操作的博客文章: blog about celery progress bars

    【讨论】:

    • 虽然此链接可能会回答问题,但最好在此处包含答案的基本部分并提供链接以供参考。如果链接页面发生更改,仅链接答案可能会失效。 - From Review
    猜你喜欢
    • 2017-06-09
    • 2019-08-06
    • 1970-01-01
    • 2015-05-20
    • 2022-01-20
    • 1970-01-01
    • 2019-08-15
    • 2015-03-17
    • 2017-10-28
    相关资源
    最近更新 更多