【问题标题】:Stop the thread until the celery task finishes停止线程,直到 celery 任务完成
【发布时间】:2013-12-12 09:46:15
【问题描述】:

我有一个 django 网络服务器和一个用户输入信息的表单。每次表单信息更改时,我都会更新数据库中的模型,并且在某些验证的时候,我将在 celery 中创建一个长时间运行的任务,以便在用户单击下一步之前获得我的结果。

我正在使用带有 RabbitMQ 的 Django Celery 作为代理,我的问题是如果任务仍未完成,只需锁定 django 中的响应线程,直到任务状态为 state.SUCCESSFUL 我尝试使用的最合适的方法是什么AsyncResult.get 方法,但它只是将线程锁定很长时间,然后给我结果。 IE 不是即时的,有没有人知道如何解决这个问题?

【问题讨论】:

    标签: python django celery


    【解决方案1】:

    你可以等到结果是ready()

    from time import sleep
    result = some_task.apply_async(args=myargs)
    while not result.ready():
        sleep(0.5)
    result_output = result.get()
    

    似乎还有一个wait(),所以你可以使用它。下面的应该基本上和上面的代码做的是一样的事情。

    result = some_task.apply_async(args=myargs)
    result_output = result.wait(timeout=None, interval=0.5)
    

    【讨论】:

    • 是否需要睡眠循环?
    • sleep 提供了一个等待间隔,您不想在 ready() 检查每个可用周期时都受到冲击。
    • 当然,我想问的是是否有其他解决方案不依赖于“完成了吗”方法
    • hmmm...所以您正在寻找类似回调的东西?你也许可以给任务一个回调任务......或者可能是链任务。 docs.celeryproject.org/en/latest/userguide/canvas.html#chains
    • 新版本的 celery 还有一个更简单的delay() 函数,所以如果你不想在调用任务时添加额外的选项,则不需要apply_async()。只需some_task.delay(myargs)。来源:docs.celeryproject.org/en/latest/userguide/calling.html#example
    【解决方案2】:

    实现这一点的一种方法是让结果在 redis 中等待,并使用 blocking pop 操作使用一些唯一值(如会话 ID)获取它们,注意它的超时功能。

    【讨论】:

    • 这似乎是一个不错的解决方案,但它需要添加 redis 只是为了弹出任务,这可能是一种矫枉过正。不过我会考虑的
    • 也许如果我已经将 redis 作为代理,那将是一个更整洁的选择
    【解决方案3】:

    我在 Celery 版本 5.1.1 / Python 3.9.5 / Django 3.2.x 上的工作示例:

    def import_media():
        keys = []
        urls = []
        for obj in s3_resource.Bucket(env.str('S3_BUCKET')).objects.all():
            if obj.key.endswith(('.m4v', '.mp4', '.m4a', '.mp3')):
                keys.append(obj.key)
        for key in keys:
            url = s3_client.generate_presigned_url(
                ClientMethod='get_object',
                Params={'Bucket': env.str('S3_BUCKET'), 'Key': key},
                ExpiresIn=86400,
            )
            if not Files.objects.filter(descriptor=strip_descriptor_url_scheme(url)).exists():
                new_file = Files.objects.create(descriptor=strip_descriptor_url_scheme(url))
                new_file.save()
                urls.append(url)
        workflow = (
            group([extract_descriptor.s(url) for url in urls]).delay()
        )
        workflow.get(timeout=None, interval=0.5)
        print("hello - Further processing here")
        return None
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-07-04
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-25
      • 1970-01-01
      相关资源
      最近更新 更多