【问题标题】:Measuring Celery task execution time测量 Celery 任务执行时间
【发布时间】:2023-03-09 14:47:01
【问题描述】:

我已将一个独立的批处理作业转换为使用 celery 来调度要完成的工作。我正在使用 RabbitMQ。一切都在一台机器上运行,没有其他进程正在使用 RabbitMQ 实例。我的脚本只是创建了一堆由工作人员处理的任务。

有没有一种简单的方法来测量从我的脚本开始到所有任务完成的时间?我知道在使用消息队列时,这在设计上有点复杂。但我不想在生产中这样做,只是为了测试和获得性能估计。

【问题讨论】:

    标签: python rabbitmq celery


    【解决方案1】:

    你可以使用celery signals,注册的函数会在任务执行前后被调用,测量经过的时间很简单:

    from time import time
    from celery.signals import task_prerun, task_postrun
    
    
    d = {}
    
    @task_prerun.connect
    def task_prerun_handler(signal, sender, task_id, task, args, kwargs, **extras):
        d[task_id] = time()
    
    
    @task_postrun.connect
    def task_postrun_handler(signal, sender, task_id, task, args, kwargs, retval, state, **extras):
        try:
            cost = time() - d.pop(task_id)
        except KeyError:
            cost = -1
        print task.__name__, cost
    

    【讨论】:

    • @vikas-prasad kwargs 用于接收“任务关键字参数”,添加**extras 用于 celery 4 兼容性。
    【解决方案2】:

    您可以通过在末尾添加一个假任务来使用chord,该任务将经过发送任务的时间,并返回当前时间与执行时经过的时间之间的差异。

    import celery
    import datetime
    from celery import chord
    
    @celery.task
    def dummy_task(res=None, start_time=None):
        print datetime.datetime.now() - start_time
    
    def send_my_task():
        chord(my_task.s(), dummy_task.s(start_time=datetime.datetime.now()).delay()
    

    send_my_task 发送您想要分析的任务以及dummy_task,它将打印花费了多长时间(或多或少)。如果您想要更准确的数字,我建议将 start_time 直接传递给您的任务,并使用 signals

    【讨论】:

    • 但是 dummy_task 将是另一个任务,可以在不同的工作人员上执行,或者比原始任务更晚。
    • @homm,是的,但是 OP 明确声明只有一个工作节点,并且没有其他进程正在使用 RabbitMQ 节点,因此只计算我们正在测量的任务。唯一的延迟来自上次接收时间测量任务,但和弦是在一个 1 秒的周期计时器上。
    • 没有其他进程,但不是“没有其他任务”,对吧?如果没有空闲的工作进程,dummy_task 将等待。
    • @homm,是的,但是 OP 说除了他的脚本之外没有其他进程使用队列,并且 OP 想要测量从脚本开始到 all的时间> 任务已完成。