【问题标题】:How can I ensure that sub-tasks of a Celery task go into the same queue as the parent task?如何确保 Celery 任务的子任务与父任务进入同一队列?
【发布时间】:2015-10-20 18:58:59
【问题描述】:

我有一个 celery 任务,它可能会排队其他子任务。如果工作人员从高优先级队列中拉出该任务,然后该任务将其他任务排队,我希望新任务返回进入高优先级队列。但是如何以编程方式获取当前正在执行的任务来自的队列?

我知道我可以做一些事情,比如将一个附加参数传递给原始的 my_task.apply_async() 调用,它指定一个用于子任务的队列,然后我可以通过紧缩的方法/类链传递它任务,但这似乎很混乱且难以维护。似乎只需询问 Celery 即可在某处获得队列信息。

【问题讨论】:

    标签: python celery


    【解决方案1】:

    我发现队列信息可以通过current_task.request.delivery_info['exchange']获得。
    所以,我最终使用的解决方案如下:

    def get_source_queue(default=None):
        """
        Finds and returns the queue that the currently-executing task (if any) came from.
        """
        from celery import current_task
        if current_task is not None and 'exchange' in current_task.request.delivery_info:
            source_queue = current_task.request.delivery_info['exchange']
            if source_queue is not None:
                return source_queue
        return default
    

    然后我将它与这样的子任务一起使用:

    my_task.apply_async(args=('my', 'args'), queue=get_source_queue(default='foo_queue'))
    


    我不知道这是否是最好的方法......也许芹菜中内置了一些东西,上面写着“使用与源队列相同的队列”(?)但是,上述工作。

    【讨论】:

    猜你喜欢
    • 2013-11-20
    • 1970-01-01
    • 2015-06-16
    • 1970-01-01
    • 1970-01-01
    • 2019-05-19
    • 2020-08-11
    • 2017-04-02
    • 2012-04-12
    相关资源
    最近更新 更多