【问题标题】:tracking progress of a celery.group task?跟踪 celery.group 任务的进度?
【发布时间】:2013-02-14 11:49:05
【问题描述】:
@celery.task
def my_task(my_object):
    do_something_to_my_object(my_object)


#in the code somewhere 
tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()])
group_task = tasks.apply_async()

问题:celery 有什么东西可以检测小组任务的进度吗?我可以知道有多少任务以及处理了多少?

【问题讨论】:

    标签: python django celery django-celery


    【解决方案1】:

    这是基于@dalore 回答的完整工作示例。

    第一个tasks.py

    import time
    from celery import Celery, group
    
    app = Celery('tasks', broker='pyamqp://guest@127.0.0.1//', backend='redis://localhost')
    
    @app.task(trail=True)
    def add(x, y):
        time.sleep(1)
        return x + y
    
    @app.task(trail=True)
    def group_add(l1, l2):
        return group(add.s(x1, x2) for x1, x2 in zip(l1, l2))()
    

    使用 Docker 启动 redis 服务器:docker run --name my-redis -p 6379:6379 -d redis

    使用 Docker 启动 RabbitMQ:docker run -d --hostname my-rabbit --name my-rabbit -p 5672:5672 rabbitmq:alpine

    在单独的 shell 中启动单进程 celery worker:celery -A tasks worker --loglevel=info -c 1

    然后运行下面的测试脚本。

    from tasks import group_add
    from tqdm import tqdm
    
    total = 10
    
    l1 = range(total)
    l2 = range(total)
    delayed_results = group_add.delay(l1, l2)
    delayed_results.get()  # Wait for parent task to be ready.
    
    results = []
    for result in tqdm(delayed_results.children[0], total=total):
        results.append(result.get())
    print(results)
    

    您应该会看到类似以下的内容,其中进度条每秒增加 10%。

    50%|#####     | 5/10 [00:05<00:05,  1.01s/it
    [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    

    最后,清理你的 redis 和 rabbitmq 容器。

    docker stop my-rabbit my-redis
    docker rm my-rabbit my-redis
    

    【讨论】:

    • 我认为你必须将 'trail=True' 放在 @app.task 中,而不是放在函数的定义中
    • @lbcommer 谢谢。固定的。它是默认设置的,所以没有必要,但正如 dalore 的回答一样,我喜欢它是明确的,因为这种行为取决于它。
    【解决方案2】:

    阅读AsyncResult 的文档,有一个collect 方法可以在结果进入时收集它们。

    http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect

    from celery import group
    from proj.celery import app
    
    @app.task(trail=True)
    def A(how_many):
        return group(B.s(i) for i in range(how_many))()
    
    @app.task(trail=True)
    def B(i):
        return pow2.delay(i)
    
    @app.task(trail=True)
    def pow2(i):
        return i ** 2
    

    示例输出:

    >>> from celery.result import ResultBase
    >>> from proj.tasks import A
    
    >>> result = A.delay(10)
    >>> [v for v in result.collect()
    ...  if not isinstance(v, (ResultBase, tuple))]
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    

    注意: 必须启用Task.trail 选项,以便子列表存储在result.children 中。这是默认设置,但为了说明而明确启用。

    编辑:

    在进一步测试后发现,虽然收集状态它将收集结果,但它仍在等待。我发现要获得进度,您需要获得孩子的结果,如下所示:

    group_result = mygrouptask.delay().get()
    for result in tqdm(group_result.children, total=count):
        yield result.get()
    

    tqdm 在控制台中显示进度

    mygrouptask 是一个返回 celery 组,如下所示:

    return group(mytask.s(arg) for arg in args)()
    

    【讨论】:

    • 这将等待所有子任务完成然后返回他们的结果,当组仍在运行时它不显示进度
    • 更新到实际生产进度,在生产中工作
    【解决方案3】:

    在 shell 上摆弄(ipython 的选项卡自动完成)我发现 group_task(这是一个 celery.result.ResultSet 对象)有一个名为 completed_count 的方法,它提供了我所需要的。

    还在http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count找到文档

    【讨论】:

    • 您好,您可能已经很久没有遇到这个问题了,但我想知道您如何使用它来跟踪小组任务的进度而不会阻塞..?据我了解,我需要分配result = task_group.apply_async(),但仅仅是分配本身就会阻塞。另一方面,如果我们不分配,我们就没有 completed_count 等的 ResultSet 方法......
    • @zerohedge result = task_group.apply_async() 不应该阻塞等待结果,它会阻塞直到所有任务都入队,如果你有很多任务,这需要一段时间
    猜你喜欢
    • 2016-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多